Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support unnest in GROUP BY clause #11469

Merged
merged 8 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 117 additions & 1 deletion datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,20 @@ use crate::utils::{
resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest,
};

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
use datafusion_common::{Column, UnnestOptions};
use datafusion_expr::expr::Alias;
use datafusion_expr::expr_rewriter::{
normalize_col, normalize_col_with_schemas_and_ambiguity_check, normalize_cols,
};
use datafusion_expr::logical_plan::tree_node::unwrap_arc;
use datafusion_expr::utils::{
expand_qualified_wildcard, expand_wildcard, expr_as_column_expr, expr_to_columns,
find_aggregate_exprs, find_window_exprs,
};
use datafusion_expr::{
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
};
use sqlparser::ast::{
Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr,
Expand Down Expand Up @@ -297,6 +299,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input: LogicalPlan,
select_exprs: Vec<Expr>,
) -> Result<LogicalPlan> {
// Try to process unnest expressions used in the GROUP BY clause
let input = self.try_process_aggregate_unnest(input)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If unnest has already been processed by try_process_aggregate_unnest, does the following logic for handling unnest become redundant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you. We need to verify this logic in the future

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracked in #11498


let mut intermediate_plan = input;
let mut intermediate_select_exprs = select_exprs;
// Each expr in select_exprs can contain multiple unnest stages
Expand Down Expand Up @@ -354,6 +359,117 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.build()
}

/// Rewrites unnest expressions found in the GROUP BY of an aggregate plan.
///
/// * `LogicalPlan::Aggregate`: the group-by expressions are handed to
///   `try_process_group_by_unnest`, and a new aggregate is built on top of the
///   rewritten input using the original aggregate expressions.
/// * `LogicalPlan::Filter`: the filter's input is processed recursively and the
///   filter node itself is kept.
/// * Any other plan is returned unchanged.
fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result<LogicalPlan> {
    match input {
        LogicalPlan::Filter(mut filter) => {
            // Look through the filter for an aggregate underneath it.
            let rewritten_child =
                self.try_process_aggregate_unnest(unwrap_arc(filter.input))?;
            filter.input = Arc::new(rewritten_child);
            Ok(LogicalPlan::Filter(filter))
        }
        LogicalPlan::Aggregate(aggregate) => {
            // `aggregate` is consumed below, so keep a copy of its aggregate
            // expressions for rebuilding the node.
            let aggregate_exprs = aggregate.aggr_expr.clone();
            let (rewritten_input, group_exprs) =
                self.try_process_group_by_unnest(aggregate)?;
            LogicalPlanBuilder::from(rewritten_input)
                .aggregate(group_exprs, aggregate_exprs)?
                .build()
        }
        other => Ok(other),
    }
}

