Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support unnest in GROUP BY clause #11469

Merged
merged 8 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 101 additions & 1 deletion datafusion/sql/src/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::utils::{
resolve_columns, resolve_positions_to_exprs, transform_bottom_unnest,
};

use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion};
use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
use datafusion_common::{Column, UnnestOptions};
use datafusion_expr::expr::Alias;
Expand All @@ -37,7 +38,7 @@ use datafusion_expr::utils::{
find_aggregate_exprs, find_window_exprs,
};
use datafusion_expr::{
Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
Aggregate, Expr, Filter, GroupingSet, LogicalPlan, LogicalPlanBuilder, Partitioning,
};
use sqlparser::ast::{
Distinct, Expr as SQLExpr, GroupByExpr, NamedWindowExpr, OrderByExpr,
Expand Down Expand Up @@ -297,6 +298,9 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
input: LogicalPlan,
select_exprs: Vec<Expr>,
) -> Result<LogicalPlan> {
// Try process group by unnest
let input = self.try_process_aggregate_unnest(input)?;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If unnest has already been processed by try_process_aggregate_unnest, does the following logic for handling unnest become redundant?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with you. We need to verify this logic in the future

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracked in #11498


let mut intermediate_plan = input;
let mut intermediate_select_exprs = select_exprs;
// Each expr in select_exprs can contains multiple unnest stage
Expand Down Expand Up @@ -354,6 +358,102 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
.build()
}

fn try_process_aggregate_unnest(&self, input: LogicalPlan) -> Result<LogicalPlan> {
match &input {
LogicalPlan::Aggregate(agg) => {
let (new_input, new_group_by_exprs) =
self.try_process_group_by_unnest(agg)?;
LogicalPlanBuilder::from(new_input)
.aggregate(new_group_by_exprs, agg.aggr_expr.clone())?
.build()
}
LogicalPlan::Filter(filter) => match filter.input.as_ref() {
LogicalPlan::Aggregate(agg) => {
let (new_input, new_group_by_exprs) =
self.try_process_group_by_unnest(agg)?;
LogicalPlanBuilder::from(new_input)
.aggregate(new_group_by_exprs, agg.aggr_expr.clone())?
.filter(filter.predicate.clone())?
.build()
}
_ => Ok(input),
},
_ => Ok(input),
}
}

fn try_process_group_by_unnest(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function seems to have some code repetition with function try_process_unnest.

I wonder if there's a better way to handle this, such as planning unnest before aggregation, and then reusing the current group-by planning logic. This seems more intuitive to me. But I'm not sure about it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tracked in #11498

&self,
agg: &Aggregate,
) -> Result<(LogicalPlan, Vec<Expr>)> {
let mut aggr_expr_using_columns: Option<HashSet<Expr>> = None;

let input = agg.input.as_ref();
let group_by_exprs = &agg.group_expr;
let aggr_exprs = &agg.aggr_expr;

// rewrite group_by_exprs
let mut intermediate_plan = input.clone();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to avoid this clone?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pushed 810ca8b which avoids the cloning in this function

let mut intermediate_select_exprs = group_by_exprs.to_vec();

loop {
let mut unnest_columns = vec![];
let mut inner_projection_exprs = vec![];

let outer_projection_exprs: Vec<Expr> = intermediate_select_exprs
.iter()
.map(|expr| {
transform_bottom_unnest(
&intermediate_plan,
&mut unnest_columns,
&mut inner_projection_exprs,
expr,
)
})
.collect::<Result<Vec<_>>>()?
.into_iter()
.flatten()
.collect();

if unnest_columns.is_empty() {
break;
} else {
let columns = unnest_columns.into_iter().map(|col| col.into()).collect();
let unnest_options = UnnestOptions::new().with_preserve_nulls(false);

let mut projection_exprs = match &aggr_expr_using_columns {
Some(exprs) => (*exprs).clone(),
None => {
let mut columns = HashSet::new();
for expr in aggr_exprs {
expr.apply(|expr| {
if let Expr::Column(c) = expr {
columns.insert(Expr::Column(c.clone()));
}
Ok(TreeNodeRecursion::Continue)
})
// As the closure always returns Ok, this "can't" error
.expect("Unexpected error");
}
aggr_expr_using_columns = Some(columns.clone());
columns
}
};
projection_exprs.extend(inner_projection_exprs);

let plan = LogicalPlanBuilder::from(intermediate_plan.clone())
.project(projection_exprs)?
.unnest_columns_with_options(columns, unnest_options)?
.build()?;

intermediate_plan = plan;
intermediate_select_exprs = outer_projection_exprs;
}
}

Ok((intermediate_plan, intermediate_select_exprs))
}

