-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support unnest
in GROUP BY clause
#11469
Changes from all commits
f45f1b0
3012936
8d7937d
17a5158
3998224
810ca8b
2309e8b
b37a1e3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,18 +26,20 @@ use crate::utils::{ | |
resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest, | ||
}; | ||
|
||
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; | ||
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result}; | ||
use datafusion_common::{Column, UnnestOptions}; | ||
use datafusion_expr::expr::Alias; | ||
use datafusion_expr::expr_rewriter::{ | ||
normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_cols, | ||
}; | ||
use datafusion_expr::logical_plan::tree_node::unwrap_arc; | ||
use datafusion_expr::utils::{ | ||
expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns, | ||
find_aggregate_exprs, find_window_exprs, | ||
}; | ||
use datafusion_expr::{ | ||
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning, | ||
Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning, | ||
}; | ||
use sqlparser::ast::{ | ||
Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr, | ||
|
@@ -297,6 +299,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { | |
input: LogicalPlan, | ||
select_exprs: Vec<Expr>, | ||
) -> Result<LogicalPlan> { | ||
// Try process group by unnest | ||
let input = self.try_process_aggregate_unnest(input)?; | ||
|
||
let mut intermediate_plan = input; | ||
let mut intermediate_select_exprs = select_exprs; | ||
// Each expr in select_exprs can contains multiple unnest stage | ||
|
@@ -354,6 +359,117 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> { | |
.build() | ||
} | ||
|
||
fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result<LogicalPlan> { | ||
match input { | ||
LogicalPlan::Aggregate(agg) => { | ||
let agg_expr = agg.aggr_expr.clone(); | ||
let (new_input, new_group_by_exprs) = | ||
self.try_process_group_by_unnest(agg)?; | ||
LogicalPlanBuilder::from(new_input) | ||
.aggregate(new_group_by_exprs, agg_expr)? | ||
.build() | ||
} | ||
LogicalPlan::Filter(mut filter) => { | ||
filter.input = Arc::new( | ||
self.try_process_aggregate_unnest(unwrap_arc(filter.input))?, | ||
); | ||
Ok(LogicalPlan::Filter(filter)) | ||
} | ||
_ => Ok(input), | ||
} | ||
} | ||
|
||
/// Try converting Unnest(Expr) of group by to Unnest/Projection | ||
/// Return the new input and group_by_exprs of Aggregate. | ||
fn try_process_group_by_unnest( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This function seems to have some code repetition with function try_process_unnest. I wonder if there's a better way to handle this, such as planning unnest before aggregation, and then reusing the current group-by planning logic. This seems more intuitive to me. But I'm not sure about it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tracked in #11498 |
||
&self, | ||
agg: Aggregate, | ||
) -> Result<(LogicalPlan, Vec<Expr>)> { | ||
let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None; | ||
|
||
let Aggregate { | ||
input, | ||
group_expr, | ||
aggr_expr, | ||
.. | ||
} = agg; | ||
|
||
// process unnest of group_by_exprs, and input of agg will be rewritten | ||
// for example: | ||
// | ||
// ``` | ||
// Aggregate: groupBy=[[UNNEST(Column(Column { relation: Some(Bare { table: "tab" }), name: "array_col" }))]], aggr=[[]] | ||
// TableScan: tab | ||
// ``` | ||
// | ||
// will be transformed into | ||
// | ||
// ``` | ||
// Aggregate: groupBy=[[unnest(tab.array_col)]], aggr=[[]] | ||
// Unnest: lists[unnest(tab.array_col)] structs[] | ||
// Projection: tab.array_col AS unnest(tab.array_col) | ||
// TableScan: tab | ||
// ``` | ||
let mut intermediate_plan = unwrap_arc(input); | ||
let mut intermediate_select_exprs = group_expr; | ||
|
||
loop { | ||
let mut unnest_columns = vec![]; | ||
let mut inner_projection_exprs = vec![]; | ||
|
||
let outer_projection_exprs: Vec<Expr> = intermediate_select_exprs | ||
.iter() | ||
.map(|expr| { | ||
transform_bottom_unnest( | ||
&intermediate_plan, | ||
&mut unnest_columns, | ||
&mut inner_projection_exprs, | ||
expr, | ||
) | ||
}) | ||
.collect::<Result<Vec<_>>>()? | ||
.into_iter() | ||
.flatten() | ||
.collect(); | ||
|
||
if unnest_columns.is_empty() { | ||
break; | ||
} else { | ||
let columns = unnest_columns.into_iter().map(|col| col.into()).collect(); | ||
let unnest_options = UnnestOptions::new().with_preserve_nulls(false); | ||
|
||
let mut projection_exprs = match &aggr_expr_using_columns { | ||
Some(exprs) => (*exprs).clone(), | ||
None => { | ||
let mut columns = HashSet::new(); | ||
for expr in &aggr_expr { | ||
expr.apply(|expr| { | ||
if let Expr::Column(c) = expr { | ||
columns.insert(Expr::Column(c.clone())); | ||
} | ||
Ok(TreeNodeRecursion::Continue) | ||
}) | ||
// As the closure always returns Ok, this "can't" error | ||
.expect("Unexpected error"); | ||
} | ||
aggr_expr_using_columns = Some(columns.clone()); | ||
columns | ||
} | ||
}; | ||
projection_exprs.extend(inner_projection_exprs); | ||
|
||
intermediate_plan = LogicalPlanBuilder::from(intermediate_plan) | ||
.project(projection_exprs)? | ||
.unnest_columns_with_options(columns, unnest_options)? | ||
.build()?; | ||
|
||
intermediate_select_exprs = outer_projection_exprs; | ||
} | ||
} | ||
|
||
Ok((intermediate_plan, intermediate_select_exprs)) | ||
} | ||
|
||
fn plan_selection( | ||
&self, | ||
selection: Option<SQLExpr>, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If unnest has already been processed by
try_process_aggregate_unnest
, does the following logic for handlingunnest
become redundant?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you. We need to verify this logic in the future
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tracked in #11498