-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support `unnest` in GROUP BY clause #11469
Changes from 3 commits
f45f1b0
3012936
8d7937d
17a5158
3998224
810ca8b
2309e8b
b37a1e3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,7 @@ use crate::utils::{ | |
resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest, | ||
}; | ||
|
||
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; | ||
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result}; | ||
use datafusion_common::{Column, UnnestOptions}; | ||
use datafusion_expr::expr::Alias; | ||
|
@@ -37,7 +38,7 @@ use datafusion_expr::utils::{ | |
find_aggregate_exprs, find_window_exprs, | ||
}; | ||
use datafusion_expr::{ | ||
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning, | ||
Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning, | ||
}; | ||
use sqlparser::ast::{ | ||
Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr, | ||
|
@@ -297,6 +298,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { | |
input: LogicalPlan, | ||
select_exprs: Vec<Expr>, | ||
) -> Result<LogicalPlan> { | ||
// Try process group by unnest | ||
let input = self.try_process_aggregate_unnest(input)?; | ||
|
||
let mut intermediate_plan = input; | ||
let mut intermediate_select_exprs = select_exprs; | ||
// Each expr in select_exprs can contain multiple unnest stages | ||
|
@@ -354,6 +358,102 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { | |
.build() | ||
} | ||
|
||
/// Rewrites an aggregate plan so that `unnest` in its GROUP BY expressions is planned. | ||
/// | ||
/// Two plan shapes are handled: | ||
/// * `Aggregate`: the group-by expressions are passed through | ||
///   `try_process_group_by_unnest`, and the aggregate is rebuilt on the returned | ||
///   input with the returned group-by expressions and the original aggregate | ||
///   expressions. | ||
/// * `Filter` directly on top of an `Aggregate`: the same rewrite is applied to | ||
///   the aggregate, then the original filter predicate is re-applied on top. | ||
///   NOTE(review): presumably this shape comes from a HAVING clause; confirm | ||
///   against the caller. | ||
/// | ||
/// Any other plan is returned unchanged. | ||
/// | ||
/// # Errors | ||
/// | ||
/// Propagates errors from `try_process_group_by_unnest` and from the | ||
/// `LogicalPlanBuilder` calls used to rebuild the plan. | ||
fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result<LogicalPlan> { | ||
match &input { | ||
LogicalPlan::Aggregate(agg) => { | ||
let (new_input, new_group_by_exprs) = | ||
self.try_process_group_by_unnest(agg)?; | ||
// The aggregate is rebuilt even when no unnest was found; in that case | ||
// the helper returns a clone of the original input and group-by exprs. | ||
LogicalPlanBuilder::from(new_input) | ||
.aggregate(new_group_by_exprs, agg.aggr_expr.clone())? | ||
.build() | ||
} | ||
LogicalPlan::Filter(filter) => match filter.input.as_ref() { | ||
LogicalPlan::Aggregate(agg) => { | ||
let (new_input, new_group_by_exprs) = | ||
self.try_process_group_by_unnest(agg)?; | ||
// Rebuild the aggregate first, then put the original predicate back | ||
// on top so the Filter -> Aggregate shape is preserved. | ||
LogicalPlanBuilder::from(new_input) | ||
.aggregate(new_group_by_exprs, agg.aggr_expr.clone())? | ||
.filter(filter.predicate.clone())? | ||
.build() | ||
} | ||
// A filter over anything other than an aggregate is left untouched. | ||
_ => Ok(input), | ||
}, | ||
_ => Ok(input), | ||
} | ||
} | ||
|
||
fn try_process_group_by_unnest( | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This function seems to repeat some code from the function `try_process_unnest`. I wonder if there's a better way to handle this, such as planning unnest before aggregation and then reusing the current group-by planning logic. This seems more intuitive to me, but I'm not sure about it. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Tracked in #11498 |
||
&self, | ||
agg: &Aggregate, | ||
) -> Result<(LogicalPlan, Vec<Expr>)> { | ||
let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None; | ||
|
||
let input = agg.input.as_ref(); | ||
let group_by_exprs = &agg.group_expr; | ||
let aggr_exprs = &agg.aggr_expr; | ||
|
||
// rewrite group_by_exprs | ||
JasonLi-cn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
let mut intermediate_plan = input.clone(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is it possible to avoid this clone? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I pushed 810ca8b, which avoids the cloning in this function. |
||
let mut intermediate_select_exprs = group_by_exprs.to_vec(); | ||
|
||
loop { | ||
let mut unnest_columns = vec![]; | ||
let mut inner_projection_exprs = vec![]; | ||
|
||
let outer_projection_exprs: Vec<Expr> = intermediate_select_exprs | ||
.iter() | ||
.map(|expr| { | ||
transform_bottom_unnest( | ||
&intermediate_plan, | ||
&mut unnest_columns, | ||
&mut inner_projection_exprs, | ||
expr, | ||
) | ||
}) | ||
.collect::<Result<Vec<_>>>()? | ||
.into_iter() | ||
.flatten() | ||
.collect(); | ||
|
||
if unnest_columns.is_empty() { | ||
break; | ||
} else { | ||
let columns = unnest_columns.into_iter().map(|col| col.into()).collect(); | ||
let unnest_options = UnnestOptions::new().with_preserve_nulls(false); | ||
|
||
let mut projection_exprs = match &aggr_expr_using_columns { | ||
Some(exprs) => (*exprs).clone(), | ||
None => { | ||
let mut columns = HashSet::new(); | ||
for expr in aggr_exprs { | ||
expr.apply(|expr| { | ||
if let Expr::Column(c) = expr { | ||
columns.insert(Expr::Column(c.clone())); | ||
} | ||
Ok(TreeNodeRecursion::Continue) | ||
}) | ||
// As the closure always returns Ok, this "can't" error | ||
.expect("Unexpected error"); | ||
} | ||
aggr_expr_using_columns = Some(columns.clone()); | ||
columns | ||
} | ||
}; | ||
projection_exprs.extend(inner_projection_exprs); | ||
|
||
let plan = LogicalPlanBuilder::from(intermediate_plan.clone()) | ||
JasonLi-cn marked this conversation as resolved.
Show resolved
Hide resolved
|
||
.project(projection_exprs)? | ||
.unnest_columns_with_options(columns, unnest_options)? | ||
.build()?; | ||
|
||
intermediate_plan = plan; | ||
intermediate_select_exprs = outer_projection_exprs; | ||
} | ||
} | ||
|
||
Ok((intermediate_plan, intermediate_select_exprs)) | ||
} | ||
|
||
fn plan_selection( | ||
&self, | ||
selection: Option<SQLExpr>, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If unnest has already been processed by `try_process_aggregate_unnest`, does the following logic for handling `unnest` become redundant?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you. We need to verify this logic in the future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tracked in #11498