fn plan_selection(
&self,
selection: Option<SQLExpr>,
Expand Down
128 changes: 124 additions & 4 deletions datafusion/sqllogictest/test_files/unnest.slt
Original file line number Diff line number Diff line change
Expand Up @@ -496,12 +496,10 @@ select unnest(column1) from (select * from (values([1,2,3]), ([4,5,6])) limit 1
5
6

## FIXME: https://github.com/apache/datafusion/issues/11198
## FIXME: https://github.com/apache/datafusion/issues/11198
query error DataFusion error: Error during planning: Projections require unique expression names but the expression "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 0 and "UNNEST\(Column\(Column \{ relation: Some\(Bare \{ table: "unnest_table" \}\), name: "column1" \}\)\)" at position 1 have the same name. Consider aliasing \("AS"\) one of them.
select unnest(column1), unnest(column1) from unnest_table;

statement ok
drop table unnest_table;

## unnest list followed by unnest struct
query ???
Expand Down Expand Up @@ -556,4 +554,126 @@ physical_plan
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
06)----------UnnestExec
07)------------ProjectionExec: expr=[column3@0 as unnest(recursive_unnest_table.column3), column3@0 as column3]
08)--------------MemoryExec: partitions=1, partition_sizes=[1]
08)--------------MemoryExec: partitions=1, partition_sizes=[1]


## group by unnest

### without agg exprs
query I
select unnest(column1) c1 from unnest_table group by c1 order by c1;
----
1
2
3
4
5
6
12

query II
select unnest(column1) c1, unnest(column2) c2 from unnest_table group by c1, c2 order by c1, c2;
----
1 7
2 NULL
3 NULL
4 8
5 9
6 11
12 NULL
NULL 10
NULL 12
NULL 42
NULL NULL

query III
select unnest(column1) c1, unnest(column2) c2, column3 c3 from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1
2 NULL 1
3 NULL 1
4 8 2
5 9 2
6 11 3
12 NULL NULL
NULL 10 2
NULL 12 3
NULL 42 NULL
NULL NULL NULL

### with agg exprs

query IIII
select unnest(column1) c1, unnest(column2) c2, column3 c3, count(1) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1 1
2 NULL 1 1
3 NULL 1 1
4 8 2 1
5 9 2 1
6 11 3 1
12 NULL NULL 1
NULL 10 2 1
NULL 12 3 1
NULL 42 NULL 1
NULL NULL NULL 1

query IIII
select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1 1
2 NULL 1 1
3 NULL 1 1
4 8 2 1
5 9 2 1
6 11 3 0
12 NULL NULL 0
NULL 10 2 1
NULL 12 3 0
NULL 42 NULL 0
NULL NULL NULL 0

query IIIII
select unnest(column1) c1, unnest(column2) c2, column3 c3, count(column4), sum(column3) from unnest_table group by c1, c2, c3 order by c1, c2, c3;
----
1 7 1 1 1
2 NULL 1 1 1
3 NULL 1 1 1
4 8 2 1 2
5 9 2 1 2
6 11 3 0 3
12 NULL NULL 0 NULL
NULL 10 2 1 2
NULL 12 3 0 3
NULL 42 NULL 0 NULL
NULL NULL NULL 0 NULL

### group by recursive unnest list

query ?
select unnest(unnest(column2)) c2 from recursive_unnest_table group by c2 order by c2;
----
[1]
[1, 1]
[2]
[3, 4]
[5]
[7, 8]
[, 6]
NULL

query ?I
select unnest(unnest(column2)) c2, count(column3) from recursive_unnest_table group by c2 order by c2;
----
[1] 1
[1, 1] 1
[2] 1
[3, 4] 1
[5] 1
[7, 8] 1
[, 6] 1
NULL 1

### TODO: group by unnest struct
query error DataFusion error: Error during planning: Projection references non\-aggregate values
select unnest(column1) c1 from nested_unnest_table group by c1.c0;