Skip to content

Commit

Permalink
refactor: delete cdc
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Aug 4, 2024
1 parent a118b19 commit 2dd220d
Showing 1 changed file with 16 additions and 48 deletions.
64 changes: 16 additions & 48 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ use datafusion_physical_expr::{
PhysicalExpr,
};
use futures::future::BoxFuture;
use std::iter;
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};

Expand All @@ -42,7 +41,7 @@ use serde::Serialize;
use super::cdc::should_write_cdc;
use super::datafusion_utils::Expression;
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
use super::write::{write_execution_plan_cdc, WriterStatsConfig};
use super::write::{execute_non_empty_expr_cdc, WriterStatsConfig};

use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{
Expand Down Expand Up @@ -201,53 +200,22 @@ async fn excute_non_empty_expr(
}

// CDC logic, simply filters data with predicate and adds the _change_type="delete" as literal column
if let Ok(true) = should_write_cdc(&snapshot) {
// Create CDC scan
let cdc_predicate_expr = state.create_physical_expr(expression.clone(), &input_dfschema)?;
let cdc_scan: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(cdc_predicate_expr, scan.clone())?);

// Add literal column "_change_type"
let change_type_lit = lit(ScalarValue::Utf8(Some("delete".to_string())));
let change_type_expr = state.create_physical_expr(change_type_lit, &input_dfschema)?;

// Project columns and lit
let project_expressions: Vec<(Arc<dyn PhysicalExpr>, String)> = scan
.schema()
.fields()
.into_iter()
.enumerate()
.map(|(idx, field)| -> (Arc<dyn PhysicalExpr>, String) {
(
Arc::new(expressions::Column::new(field.name(), idx)),
field.name().to_owned(),
)
})
.chain(iter::once((change_type_expr, "_change_type".to_owned())))
.collect();

let projected_scan: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
project_expressions,
cdc_scan.clone(),
)?);

let cdc_actions = write_execution_plan_cdc(
Some(snapshot),
state.clone(),
projected_scan.clone(),
table_partition_cols.clone(),
log_store.object_store(),
Some(snapshot.table_config().target_file_size() as usize),
None,
writer_properties,
false,
Some(SchemaMode::Overwrite), // If not overwrite, the plan schema is not taken but table schema, however we need the plan schema since it has the _change_type_col
writer_stats_config,
None,
)
.await?;
if let Some(cdc_actions) = execute_non_empty_expr_cdc(
snapshot,
log_store,
state.clone(),
scan,
input_dfschema,
expression,
table_partition_cols,
writer_properties,
writer_stats_config,
)
.await?
{
actions.extend(cdc_actions)
};
}

Ok(actions)
}

Expand Down

0 comments on commit 2dd220d

Please sign in to comment.