Skip to content

Commit

Permalink
perf: directly create projection instead of using DataFrame::with_col…
Browse files Browse the repository at this point in the history
…umn (#2222)

# Description
`DataFrame::with_column` performs a linear operation in the number of
columns to append on an existing column, checking that nothing collides.
On top of this, once the projection is built, a normalization step (also
linear in the number of columns) is performed before returning the dataframe.

For a merge where we are performing a `when_matched_update_all` type
operation on wide tables (100+ columns), this is in effect a `2*N^2`
operation as we were adding the remapped case columns one at a time with
`with_column` and then remapping it.

This PR uses `project` directly to construct the logical plan. We don't
need any of the special checking for name clashes or windowing that
`with_column` provides, and we immediately reduce the result to an
unoptimized logical plan anyway, so this produces no change to the schema -
just a much more compact logical plan.

This reduces an example merge I had from taking 5+ minutes to just
optimize the table, down to about 13 seconds including the merge.
  • Loading branch information
emcake authored Feb 27, 2024
1 parent 51f1cd0 commit 2f2acba
Showing 1 changed file with 35 additions and 13 deletions.
48 changes: 35 additions & 13 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1184,7 +1184,7 @@ async fn execute(

let projection = join.with_column(OPERATION_COLUMN, case)?;

let mut new_columns = projection;
let mut new_columns = vec![];
let mut write_projection = Vec::new();

for delta_field in snapshot.schema().fields() {
Expand Down Expand Up @@ -1223,7 +1223,7 @@ async fn execute(
Expr::Column(Column::from_qualified_name_ignore_case(name.clone()))
.alias(delta_field.name()),
);
new_columns = new_columns.with_column(&name, case)?;
new_columns.push((name, case));
}

let mut insert_when = Vec::with_capacity(ops.len());
Expand Down Expand Up @@ -1299,18 +1299,40 @@ async fn execute(
.end()
}

new_columns = new_columns.with_column(DELETE_COLUMN, build_case(delete_when, delete_then)?)?;
new_columns =
new_columns.with_column(TARGET_INSERT_COLUMN, build_case(insert_when, insert_then)?)?;
new_columns =
new_columns.with_column(TARGET_UPDATE_COLUMN, build_case(update_when, update_then)?)?;
new_columns = new_columns.with_column(
TARGET_DELETE_COLUMN,
new_columns.push((
DELETE_COLUMN.to_owned(),
build_case(delete_when, delete_then)?,
));
new_columns.push((
TARGET_INSERT_COLUMN.to_owned(),
build_case(insert_when, insert_then)?,
));
new_columns.push((
TARGET_UPDATE_COLUMN.to_owned(),
build_case(update_when, update_then)?,
));
new_columns.push((
TARGET_DELETE_COLUMN.to_owned(),
build_case(target_delete_when, target_delete_then)?,
)?;
new_columns = new_columns.with_column(TARGET_COPY_COLUMN, build_case(copy_when, copy_then)?)?;

let new_columns = new_columns.into_unoptimized_plan();
));
new_columns.push((
TARGET_COPY_COLUMN.to_owned(),
build_case(copy_when, copy_then)?,
));

let mut new_columns = {
let plan = projection.into_unoptimized_plan();
let mut fields: Vec<Expr> = plan
.schema()
.fields()
.iter()
.map(|f| col(f.qualified_column()))
.collect();

fields.extend(new_columns.into_iter().map(|(name, ex)| ex.alias(name)));

LogicalPlanBuilder::from(plan).project(fields)?.build()?
};

let distrbute_expr = col(file_column.as_str());

Expand Down

0 comments on commit 2f2acba

Please sign in to comment.