diff --git a/datafusion/common/src/functional_dependencies.rs b/datafusion/common/src/functional_dependencies.rs index 5f262d634af37..902566cb1c9c6 100644 --- a/datafusion/common/src/functional_dependencies.rs +++ b/datafusion/common/src/functional_dependencies.rs @@ -368,6 +368,13 @@ impl FunctionalDependencies { left_func_dependencies.extend(right_func_dependencies); left_func_dependencies } + JoinType::LeftGroup => { + right_func_dependencies.add_offset(left_cols_len); + left_func_dependencies = + left_func_dependencies.with_dependency(Dependency::Single); + right_func_dependencies.downgrade_dependencies(); + left_func_dependencies + } JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { // These joins preserve functional dependencies of the left side: left_func_dependencies diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs index ac81d977b7296..6d6a663871038 100644 --- a/datafusion/common/src/join_type.rs +++ b/datafusion/common/src/join_type.rs @@ -67,6 +67,10 @@ pub enum JoinType { /// /// [1]: http://btw2017.informatik.uni-stuttgart.de/slidesandpapers/F1-10-37/paper_web.pdf LeftMark, + /// Left Group join + /// + /// [2]: https://www.vldb.org/pvldb/vol4/p843-moerkotte.pdf + LeftGroup, } impl JoinType { @@ -90,6 +94,9 @@ impl JoinType { JoinType::LeftMark => { unreachable!("LeftMark join type does not support swapping") } + JoinType::LeftGroup => { + unreachable!("LeftGroup join type does not support swapping") + } } } @@ -121,6 +128,7 @@ impl Display for JoinType { JoinType::LeftAnti => "LeftAnti", JoinType::RightAnti => "RightAnti", JoinType::LeftMark => "LeftMark", + JoinType::LeftGroup => "LeftGroup", }; write!(f, "{join_type}") } @@ -141,6 +149,7 @@ impl FromStr for JoinType { "LEFTANTI" => Ok(JoinType::LeftAnti), "RIGHTANTI" => Ok(JoinType::RightAnti), "LEFTMARK" => Ok(JoinType::LeftMark), + "LEFTGROUP" => Ok(JoinType::LeftGroup), _ => _not_impl_err!("The join type {s} does not exist or is not implemented"), } } diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs index c7cff3ac26b11..ca92f3c864a14 100644 --- a/datafusion/expr/src/logical_plan/builder.rs +++ b/datafusion/expr/src/logical_plan/builder.rs @@ -870,6 +870,10 @@ impl LogicalPlanBuilder { return plan_err!("left_keys and right_keys were not the same length"); } + if join_type == JoinType::LeftGroup { + return plan_err!("Cannot create LefGroup Join manualy."); + } + let filter = if let Some(expr) = filter { let filter = normalize_col_with_schemas_and_ambiguity_check( expr, @@ -977,6 +981,8 @@ impl LogicalPlanBuilder { join_constraint: JoinConstraint::On, schema: DFSchemaRef::new(join_schema), null_equals_null, + group_expr: None, + aggr_expr: None, }))) } @@ -1041,6 +1047,8 @@ impl LogicalPlanBuilder { join_constraint: JoinConstraint::Using, schema: DFSchemaRef::new(join_schema), null_equals_null: false, + group_expr: None, + aggr_expr: None, }))) } } @@ -1058,6 +1066,8 @@ impl LogicalPlanBuilder { join_constraint: JoinConstraint::On, null_equals_null: false, schema: DFSchemaRef::new(join_schema), + group_expr: None, + aggr_expr: None, }))) } @@ -1276,6 +1286,8 @@ impl LogicalPlanBuilder { join_constraint: JoinConstraint::On, schema: DFSchemaRef::new(join_schema), null_equals_null: false, + group_expr: None, + aggr_expr: None, }))) } @@ -1389,7 +1401,7 @@ pub fn build_join_schema( .collect::>(); left_fields.into_iter().chain(right_fields).collect() } - JoinType::Left => { + JoinType::Left | JoinType::LeftGroup => { // left then right, right set to nullable in case of not matched scenario let left_fields = left_fields .map(|(q, f)| (q.cloned(), Arc::clone(f))) diff --git a/datafusion/expr/src/logical_plan/invariants.rs b/datafusion/expr/src/logical_plan/invariants.rs index bde4acaae5629..aaaa7798f33ae 100644 --- a/datafusion/expr/src/logical_plan/invariants.rs +++ b/datafusion/expr/src/logical_plan/invariants.rs @@ -271,7 +271,8 @@ fn check_inner_plan(inner_plan: &LogicalPlan, can_contain_outer_ref: bool) -> Re JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti - | JoinType::LeftMark => { + | JoinType::LeftMark + | JoinType::LeftGroup => { check_inner_plan(left, can_contain_outer_ref)?; check_inner_plan(right, false) } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 24fb0609b0fe7..18fd41a643c19 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -545,7 +545,11 @@ impl LogicalPlan { join_type, .. }) => match join_type { - JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => { + JoinType::Inner + | JoinType::Left + | JoinType::Right + | JoinType::Full + | JoinType::LeftGroup => { if left.schema().fields().is_empty() { right.head_output_expr() } else { @@ -658,6 +662,8 @@ impl LogicalPlan { on, schema: _, null_equals_null, + group_expr, + aggr_expr, }) => { let schema = build_join_schema(left.schema(), right.schema(), &join_type)?; @@ -679,6 +685,8 @@ impl LogicalPlan { filter, schema: DFSchemaRef::new(schema), null_equals_null, + group_expr, + aggr_expr, })) } LogicalPlan::Subquery(_) => Ok(self), @@ -932,6 +940,8 @@ impl LogicalPlan { filter: filter_expr, schema: DFSchemaRef::new(schema), null_equals_null: *null_equals_null, + group_expr: None, + aggr_expr: None, })) } LogicalPlan::Subquery(Subquery { @@ -1330,9 +1340,10 @@ impl LogicalPlan { (left_max, right_max, _) => Some(left_max * right_max), } } - JoinType::LeftSemi | JoinType::LeftAnti | JoinType::LeftMark => { - left.max_rows() - } + JoinType::LeftSemi + | JoinType::LeftAnti + | JoinType::LeftMark + | JoinType::LeftGroup => left.max_rows(), JoinType::RightSemi | JoinType::RightAnti => right.max_rows(), }, LogicalPlan::Repartition(Repartition { input, .. }) => input.max_rows(), @@ -3295,6 +3306,9 @@ pub struct Join { pub schema: DFSchemaRef, /// If null_equals_null is true, null == null else null != null pub null_equals_null: bool, + /// Only for GroupJoin + pub group_expr: Option>, + pub aggr_expr: Option>, } impl Join { @@ -3328,6 +3342,8 @@ impl Join { join_constraint: original_join.join_constraint, schema: Arc::new(join_schema), null_equals_null: original_join.null_equals_null, + group_expr: original_join.group_expr.clone(), + aggr_expr: original_join.aggr_expr.clone(), }) } } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index 9a6103afd4b41..8c79ad1b64c5e 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -141,6 +141,8 @@ impl TreeNode for LogicalPlan { join_constraint, schema, null_equals_null, + group_expr, + aggr_expr, }) => (left, right).map_elements(f)?.update_data(|(left, right)| { LogicalPlan::Join(Join { left, @@ -151,6 +153,8 @@ impl TreeNode for LogicalPlan { join_constraint, schema, null_equals_null, + group_expr, + aggr_expr, }) }), LogicalPlan::Limit(Limit { skip, fetch, input }) => input @@ -573,6 +577,8 @@ impl LogicalPlan { join_constraint, schema, null_equals_null, + group_expr, + aggr_expr, }) => (on, filter).map_elements(f)?.update_data(|(on, filter)| { LogicalPlan::Join(Join { left, @@ -583,6 +589,8 @@ impl LogicalPlan { join_constraint, schema, null_equals_null, + group_expr, + aggr_expr, }) }), LogicalPlan::Sort(Sort { expr, input, fetch }) => expr diff --git a/datafusion/optimizer/src/eliminate_cross_join.rs b/datafusion/optimizer/src/eliminate_cross_join.rs index d35572e6d34a3..6f149b0365f2f 100644 --- a/datafusion/optimizer/src/eliminate_cross_join.rs +++ b/datafusion/optimizer/src/eliminate_cross_join.rs @@ -329,6 +329,8 @@ fn find_inner_join( filter: None, schema: join_schema, null_equals_null: false, + group_expr: None, + aggr_expr: None, })); } } @@ -351,6 +353,8 @@ fn find_inner_join( join_type: JoinType::Inner, join_constraint: JoinConstraint::On, null_equals_null: false, + aggr_expr: None, + group_expr: None, })) } diff --git a/datafusion/optimizer/src/eliminate_outer_join.rs b/datafusion/optimizer/src/eliminate_outer_join.rs index 1ecb32ca2a435..abdeada2798dc 100644 --- a/datafusion/optimizer/src/eliminate_outer_join.rs +++ b/datafusion/optimizer/src/eliminate_outer_join.rs @@ -111,14 +111,8 @@ impl OptimizerRule for EliminateOuterJoin { }; let new_join = Arc::new(LogicalPlan::Join(Join { - left: join.left, - right: join.right, join_type: new_join_type, - join_constraint: join.join_constraint, - on: join.on.clone(), - filter: join.filter.clone(), - schema: Arc::clone(&join.schema), - null_equals_null: join.null_equals_null, + ..join })); Filter::try_new(filter.predicate, new_join) .map(|f| Transformed::yes(LogicalPlan::Filter(f))) diff --git a/datafusion/optimizer/src/extract_equijoin_predicate.rs b/datafusion/optimizer/src/extract_equijoin_predicate.rs index 48191ec206313..e7523f0574f69 100644 --- a/datafusion/optimizer/src/extract_equijoin_predicate.rs +++ b/datafusion/optimizer/src/extract_equijoin_predicate.rs @@ -76,6 +76,8 @@ impl OptimizerRule for ExtractEquijoinPredicate { join_constraint, schema, null_equals_null, + group_expr, + aggr_expr, }) => { let left_schema = left.schema(); let right_schema = right.schema(); @@ -93,6 +95,8 @@ impl OptimizerRule for ExtractEquijoinPredicate { join_constraint, schema, null_equals_null, + group_expr, + aggr_expr, }))) } else { Ok(Transformed::no(LogicalPlan::Join(Join { @@ -104,6 +108,8 @@ impl OptimizerRule for ExtractEquijoinPredicate { join_constraint, schema, null_equals_null, + group_expr, + aggr_expr, }))) } } diff --git a/datafusion/optimizer/src/groupby_and_join_to_groupjoin.rs b/datafusion/optimizer/src/groupby_and_join_to_groupjoin.rs new file mode 100644 index 0000000000000..669df50a969a4 --- /dev/null +++ b/datafusion/optimizer/src/groupby_and_join_to_groupjoin.rs @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`GroupByAndJoinToGroupJoin`] Rewrites Group-By and Join to Group join + +use crate::optimizer::ApplyOrder; +use crate::{OptimizerConfig, OptimizerRule}; +use datafusion_common::tree_node::Transformed; +use datafusion_common::Result; +use datafusion_expr::expr_rewriter::coerce_plan_expr_for_schema; +use datafusion_expr::logical_plan::{JoinConstraint, JoinType, LogicalPlan}; +use datafusion_expr::{Aggregate, Expr, GroupingSet, Join}; +use itertools::Itertools; +use std::sync::Arc; + +// Article: https://www.vldb.org/pvldb/vol4/p843-moerkotte.pdf +// Accelerating Queries with Group-By and Join by Groupjoin +#[derive(Default, Debug)] +pub struct GroupByAndJoinToGroupJoin {} + +impl GroupByAndJoinToGroupJoin { + #[allow(missing_docs)] + pub fn new() -> Self { + Self {} + } +} + +impl OptimizerRule for GroupByAndJoinToGroupJoin { + fn name(&self) -> &str { + "group_by_and_join_to_group_join" + } + + fn supports_rewrite(&self) -> bool { + true + } + + fn rewrite( + &self, + plan: LogicalPlan, + config: &dyn OptimizerConfig, + ) -> Result> { + match plan.clone() { + LogicalPlan::Aggregate(Aggregate { + input, + group_expr, + aggr_expr, + schema: aggregate_schema, + .. + }) => match Arc::unwrap_or_clone(input) { + LogicalPlan::Join(Join { + left, + right, + on, + filter, + join_type: JoinType::Left, + join_constraint, + schema: join_schema, + null_equals_null, + group_expr: _, + aggr_expr: _, + }) => { + if is_group_join(group_expr.clone(), on.clone()) { + let new_plan = LogicalPlan::Join(Join { + left, + right, + on, + filter, + join_type: JoinType::LeftGroup, + join_constraint, + schema: aggregate_schema, + null_equals_null, + group_expr: Some(group_expr), + aggr_expr: Some(aggr_expr), + }); + Ok(Transformed::yes(new_plan)) + } else { + Ok(Transformed::no(plan)) + } + } + _ => Ok(Transformed::no(plan)), + }, + _ => Ok(Transformed::no(plan)), + } + } +} + +fn is_group_join(group_expr: Vec, on: Vec<(Expr, Expr)>) -> bool { + dbg!(group_expr[0] == on[0].0) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::analyzer::type_coercion::TypeCoercion; + use crate::analyzer::Analyzer; + use crate::test::*; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::{config::ConfigOptions, Column}; + use datafusion_expr::{col, logical_plan::table_scan, LogicalPlanBuilder}; + use datafusion_functions_aggregate::expr_fn::{count, max, min}; + + fn schema() -> Schema { + Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("key", DataType::Utf8, false), + Field::new("value", DataType::Float64, false), + ]) + } + + fn assert_optimized_plan_equal(plan: LogicalPlan, expected: &str) -> Result<()> { + assert_optimized_plan_eq(Arc::new(GroupByAndJoinToGroupJoin {}), plan, expected) + } + + #[test] + fn left_aggregated_and_join_right() -> Result<()> { + let plan = table_scan( + Some("left"), + &Schema::new(vec![Field::new("key", DataType::UInt32, false)]), + None, + )? + .join( + table_scan( + Some("right"), + &Schema::new(vec![ + Field::new("key", DataType::UInt32, false), + Field::new("value", DataType::UInt32, false), + ]), + None, + )? + .build()?, + JoinType::Left, + (vec!["left.key"], vec!["right.key"]), + None, + )? + .aggregate(vec![col("left.key")], vec![max(col("value"))])? + .build()?; + + let expected = "\ + Aggregate: groupBy=[[left.key]], aggr=[[max(right.value)]]\ + \n Left Join: left.key = right.key\ + \n TableScan: left\ + \n TableScan: right\ + "; + assert_optimized_plan_equal(plan, expected) + } +} diff --git a/datafusion/optimizer/src/lib.rs b/datafusion/optimizer/src/lib.rs index 614284e1b4774..5714655d1acd2 100644 --- a/datafusion/optimizer/src/lib.rs +++ b/datafusion/optimizer/src/lib.rs @@ -46,6 +46,7 @@ pub mod eliminate_one_union; pub mod eliminate_outer_join; pub mod extract_equijoin_predicate; pub mod filter_null_join_keys; +pub mod groupby_and_join_to_groupjoin; pub mod optimize_projections; pub mod optimizer; pub mod propagate_empty_relation; diff --git a/datafusion/optimizer/src/optimize_projections/mod.rs b/datafusion/optimizer/src/optimize_projections/mod.rs index b7dd391586a18..88e5d1e3b5133 100644 --- a/datafusion/optimizer/src/optimize_projections/mod.rs +++ b/datafusion/optimizer/src/optimize_projections/mod.rs @@ -681,7 +681,8 @@ fn split_join_requirements( | JoinType::Left | JoinType::Right | JoinType::Full - | JoinType::LeftMark => { + | JoinType::LeftMark + | JoinType::LeftGroup => { // Decrease right side indices by `left_len` so that they point to valid // positions within the right child: indices.split_off(left_len) diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index 7cb0e7c2f1f7d..42e6a6964f796 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -160,7 +160,7 @@ pub struct PushDownFilter {} pub(crate) fn lr_is_preserved(join_type: JoinType) -> (bool, bool) { match join_type { JoinType::Inner => (true, true), - JoinType::Left => (true, false), + JoinType::Left | JoinType::LeftGroup => (true, false), JoinType::Right => (false, true), JoinType::Full => (false, false), // No columns from the right side of the join can be referenced in output @@ -191,6 +191,7 @@ pub(crate) fn on_lr_is_preserved(join_type: JoinType) -> (bool, bool) { JoinType::LeftAnti => (false, true), JoinType::RightAnti => (true, false), JoinType::LeftMark => (false, true), + JoinType::LeftGroup => (false, true), } } @@ -682,13 +683,14 @@ fn infer_join_predicates_from_on_filters( on_filters, inferred_predicates, ), - JoinType::Left | JoinType::LeftSemi | JoinType::LeftMark => { - infer_join_predicates_impl::( - join_col_keys, - on_filters, - inferred_predicates, - ) - } + JoinType::Left + | JoinType::LeftSemi + | JoinType::LeftMark + | JoinType::LeftGroup => infer_join_predicates_impl::( + join_col_keys, + on_filters, + inferred_predicates, + ), JoinType::Right | JoinType::RightSemi => { infer_join_predicates_impl::( join_col_keys, diff --git a/datafusion/optimizer/src/push_down_limit.rs b/datafusion/optimizer/src/push_down_limit.rs index 8a3aa4bb84599..b91f2ee4df926 100644 --- a/datafusion/optimizer/src/push_down_limit.rs +++ b/datafusion/optimizer/src/push_down_limit.rs @@ -250,6 +250,7 @@ fn push_down_join(mut join: Join, limit: usize) -> Transformed { Left | Right | Full | Inner => (Some(limit), Some(limit)), LeftAnti | LeftSemi | LeftMark => (Some(limit), None), RightAnti | RightSemi => (None, Some(limit)), + LeftGroup => (None, None), } } else { match join.join_type { diff --git a/datafusion/physical-expr/src/equivalence/class.rs b/datafusion/physical-expr/src/equivalence/class.rs index 5c749a1a5a6ec..33bd8e4fd1a7e 100644 --- a/datafusion/physical-expr/src/equivalence/class.rs +++ b/datafusion/physical-expr/src/equivalence/class.rs @@ -621,7 +621,11 @@ impl EquivalenceGroup { on: &[(PhysicalExprRef, PhysicalExprRef)], ) -> Self { match join_type { - JoinType::Inner | JoinType::Left | JoinType::Full | JoinType::Right => { + JoinType::Inner + | JoinType::Left + | JoinType::LeftGroup + | JoinType::Full + | JoinType::Right => { let mut result = Self::new( self.iter() .cloned() diff --git a/datafusion/sql/src/unparser/plan.rs b/datafusion/sql/src/unparser/plan.rs index 2e02baa4f7f58..2104450c18579 100644 --- a/datafusion/sql/src/unparser/plan.rs +++ b/datafusion/sql/src/unparser/plan.rs @@ -1079,6 +1079,7 @@ impl Unparser<'_> { JoinType::RightAnti => ast::JoinOperator::RightAnti(constraint), JoinType::RightSemi => ast::JoinOperator::RightSemi(constraint), JoinType::LeftMark => unimplemented!("Unparsing of Left Mark join type"), + JoinType::LeftGroup => unimplemented!("Unparsing of Left Group join type"), }) }