feat(python, rust): cdc write-support for overwrite and replacewhere writes #2722

Merged: 3 commits, merged on Aug 6, 2024
64 changes: 16 additions & 48 deletions crates/core/src/operations/delete.rs
@@ -32,7 +32,6 @@ use datafusion_physical_expr::{
PhysicalExpr,
};
use futures::future::BoxFuture;
use std::iter;
use std::sync::Arc;
use std::time::{Instant, SystemTime, UNIX_EPOCH};

@@ -42,7 +41,7 @@ use serde::Serialize;
use super::cdc::should_write_cdc;
use super::datafusion_utils::Expression;
use super::transaction::{CommitBuilder, CommitProperties, PROTOCOL};
use super::write::{write_execution_plan_cdc, WriterStatsConfig};
use super::write::{execute_non_empty_expr_cdc, WriterStatsConfig};

use crate::delta_datafusion::expr::fmt_expr_to_sql;
use crate::delta_datafusion::{
@@ -201,53 +200,22 @@ async fn excute_non_empty_expr(
}

// CDC logic: simply filter the data with the predicate and add _change_type = "delete" as a literal column
if let Ok(true) = should_write_cdc(&snapshot) {
// Create CDC scan
let cdc_predicate_expr = state.create_physical_expr(expression.clone(), &input_dfschema)?;
let cdc_scan: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(cdc_predicate_expr, scan.clone())?);

// Add literal column "_change_type"
let change_type_lit = lit(ScalarValue::Utf8(Some("delete".to_string())));
let change_type_expr = state.create_physical_expr(change_type_lit, &input_dfschema)?;

// Project columns and lit
let project_expressions: Vec<(Arc<dyn PhysicalExpr>, String)> = scan
.schema()
.fields()
.into_iter()
.enumerate()
.map(|(idx, field)| -> (Arc<dyn PhysicalExpr>, String) {
(
Arc::new(expressions::Column::new(field.name(), idx)),
field.name().to_owned(),
)
})
.chain(iter::once((change_type_expr, "_change_type".to_owned())))
.collect();

let projected_scan: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
project_expressions,
cdc_scan.clone(),
)?);

let cdc_actions = write_execution_plan_cdc(
Some(snapshot),
state.clone(),
projected_scan.clone(),
table_partition_cols.clone(),
log_store.object_store(),
Some(snapshot.table_config().target_file_size() as usize),
None,
writer_properties,
false,
Some(SchemaMode::Overwrite), // If not overwrite, the plan schema is not taken but table schema, however we need the plan schema since it has the _change_type_col
writer_stats_config,
None,
)
.await?;
if let Some(cdc_actions) = execute_non_empty_expr_cdc(
snapshot,
log_store,
state.clone(),
scan,
input_dfschema,
expression,
table_partition_cols,
writer_properties,
writer_stats_config,
)
.await?
{
actions.extend(cdc_actions)
};
}

Ok(actions)
}
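The inline CDC block deleted above moves to crates/core/src/operations/write.rs as the shared helper execute_non_empty_expr_cdc (shown in the next file). The technique itself is unchanged: filter the scan with the delete predicate, then project a literal _change_type = "delete" column onto the matching rows. Below is a standalone DataFusion sketch of that filter-plus-literal-projection at the DataFrame level rather than the physical-plan level used here; the schema and values are illustrative, not from the PR.

```rust
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
    let batch =
        RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 5, 20]))])?;

    let ctx = SessionContext::new();
    let cdc_rows = ctx
        .read_batch(batch)?
        // Keep only the rows the delete predicate touches...
        .filter(col("id").gt(lit(10)))?
        // ...and tag them for the change data feed.
        .with_column("_change_type", lit("delete"))?;

    // Prints a single row: id = 20, _change_type = "delete".
    cdc_rows.show().await?;
    Ok(())
}
```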

219 changes: 180 additions & 39 deletions crates/core/src/operations/write.rs
@@ -28,22 +28,26 @@ use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::vec;
use std::{iter, vec};

use arrow_array::RecordBatch;
use arrow_cast::can_cast_types;
use arrow_schema::{ArrowError, DataType, Fields, SchemaRef as ArrowSchemaRef};
use datafusion::execution::context::{SessionContext, SessionState, TaskContext};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::physical_plan::projection::ProjectionExec;
use datafusion::physical_plan::{memory::MemoryExec, ExecutionPlan};
use datafusion_common::DFSchema;
use datafusion_expr::Expr;
use datafusion_common::{DFSchema, ScalarValue};
use datafusion_expr::{lit, Expr};
use datafusion_physical_expr::expressions::{self};
use datafusion_physical_expr::PhysicalExpr;
use futures::future::BoxFuture;
use futures::StreamExt;
use object_store::prefix::PrefixStore;
use parquet::file::properties::WriterProperties;
use tracing::log::*;

use super::cdc::should_write_cdc;
use super::datafusion_utils::Expression;
use super::transaction::{CommitBuilder, CommitProperties, TableReference, PROTOCOL};
use super::writer::{DeltaWriter, WriterConfig};
@@ -560,45 +564,135 @@ async fn execute_non_empty_expr(
rewrite: &[Add],
writer_properties: Option<WriterProperties>,
writer_stats_config: WriterStatsConfig,
partition_scan: bool,
) -> DeltaResult<Vec<Action>> {
// For each identified file perform a parquet scan + filter + limit (1) + count.
// If the returned count is not zero, rewrite the file without the matching rows and remove the original from the log; otherwise leave the file untouched.
let mut actions: Vec<Action> = Vec::new();

let input_schema = snapshot.input_schema()?;
let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;

let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
.with_files(rewrite)
// Use input schema which doesn't wrap partition values, otherwise divide_by_partition_value won't work on UTF8 partitions
// Since it can't fetch a scalar from a dictionary type
.with_schema(snapshot.input_schema()?)
.build()
.await?;
let scan = Arc::new(scan);

// Apply the negation of the filter and rewrite files
let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone()))));

let predicate_expr = state.create_physical_expr(negated_expression, &input_dfschema)?;
// We don't want to verify the predicate against existing data
if !partition_scan {
// Apply the negation of the filter and rewrite files
let negated_expression = Expr::Not(Box::new(Expr::IsTrue(Box::new(expression.clone()))));

let predicate_expr = state.create_physical_expr(negated_expression, &input_dfschema)?;
let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);

let add_actions: Vec<Action> = write_execution_plan(
Some(snapshot),
state.clone(),
filter,
partition_columns.clone(),
log_store.object_store(),
Some(snapshot.table_config().target_file_size() as usize),
None,
writer_properties.clone(),
false,
None,
writer_stats_config.clone(),
None,
)
.await?;

let filter: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(predicate_expr, scan.clone())?);
actions.extend(add_actions);
}

// We don't want to verify the predicate against existing data
let add_actions = write_execution_plan(
Some(snapshot),
state,
filter,
// CDC logic: simply filter the data with the predicate and add _change_type = "delete" as a literal column
if let Some(cdc_actions) = execute_non_empty_expr_cdc(
snapshot,
log_store,
state.clone(),
scan,
input_dfschema,
expression,
partition_columns,
log_store.object_store(),
Some(snapshot.table_config().target_file_size() as usize),
None,
writer_properties,
false,
None,
writer_stats_config,
None,
)
.await?;
.await?
{
actions.extend(cdc_actions)
}
Ok(actions)
}
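Two details in the rewrite branch above are worth spelling out. First, the new partition_scan parameter replaces the caller-side gate: when find_files resolved the predicate from partition values alone, the matched files are dropped wholesale and nothing needs rewriting, but the CDC branch must still run, which is why the check moved inside this function. Second, the surviving rows are selected with Expr::Not(Expr::IsTrue(predicate)) rather than a bare negation; under SQL three-valued logic this keeps rows where the predicate evaluates to NULL. A minimal, standalone DataFusion sketch of that semantics (the table and values are illustrative):

```rust
use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::prelude::*;
use std::sync::Arc;

#[tokio::main]
async fn main() -> datafusion::error::Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, true)]));
    let batch = RecordBatch::try_new(
        schema,
        vec![Arc::new(Int32Array::from(vec![Some(1), None, Some(20)]))],
    )?;
    let ctx = SessionContext::new();

    // The "delete" predicate: id > 10 matches 20, misses 1, and is NULL for NULL.
    let predicate = col("id").gt(lit(10));

    // NOT (predicate IS TRUE) keeps id = 1 and the NULL row, dropping only id = 20.
    // A bare NOT (predicate) would have dropped the NULL row as well.
    let keep = Expr::Not(Box::new(Expr::IsTrue(Box::new(predicate))));
    ctx.read_batch(batch)?.filter(keep)?.show().await?;
    Ok(())
}
```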

Ok(add_actions)
/// If CDC is enabled, writes all deletions selected by the predicate into the _change_data directory
#[allow(clippy::too_many_arguments)]
pub(crate) async fn execute_non_empty_expr_cdc(
snapshot: &DeltaTableState,
log_store: LogStoreRef,
state: SessionState,
scan: Arc<crate::delta_datafusion::DeltaScan>,
input_dfschema: DFSchema,
expression: &Expr,
table_partition_cols: Vec<String>,
writer_properties: Option<WriterProperties>,
writer_stats_config: WriterStatsConfig,
) -> DeltaResult<Option<Vec<Action>>> {
match should_write_cdc(snapshot) {
// Create CDC scan
Ok(true) => {
let cdc_predicate_expr =
state.create_physical_expr(expression.clone(), &input_dfschema)?;
let cdc_scan: Arc<dyn ExecutionPlan> =
Arc::new(FilterExec::try_new(cdc_predicate_expr, scan.clone())?);

// Add literal column "_change_type"
let change_type_lit = lit(ScalarValue::Utf8(Some("delete".to_string())));
let change_type_expr = state.create_physical_expr(change_type_lit, &input_dfschema)?;

// Project columns and lit
let project_expressions: Vec<(Arc<dyn PhysicalExpr>, String)> = scan
.schema()
.fields()
.into_iter()
.enumerate()
.map(|(idx, field)| -> (Arc<dyn PhysicalExpr>, String) {
(
Arc::new(expressions::Column::new(field.name(), idx)),
field.name().to_owned(),
)
})
.chain(iter::once((change_type_expr, "_change_type".to_owned())))
.collect();

let projected_scan: Arc<dyn ExecutionPlan> = Arc::new(ProjectionExec::try_new(
project_expressions,
cdc_scan.clone(),
)?);

let cdc_actions = write_execution_plan_cdc(
Some(snapshot),
state.clone(),
projected_scan.clone(),
table_partition_cols.clone(),
log_store.object_store(),
Some(snapshot.table_config().target_file_size() as usize),
None,
writer_properties,
false,
Some(SchemaMode::Overwrite), // Without Overwrite, the table schema is used instead of the plan schema; we need the plan schema here since it carries the _change_type column
writer_stats_config,
None,
)
.await?;
Ok(Some(cdc_actions))
}
_ => Ok(None),
}
}
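The whole helper is gated on should_write_cdc, which returns Ok(true) only for tables that have the change data feed enabled; otherwise the function returns None and no cdc files are written. A hedged sketch of creating such a table with delta-rs follows; the builder method names are assumptions based on the crate around the time of this PR and may differ by version, though delta.enableChangeDataFeed itself is the standard Delta table property.

```rust
use deltalake::kernel::{DataType, PrimitiveType};
use deltalake::operations::DeltaOps;
use deltalake::{DeltaTable, DeltaTableError};

// Sketch: create an in-memory table with the change data feed enabled, so
// that should_write_cdc returns Ok(true) and the CDC branches above fire.
async fn create_cdf_table() -> Result<DeltaTable, DeltaTableError> {
    DeltaOps::new_in_memory()
        .create()
        .with_column("id", DataType::Primitive(PrimitiveType::Long), true, None)
        .with_configuration(vec![(
            "delta.enableChangeDataFeed".to_string(),
            Some("true".to_string()),
        )])
        .await
}
```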

// This should only be called with a valid predicate
@@ -616,24 +710,20 @@ async fn prepare_predicate_actions(
let candidates =
find_files(snapshot, log_store.clone(), &state, Some(predicate.clone())).await?;

let add = if candidates.partition_scan {
Vec::new()
} else {
execute_non_empty_expr(
snapshot,
log_store,
state,
partition_columns,
&predicate,
&candidates.candidates,
writer_properties,
writer_stats_config,
)
.await?
};
let remove = candidates.candidates;
let mut actions = execute_non_empty_expr(
&snapshot,
log_store,
state,
partition_columns,
&predicate,
&candidates.candidates,
writer_properties,
writer_stats_config,
candidates.partition_scan,
)
.await?;

let mut actions: Vec<Action> = add.into_iter().collect();
let remove = candidates.candidates;

for action in remove {
actions.push(Action::Remove(Remove {
@@ -652,6 +742,42 @@
Ok(actions)
}

/// If CDC is enabled, writes the data of all current add actions as deletions into the _change_data directory
async fn execute_non_empty_expr_cdc_all_actions(
snapshot: &DeltaTableState,
log_store: LogStoreRef,
state: SessionState,
table_partition_cols: Vec<String>,
writer_properties: Option<WriterProperties>,
writer_stats_config: WriterStatsConfig,
) -> DeltaResult<Option<Vec<Action>>> {
let current_state_add_actions = &snapshot.file_actions()?;
// Since all current files get removed, check whether CDC needs to be written
let scan = DeltaScanBuilder::new(snapshot, log_store.clone(), &state)
.with_files(current_state_add_actions)
// Use input schema which doesn't wrap partition values, otherwise divide_by_partition_value won't work on UTF8 partitions
// Since it can't fetch a scalar from a dictionary type
.with_schema(snapshot.input_schema()?)
.build()
.await?;

let input_schema = snapshot.input_schema()?;
let input_dfschema: DFSchema = input_schema.clone().as_ref().clone().try_into()?;

Ok(execute_non_empty_expr_cdc(
snapshot,
log_store,
state,
scan.into(),
input_dfschema,
&Expr::Literal(ScalarValue::Boolean(Some(true))), // Keep all data
table_partition_cols,
writer_properties,
writer_stats_config,
)
.await?)
}
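One note on the predicate passed above: Expr::Literal(ScalarValue::Boolean(Some(true))) is exactly what datafusion's lit(true) helper produces, so the FilterExec inside execute_non_empty_expr_cdc becomes a pass-through and every current row is recorded with _change_type = "delete". A one-function illustration:

```rust
use datafusion_common::ScalarValue;
use datafusion_expr::{lit, Expr};

// lit(true) desugars to the "keep all data" literal used above; an
// always-true predicate means the CDC filter keeps every row, so the
// table's entire current contents are written out as deletes.
fn keep_all_rows() -> Expr {
    let expanded = Expr::Literal(ScalarValue::Boolean(Some(true)));
    assert_eq!(lit(true), expanded);
    lit(true)
}
```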

impl std::future::IntoFuture for WriteBuilder {
type Output = DeltaResult<DeltaTable>;
type IntoFuture = BoxFuture<'static, Self::Output>;
@@ -961,6 +1087,21 @@ impl std::future::IntoFuture for WriteBuilder {
.into_iter()
.map(|p| p.remove_action(true).into());
actions.extend(remove_actions);

let cdc_actions = execute_non_empty_expr_cdc_all_actions(
snapshot,
this.log_store.clone(),
state,
partition_columns.clone(),
this.writer_properties,
writer_stats_config,
)
.await?;

// Append the CDC actions produced for the data replaced by this overwrite
if let Some(cdc_actions) = cdc_actions {
actions.extend(cdc_actions);
}
}
};
}
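With these pieces in place, an Overwrite (or replaceWhere) write on a CDF-enabled table commits remove actions for the replaced files plus cdc actions under _change_data. A hedged end-to-end sketch against the public API; the DeltaOps and WriteBuilder method names reflect delta-rs as of mid-2024 and should be treated as assumptions:

```rust
use deltalake::arrow::array::Int64Array;
use deltalake::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::operations::DeltaOps;
use deltalake::protocol::SaveMode;
use deltalake::{DeltaTable, DeltaTableError};
use std::sync::Arc;

// Sketch: overwrite a CDF-enabled table; with this PR the commit carries
// both the remove actions for the old files and the cdc actions.
async fn overwrite_with_cdc(table: DeltaTable) -> Result<DeltaTable, DeltaTableError> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "id",
        ArrowDataType::Int64,
        true,
    )]));
    let batch = RecordBatch::try_new(schema, vec![Arc::new(Int64Array::from(vec![42]))])?;

    DeltaOps(table)
        .write(vec![batch])
        .with_save_mode(SaveMode::Overwrite)
        // For a partial overwrite instead, scope it with replaceWhere:
        // .with_replace_where("id > 10")
        .await
}
```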