/// Try converting `Unnest(Expr)` in the GROUP BY expressions of `agg` into
/// explicit Projection + Unnest plan nodes placed on the aggregate's input.
///
/// The rewrite runs in a loop and stops once `transform_bottom_unnest` reports
/// no more columns to unnest (presumably one nesting level is peeled per pass,
/// which is what allows `unnest(unnest(..))` -- confirm against
/// `transform_bottom_unnest`).
///
/// Returns the new input plan and the rewritten group-by expressions; the
/// `Aggregate` node itself is NOT rebuilt here, that is left to the caller.
///
/// # Errors
/// Propagates any error from `transform_bottom_unnest` or from building the
/// projection / unnest plan nodes.
///
/// # Panics
/// Panics with "Unexpected error" if `Expr::apply` fails while collecting the
/// columns referenced by the aggregate expressions.
fn try_process_group_by_unnest(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function seems to have some code repetition with function try_process_unnest.

I wonder if there's a better way to handle this, such as planning unnest before aggregation, and then reusing the current group-by planning logic. This seems more intuitive to me. But I'm not sure about it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracked in #11498

&self,
agg: Aggregate,
) -> Result<(LogicalPlan, Vec<Expr>)> {
// Cache of the column expressions referenced by the aggregate expressions.
// It is filled lazily the first time a projection has to be built, then
// reused (cloned) by later loop iterations.
let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None;

let Aggregate {
input,
group_expr,
aggr_expr,
..
} = agg;

// Process unnest in the group-by expressions; the input of the aggregate is
// rewritten as a result. For example:
//
// ```
// Aggregate: groupBy=[[UNNEST(Column(Column { relation: Some(Bare { table: "tab" }), name: "array_col" }))]], aggr=[[]]
// TableScan: tab
// ```
//
// will be transformed into
//
// ```
// Aggregate: groupBy=[[unnest(tab.array_col)]], aggr=[[]]
// Unnest: lists[unnest(tab.array_col)] structs[]
// Projection: tab.array_col AS unnest(tab.array_col)
// TableScan: tab
// ```
let mut intermediate_plan = unwrap_arc(input);
let mut intermediate_select_exprs = group_expr;

loop {
// Both vectors are filled by `transform_bottom_unnest` during this pass.
let mut unnest_columns = vec![];
let mut inner_projection_exprs = vec![];

// Rewrite every current group-by expression. Each call yields a
// collection of expressions, and these are flattened into one list.
let outer_projection_exprs: Vec<Expr> = intermediate_select_exprs
.iter()
.map(|expr| {
transform_bottom_unnest(
&intermediate_plan,
&mut unnest_columns,
&mut inner_projection_exprs,
expr,
)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();

if unnest_columns.is_empty() {
// Nothing left to unnest: the plan and expressions from the previous
// pass are final.
break;
} else {
let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
// preserve_nulls is explicitly disabled here; see `UnnestOptions` for
// the exact semantics.
let unnest_options = UnnestOptions::new().with_preserve_nulls(false);

// The projection below the Unnest node must also carry the columns the
// aggregate expressions reference, so they stay available to the
// aggregate built on top of this plan.
// NOTE(review): this is a `HashSet`, so the order of the projected
// expressions follows the set's iteration order -- confirm that callers
// do not depend on a stable column order.
let mut projection_exprs = match &aggr_expr_using_columns {
Some(exprs) => (*exprs).clone(),
None => {
let mut columns = HashSet::new();
for expr in &aggr_expr {
expr.apply(|expr| {
if let Expr::Column(c) = expr {
columns.insert(Expr::Column(c.clone()));
}
Ok(TreeNodeRecursion::Continue)
})
// As the closure always returns Ok, this "can't" error
.expect("Unexpected error");
}
aggr_expr_using_columns = Some(columns.clone());
columns
}
};
projection_exprs.extend(inner_projection_exprs);

// Stack Projection -> Unnest on top of the current plan for this pass.
intermediate_plan = LogicalPlanBuilder::from(intermediate_plan)
.project(projection_exprs)?
.unnest_columns_with_options(columns, unnest_options)?
.build()?;

// The rewritten expressions become the input of the next pass.
intermediate_select_exprs = outer_projection_exprs;
}
}

Ok((intermediate_plan, intermediate_select_exprs))
}

fn plan_selection(
&self,
selection: Option<SQLExpr>,
Expand Down
134 changes: 132 additions & 2 deletions datafusion/sqllogictest/test_files/unnest.slt
Original file line number Diff line number Diff line change
Expand Up @@ -500,8 +500,6 @@ select unnest(column1) from (select * from (values([1,2,3]), ([4,5,6])) limit 1
query error DataFusion error: Error during planning: Projections require unique expression names but the expression "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 0 and "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 1 have the same name. Consider aliasing \("AS"\) one of them.
select unnest(column1), unnest(column1) from unnest_table;

statement ok
drop table unnest_table;

## unnest list followed by unnest struct
query ???
Expand Down Expand Up @@ -557,3 +555,135 @@ physical_plan
06)----------UnnestExec
07)------------ProjectionExec: expr=[column3@0 as unnest(recursive_unnest_table.column3), column3@0 as column3]
08)--------------MemoryExec: partitions=1, partition_sizes=[1]

## group by unnest

### without agg exprs
query I
select unnest(column1) c1 from unnest_table group by c1 order by c1;
----
1
2
3
4
5
6
12

query II
select unnest(column1) c1, unnest(column2) c2 from unnest_table group by c1, c2 order by c1, c2;
----
1 7
2 NULL
3 NULL
4 8
5 9
6 11
12 NULL
NULL 10
NULL 12
NULL 42
NULL NULL

query III
select unnest(column1) c1, unnest(column2) c2, column3 c3 from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1
2 NULL 1
3 NULL 1
4 8 2
5 9 2
6 11 3
12 NULL NULL
NULL 10 2
NULL 12 3
NULL 42 NULL
NULL NULL NULL

### with agg exprs

query IIII
select unnest(column1) c1, unnest(column2) c2, column3 c3, count(1) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1 1
2 NULL 1 1
3 NULL 1 1
4 8 2 1
5 9 2 1
6 11 3 1
12 NULL NULL 1
NULL 10 2 1
NULL 12 3 1
NULL 42 NULL 1
NULL NULL NULL 1

query IIII
select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1 1
2 NULL 1 1
3 NULL 1 1
4 8 2 1
5 9 2 1
6 11 3 0
12 NULL NULL 0
NULL 10 2 1
NULL 12 3 0
NULL 42 NULL 0
NULL NULL NULL 0

query IIIII
select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4), sum(column3) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1 1 1
2 NULL 1 1 1
3 NULL 1 1 1
4 8 2 1 2
5 9 2 1 2
6 11 3 0 3
12 NULL NULL 0 NULL
NULL 10 2 1 2
NULL 12 3 0 3
NULL 42 NULL 0 NULL
NULL NULL NULL 0 NULL

query II
select unnest(column1), count(*) from unnest_table group by unnest(column1) order by unnest(column1) desc;
----
12 1
6 1
5 1
4 1
3 1
2 1
1 1

### group by recursive unnest list

query ?
select unnest(unnest(column2)) c2 from recursive_unnest_table group by c2 order by c2;
----
[1]
[1, 1]
[2]
[3, 4]
[5]
[7, 8]
[, 6]
NULL

query ?I
select unnest(unnest(column2)) c2, count(column3) from recursive_unnest_table group by c2 order by c2;
----
[1] 1
[1, 1] 1
[2] 1
[3, 4] 1
[5] 1
[7, 8] 1
[, 6] 1
NULL 1

### TODO: group by unnest struct
query error DataFusion error: Error during planning: Projection references non\-aggregate values
select unnest(column1) c1 from nested_unnest_table group by c1.c0;
Loading