From d7436c72019853569a1ea7bfa2f10cd984fca71b Mon Sep 17 00:00:00 2001 From: Martin Hilton Date: Fri, 6 Oct 2023 18:00:24 +0100 Subject: [PATCH 1/6] add interval arithmetic for timestamp types Timestamp types have custom arithmetic and need special handling when attempting to determine potential interval ranges. Change the processing of comparison operator propagation to convert timestamp intervals into int64 intervals for processing. The results are converted back to the correct datatype at the end of the process. --- .../physical-expr/src/intervals/cp_solver.rs | 166 ++++++++++++++++++ 1 file changed, 166 insertions(+) diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs b/datafusion/physical-expr/src/intervals/cp_solver.rs index 0a090636dc4b..d35acb5c0119 100644 --- a/datafusion/physical-expr/src/intervals/cp_solver.rs +++ b/datafusion/physical-expr/src/intervals/cp_solver.rs @@ -30,6 +30,7 @@ use crate::intervals::interval_aritmetic::{apply_operator, Interval}; use crate::utils::{build_dag, ExprTreeNode}; use crate::PhysicalExpr; +use arrow::compute::CastOptions; use arrow_schema::DataType; use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_expr::type_coercion::binary::get_result_type; @@ -305,6 +306,17 @@ pub fn propagate_comparison( ) -> Result<(Option, Option)> { let left_type = left_child.get_datatype()?; let right_type = right_child.get_datatype()?; + if let (DataType::Timestamp(..), DataType::Timestamp(..)) = (&left_type, &right_type) + { + return propagate_comparison_timestamp( + op, + left_child, + right_child, + &left_type, + &right_type, + ); + } + let parent = comparison_operator_target(&left_type, op, &right_type)?; match (&left_type, &right_type) { // We can not compare a Duration type with a time interval type @@ -681,6 +693,33 @@ pub fn propagate_comparison_to_time_interval_at_right( } } +/// Propagate the constraints arising from comparison operators on +/// timestamp data types.
Arithmetic on timestamps is treated specially, +/// for example subtracting two timestamps results in duration rather +/// than another timestamp. To work around this all values are converted +/// to int64 before applying the proagation operation. The results are +/// converted back to their original types before being returned. +fn propagate_comparison_timestamp( + op: &Operator, + left_child: &Interval, + right_child: &Interval, + left_type: &DataType, + right_type: &DataType, +) -> Result<(Option, Option)> { + let cast_options = CastOptions::default(); + let left_child_i64 = left_child.cast_to(&DataType::Int64, &cast_options)?; + let right_child_i64 = right_child.cast_to(&DataType::Int64, &cast_options)?; + let (left_result_i64, right_result_i64) = + propagate_comparison(op, &left_child_i64, &right_child_i64)?; + let left_result = left_result_i64 + .map(|interval| interval.cast_to(left_type, &cast_options)) + .transpose()?; + let right_result = right_result_i64 + .map(|interval| interval.cast_to(right_type, &cast_options)) + .transpose()?; + Ok((left_result, right_result)) +} + #[cfg(test)] mod tests { use super::*; @@ -688,6 +727,7 @@ mod tests { use crate::expressions::{BinaryExpr, Column}; use crate::intervals::test_utils::gen_conjunctive_numerical_expr; + use arrow::datatypes::TimeUnit; use datafusion_common::ScalarValue; use rand::rngs::StdRng; use rand::{Rng, SeedableRng}; @@ -1414,4 +1454,130 @@ mod tests { Ok(()) } + + #[test] + fn test_propagate_comparison() { + let left = Interval::new( + IntervalBound::make_unbounded(DataType::Int64).unwrap(), + IntervalBound::make_unbounded(DataType::Int64).unwrap(), + ); + let right = Interval::new( + IntervalBound::new(ScalarValue::Int64(Some(1000)), false), + IntervalBound::new(ScalarValue::Int64(Some(1000)), false), + ); + assert_eq!( + ( + Some(Interval::new( + IntervalBound::make_unbounded(DataType::Int64).unwrap(), + IntervalBound::new(ScalarValue::Int64(Some(1000)), true) + )), + Some(Interval::new( + 
IntervalBound::new(ScalarValue::Int64(Some(1000)), false), + IntervalBound::new(ScalarValue::Int64(Some(1000)), false) + )), + ), + propagate_comparison(&Operator::Lt, &left, &right).unwrap() + ); + + let left = Interval::new( + IntervalBound::make_unbounded(DataType::Timestamp( + TimeUnit::Nanosecond, + None, + )) + .unwrap(), + IntervalBound::make_unbounded(DataType::Timestamp( + TimeUnit::Nanosecond, + None, + )) + .unwrap(), + ); + let right = Interval::new( + IntervalBound::new(ScalarValue::TimestampNanosecond(Some(1000), None), false), + IntervalBound::new(ScalarValue::TimestampNanosecond(Some(1000), None), false), + ); + assert_eq!( + ( + Some(Interval::new( + IntervalBound::make_unbounded(DataType::Timestamp( + TimeUnit::Nanosecond, + None + )) + .unwrap(), + IntervalBound::new( + ScalarValue::TimestampNanosecond(Some(1000), None), + true + ) + )), + Some(Interval::new( + IntervalBound::new( + ScalarValue::TimestampNanosecond(Some(1000), None), + false + ), + IntervalBound::new( + ScalarValue::TimestampNanosecond(Some(1000), None), + false + ) + )), + ), + propagate_comparison(&Operator::Lt, &left, &right).unwrap() + ); + + let left = Interval::new( + IntervalBound::make_unbounded(DataType::Timestamp( + TimeUnit::Nanosecond, + Some("+05:00".into()), + )) + .unwrap(), + IntervalBound::make_unbounded(DataType::Timestamp( + TimeUnit::Nanosecond, + Some("+05:00".into()), + )) + .unwrap(), + ); + let right = Interval::new( + IntervalBound::new( + ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), + false, + ), + IntervalBound::new( + ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())), + false, + ), + ); + assert_eq!( + ( + Some(Interval::new( + IntervalBound::make_unbounded(DataType::Timestamp( + TimeUnit::Nanosecond, + Some("+05:00".into()), + )) + .unwrap(), + IntervalBound::new( + ScalarValue::TimestampNanosecond( + Some(1000), + Some("+05:00".into()) + ), + true + ) + )), + Some(Interval::new( + IntervalBound::new( + 
ScalarValue::TimestampNanosecond( + Some(1000), + Some("+05:00".into()) + ), + false + ), + IntervalBound::new( + ScalarValue::TimestampNanosecond( + Some(1000), + Some("+05:00".into()) + ), + false + ) + )), + ), + propagate_comparison(&Operator::Lt, &left, &right).unwrap() + ); + } } From 3ff234609fb2c2017e12fdbff61be6d188301a2c Mon Sep 17 00:00:00 2001 From: Martin Hilton Date: Mon, 9 Oct 2023 09:53:55 +0100 Subject: [PATCH 2/6] apply review suggestion The review from @berkaysynnada showed that it was not necessary to have special handling for timestamp types, but to make sure the new_zero function for scalars of a duration type return 0 rather than null values. Apply the suggested change, leaving the test to ensure the functionality doesn't break in the future. --- datafusion/common/src/scalar.rs | 8 ++-- .../physical-expr/src/intervals/cp_solver.rs | 39 ------------------- 2 files changed, 4 insertions(+), 43 deletions(-) diff --git a/datafusion/common/src/scalar.rs b/datafusion/common/src/scalar.rs index 32343b98fa24..ae9c29b5a141 100644 --- a/datafusion/common/src/scalar.rs +++ b/datafusion/common/src/scalar.rs @@ -833,15 +833,15 @@ impl ScalarValue { DataType::Interval(IntervalUnit::MonthDayNano) => { ScalarValue::IntervalMonthDayNano(Some(0)) } - DataType::Duration(TimeUnit::Second) => ScalarValue::DurationSecond(None), + DataType::Duration(TimeUnit::Second) => ScalarValue::DurationSecond(Some(0)), DataType::Duration(TimeUnit::Millisecond) => { - ScalarValue::DurationMillisecond(None) + ScalarValue::DurationMillisecond(Some(0)) } DataType::Duration(TimeUnit::Microsecond) => { - ScalarValue::DurationMicrosecond(None) + ScalarValue::DurationMicrosecond(Some(0)) } DataType::Duration(TimeUnit::Nanosecond) => { - ScalarValue::DurationNanosecond(None) + ScalarValue::DurationNanosecond(Some(0)) } _ => { return _not_impl_err!( diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs b/datafusion/physical-expr/src/intervals/cp_solver.rs index 
d35acb5c0119..de8c3da3a7cd 100644 --- a/datafusion/physical-expr/src/intervals/cp_solver.rs +++ b/datafusion/physical-expr/src/intervals/cp_solver.rs @@ -30,7 +30,6 @@ use crate::intervals::interval_aritmetic::{apply_operator, Interval}; use crate::utils::{build_dag, ExprTreeNode}; use crate::PhysicalExpr; -use arrow::compute::CastOptions; use arrow_schema::DataType; use datafusion_common::{DataFusionError, Result, ScalarValue}; use datafusion_expr::type_coercion::binary::get_result_type; @@ -306,17 +305,6 @@ pub fn propagate_comparison( ) -> Result<(Option, Option)> { let left_type = left_child.get_datatype()?; let right_type = right_child.get_datatype()?; - if let (DataType::Timestamp(..), DataType::Timestamp(..)) = (&left_type, &right_type) - { - return propagate_comparison_timestamp( - op, - left_child, - right_child, - &left_type, - &right_type, - ); - } - let parent = comparison_operator_target(&left_type, op, &right_type)?; match (&left_type, &right_type) { // We can not compare a Duration type with a time interval type @@ -693,33 +681,6 @@ pub fn propagate_comparison_to_time_interval_at_right( } } -/// Propagate the constraints arising from comparison operators on -/// timestamp data types. Arithmetic on timestamps is treated specially, -/// for example subtracting two timestamps results in duration rather -/// than another timestamp. To work around this all values are converted -/// to int64 before applying the proagation operation. The results are -/// converted back to their original types before being returned. 
-fn propagate_comparison_timestamp( - op: &Operator, - left_child: &Interval, - right_child: &Interval, - left_type: &DataType, - right_type: &DataType, -) -> Result<(Option, Option)> { - let cast_options = CastOptions::default(); - let left_child_i64 = left_child.cast_to(&DataType::Int64, &cast_options)?; - let right_child_i64 = right_child.cast_to(&DataType::Int64, &cast_options)?; - let (left_result_i64, right_result_i64) = - propagate_comparison(op, &left_child_i64, &right_child_i64)?; - let left_result = left_result_i64 - .map(|interval| interval.cast_to(left_type, &cast_options)) - .transpose()?; - let right_result = right_result_i64 - .map(|interval| interval.cast_to(right_type, &cast_options)) - .transpose()?; - Ok((left_result, right_result)) -} - #[cfg(test)] mod tests { use super::*; From 816d506e4af6e244efd17cbef0ee0be4719f5b55 Mon Sep 17 00:00:00 2001 From: Martin Hilton Date: Mon, 9 Oct 2023 11:31:45 +0100 Subject: [PATCH 3/6] clippy fixes Apply a number of changes suggested by clippy. 
--- .../core/src/physical_optimizer/enforce_distribution.rs | 2 +- datafusion/core/src/physical_optimizer/enforce_sorting.rs | 6 +++--- datafusion/core/src/physical_optimizer/pipeline_checker.rs | 2 +- .../replace_with_order_preserving_variants.rs | 2 +- datafusion/expr/src/expr.rs | 5 ++--- datafusion/optimizer/src/push_down_filter.rs | 2 +- datafusion/physical-expr/src/expressions/binary.rs | 4 ++-- datafusion/physical-expr/src/sort_properties.rs | 2 +- datafusion/physical-plan/src/aggregates/mod.rs | 2 +- datafusion/physical-plan/src/memory.rs | 2 +- datafusion/physical-plan/src/projection.rs | 4 +--- datafusion/physical-plan/src/visitor.rs | 2 +- datafusion/sql/src/expr/mod.rs | 2 +- 13 files changed, 17 insertions(+), 20 deletions(-) diff --git a/datafusion/core/src/physical_optimizer/enforce_distribution.rs b/datafusion/core/src/physical_optimizer/enforce_distribution.rs index 3463f3a31376..a1f509d28733 100644 --- a/datafusion/core/src/physical_optimizer/enforce_distribution.rs +++ b/datafusion/core/src/physical_optimizer/enforce_distribution.rs @@ -1529,7 +1529,7 @@ impl DistributionContext { self.plan .children() .into_iter() - .map(|child| DistributionContext::new(child)) + .map(DistributionContext::new) .collect() } } diff --git a/datafusion/core/src/physical_optimizer/enforce_sorting.rs b/datafusion/core/src/physical_optimizer/enforce_sorting.rs index 6e3b160c8922..95ec1973d017 100644 --- a/datafusion/core/src/physical_optimizer/enforce_sorting.rs +++ b/datafusion/core/src/physical_optimizer/enforce_sorting.rs @@ -151,7 +151,7 @@ impl PlanWithCorrespondingSort { self.plan .children() .into_iter() - .map(|child| PlanWithCorrespondingSort::new(child)) + .map(PlanWithCorrespondingSort::new) .collect() } } @@ -267,7 +267,7 @@ impl PlanWithCorrespondingCoalescePartitions { self.plan .children() .into_iter() - .map(|child| PlanWithCorrespondingCoalescePartitions::new(child)) + .map(PlanWithCorrespondingCoalescePartitions::new) .collect() } } @@ -602,7 +602,7 
@@ fn analyze_window_sort_removal( let reqs = window_exec .required_input_ordering() .swap_remove(0) - .unwrap_or(vec![]); + .unwrap_or_default(); let sort_expr = PhysicalSortRequirement::to_sort_exprs(reqs); // Satisfy the ordering requirement so that the window can run: add_sort_above(&mut window_child, sort_expr, None)?; diff --git a/datafusion/core/src/physical_optimizer/pipeline_checker.rs b/datafusion/core/src/physical_optimizer/pipeline_checker.rs index 44679647b5b2..3b994d5f893f 100644 --- a/datafusion/core/src/physical_optimizer/pipeline_checker.rs +++ b/datafusion/core/src/physical_optimizer/pipeline_checker.rs @@ -108,7 +108,7 @@ impl TreeNode for PipelineStatePropagator { if !children.is_empty() { let new_children = children .into_iter() - .map(|child| PipelineStatePropagator::new(child)) + .map(PipelineStatePropagator::new) .map(transform) .collect::>>()?; let children_unbounded = new_children diff --git a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs index cb3b6c3d0741..ede95fc67721 100644 --- a/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs +++ b/datafusion/core/src/physical_optimizer/replace_with_order_preserving_variants.rs @@ -112,7 +112,7 @@ impl OrderPreservationContext { self.plan .children() .into_iter() - .map(|child| OrderPreservationContext::new(child)) + .map(OrderPreservationContext::new) .collect() } } diff --git a/datafusion/expr/src/expr.rs b/datafusion/expr/src/expr.rs index 94ef69eb7933..86ebaac6dd0f 100644 --- a/datafusion/expr/src/expr.rs +++ b/datafusion/expr/src/expr.rs @@ -39,9 +39,8 @@ use std::sync::Arc; /// represent logical expressions such as `A + 1`, or `CAST(c1 AS /// int)`. /// -/// An `Expr` can compute its [DataType](arrow::datatypes::DataType) -/// and nullability, and has functions for building up complex -/// expressions. 
+/// An `Expr` can compute its [DataType] and nullability, +/// and has functions for building up complex expressions. /// /// # Examples /// diff --git a/datafusion/optimizer/src/push_down_filter.rs b/datafusion/optimizer/src/push_down_filter.rs index fe726d5d7783..ee7b37979dc4 100644 --- a/datafusion/optimizer/src/push_down_filter.rs +++ b/datafusion/optimizer/src/push_down_filter.rs @@ -488,7 +488,7 @@ fn push_down_join( .filter .as_ref() .map(|e| utils::split_conjunction_owned(e.clone())) - .unwrap_or_else(Vec::new); + .unwrap_or_default(); let mut is_inner_join = false; let infer_predicates = if join.join_type == JoinType::Inner { diff --git a/datafusion/physical-expr/src/expressions/binary.rs b/datafusion/physical-expr/src/expressions/binary.rs index f75c2f951f56..63fa98011fdd 100644 --- a/datafusion/physical-expr/src/expressions/binary.rs +++ b/datafusion/physical-expr/src/expressions/binary.rs @@ -299,7 +299,7 @@ impl PhysicalExpr for BinaryExpr { }; if let Some(result) = scalar_result { - return result.map(|a| ColumnarValue::Array(a)); + return result.map(ColumnarValue::Array); } // if both arrays or both literals - extract arrays and continue execution @@ -308,7 +308,7 @@ impl PhysicalExpr for BinaryExpr { rhs.into_array(batch.num_rows()), ); self.evaluate_with_resolved_args(left, &left_data_type, right, &right_data_type) - .map(|a| ColumnarValue::Array(a)) + .map(ColumnarValue::Array) } fn children(&self) -> Vec> { diff --git a/datafusion/physical-expr/src/sort_properties.rs b/datafusion/physical-expr/src/sort_properties.rs index 001b86e60a86..097f491cb979 100644 --- a/datafusion/physical-expr/src/sort_properties.rs +++ b/datafusion/physical-expr/src/sort_properties.rs @@ -172,7 +172,7 @@ impl ExprOrdering { self.expr .children() .into_iter() - .map(|e| ExprOrdering::new(e)) + .map(ExprOrdering::new) .collect() } diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs index b6e4b0a44dec..4b100c77f1b7 
100644 --- a/datafusion/physical-plan/src/aggregates/mod.rs +++ b/datafusion/physical-plan/src/aggregates/mod.rs @@ -681,7 +681,7 @@ impl AggregateExec { for (expression, name) in group_by.expr.iter() { if let Some(column) = expression.as_any().downcast_ref::() { let new_col_idx = schema.index_of(name)?; - let entry = columns_map.entry(column.clone()).or_insert_with(Vec::new); + let entry = columns_map.entry(column.clone()).or_default(); entry.push(Column::new(name, new_col_idx)); }; } diff --git a/datafusion/physical-plan/src/memory.rs b/datafusion/physical-plan/src/memory.rs index 1dcdae56cfa3..4c83ff1528fa 100644 --- a/datafusion/physical-plan/src/memory.rs +++ b/datafusion/physical-plan/src/memory.rs @@ -81,7 +81,7 @@ impl DisplayAs for MemoryExec { output_ordering.iter().map(|e| e.to_string()).collect(); format!(", output_ordering={}", order_strings.join(",")) }) - .unwrap_or_else(|| "".to_string()); + .unwrap_or_default(); write!( f, diff --git a/datafusion/physical-plan/src/projection.rs b/datafusion/physical-plan/src/projection.rs index 4fc48e971ca9..f4a2cd339ee5 100644 --- a/datafusion/physical-plan/src/projection.rs +++ b/datafusion/physical-plan/src/projection.rs @@ -111,9 +111,7 @@ impl ProjectionExec { let idx = column.index(); let matching_input_field = input_schema.field(idx); let matching_input_column = Column::new(matching_input_field.name(), idx); - let entry = columns_map - .entry(matching_input_column) - .or_insert_with(Vec::new); + let entry = columns_map.entry(matching_input_column).or_default(); entry.push(Column::new(name, expr_idx)); }; } diff --git a/datafusion/physical-plan/src/visitor.rs b/datafusion/physical-plan/src/visitor.rs index 573e4f8b02be..ca826c50022d 100644 --- a/datafusion/physical-plan/src/visitor.rs +++ b/datafusion/physical-plan/src/visitor.rs @@ -38,7 +38,7 @@ pub fn accept( /// depth first walk of `ExecutionPlan` nodes. 
`pre_visit` is called /// before any children are visited, and then `post_visit` is called /// after all children have been visited. -//// +/// /// To use, define a struct that implements this trait and then invoke /// ['accept']. /// diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 2f6266f29d6a..1d8ed60c328d 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -721,7 +721,7 @@ fn rewrite_placeholder(expr: &mut Expr, other: &Expr, schema: &DFSchema) -> Resu Err(e) => { return Err(e.context(format!( "Can not find type of {other} needed to infer type of {expr}" - )))?; + ))); } Ok(dt) => { *data_type = Some(dt); From 1d196bfad094ae1a4c31fb44395a1f802b262a9c Mon Sep 17 00:00:00 2001 From: Martin Hilton Date: Mon, 9 Oct 2023 11:39:05 +0100 Subject: [PATCH 4/6] fix: edit clash --- datafusion/sql/src/expr/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sql/src/expr/mod.rs b/datafusion/sql/src/expr/mod.rs index 78f36f0faa06..cb34b6ca36e8 100644 --- a/datafusion/sql/src/expr/mod.rs +++ b/datafusion/sql/src/expr/mod.rs @@ -721,7 +721,7 @@ fn rewrite_placeholder(expr: &mut Expr, other: &Expr, schema: &DFSchema) -> Resu Err(e) => { Err(e.context(format!( "Can not find type of {other} needed to infer type of {expr}" - ))); + )))?; } Ok(dt) => { *data_type = Some(dt); From ee6fc4886f063f10de71c7bef5e1f66371e48267 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 9 Oct 2023 17:16:34 -0400 Subject: [PATCH 5/6] Update datafusion/physical-expr/src/intervals/cp_solver.rs --- datafusion/physical-expr/src/intervals/cp_solver.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs b/datafusion/physical-expr/src/intervals/cp_solver.rs index de8c3da3a7cd..91f2c771750b 100644 --- a/datafusion/physical-expr/src/intervals/cp_solver.rs +++ b/datafusion/physical-expr/src/intervals/cp_solver.rs @@ -1418,6 +1418,10 @@ mod tests { #[test] 
fn test_propagate_comparison() { + // In the examples below: + // `left` is unbounded: [?, ?], + // `right` is known to be [1000,1000] + // so `left` < `right` results in no new knowledge of `right` but knowing that `left` is now < 1000: `[?, 1000)` let left = Interval::new( IntervalBound::make_unbounded(DataType::Int64).unwrap(), IntervalBound::make_unbounded(DataType::Int64).unwrap(), From 0fd4cf8fd504464127e5680e9e72ea0f1a42cc6e Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Mon, 9 Oct 2023 17:17:20 -0400 Subject: [PATCH 6/6] fmt --- datafusion/physical-expr/src/intervals/cp_solver.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/physical-expr/src/intervals/cp_solver.rs b/datafusion/physical-expr/src/intervals/cp_solver.rs index 91f2c771750b..e7515341c52c 100644 --- a/datafusion/physical-expr/src/intervals/cp_solver.rs +++ b/datafusion/physical-expr/src/intervals/cp_solver.rs @@ -1419,7 +1419,7 @@ mod tests { #[test] fn test_propagate_comparison() { // In the examples below: - // `left` is unbounded: [?, ?], + // `left` is unbounded: [?, ?], // `right` is known to be [1000,1000] // so `left` < `right` results in no new knowledge of `right` but knowing that `left` is now < 1000: `[?, 1000)` let left = Interval::new(