From cfdb9dbf233b2df4c8413fd3d738e92a2404a8a8 Mon Sep 17 00:00:00 2001 From: emcake <3726783+emcake@users.noreply.github.com> Date: Mon, 26 Feb 2024 22:57:49 +0000 Subject: [PATCH 1/2] perf: directly create projection instead of using DataFrame::with_column --- crates/core/src/operations/merge/mod.rs | 49 ++++++++++++++++++------- 1 file changed, 36 insertions(+), 13 deletions(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 636ed6f66d..4cff745172 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -1184,7 +1184,7 @@ async fn execute( let projection = join.with_column(OPERATION_COLUMN, case)?; - let mut new_columns = projection; + let mut new_columns = vec![]; let mut write_projection = Vec::new(); for delta_field in snapshot.schema().fields() { @@ -1223,7 +1223,8 @@ async fn execute( Expr::Column(Column::from_qualified_name_ignore_case(name.clone())) .alias(delta_field.name()), ); - new_columns = new_columns.with_column(&name, case)?; + //new_columns = new_columns.with_column(&name, case)?; + new_columns.push((name, case)); } let mut insert_when = Vec::with_capacity(ops.len()); @@ -1299,18 +1300,40 @@ async fn execute( .end() } - new_columns = new_columns.with_column(DELETE_COLUMN, build_case(delete_when, delete_then)?)?; - new_columns = - new_columns.with_column(TARGET_INSERT_COLUMN, build_case(insert_when, insert_then)?)?; - new_columns = - new_columns.with_column(TARGET_UPDATE_COLUMN, build_case(update_when, update_then)?)?; - new_columns = new_columns.with_column( - TARGET_DELETE_COLUMN, + new_columns.push(( + DELETE_COLUMN.to_owned(), + build_case(delete_when, delete_then)?, + )); + new_columns.push(( + TARGET_INSERT_COLUMN.to_owned(), + build_case(insert_when, insert_then)?, + )); + new_columns.push(( + TARGET_UPDATE_COLUMN.to_owned(), + build_case(update_when, update_then)?, + )); + new_columns.push(( + TARGET_DELETE_COLUMN.to_owned(), build_case(target_delete_when, target_delete_then)?, - )?; - new_columns = new_columns.with_column(TARGET_COPY_COLUMN, build_case(copy_when, copy_then)?)?; - - let new_columns = new_columns.into_unoptimized_plan(); + )); + new_columns.push(( + TARGET_COPY_COLUMN.to_owned(), + build_case(copy_when, copy_then)?, + )); + + let mut new_columns = { + let plan = projection.into_unoptimized_plan(); + let mut fields: Vec = plan + .schema() + .fields() + .iter() + .map(|f| col(f.qualified_column())) + .collect(); + + fields.extend(new_columns.into_iter().map(|(name, ex)| ex.alias(name))); + + LogicalPlanBuilder::from(plan).project(fields)?.build()? + }; let distrbute_expr = col(file_column.as_str()); From 2a413919d782aac0175fc675d18d518d36c16a0f Mon Sep 17 00:00:00 2001 From: emcake <3726783+emcake@users.noreply.github.com> Date: Mon, 26 Feb 2024 22:58:41 +0000 Subject: [PATCH 2/2] remove comment --- crates/core/src/operations/merge/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs index 4cff745172..6190e8f724 100644 --- a/crates/core/src/operations/merge/mod.rs +++ b/crates/core/src/operations/merge/mod.rs @@ -1223,7 +1223,6 @@ async fn execute( Expr::Column(Column::from_qualified_name_ignore_case(name.clone())) .alias(delta_field.name()), ); - //new_columns = new_columns.with_column(&name, case)?; new_columns.push((name, case)); }