Skip to content

Commit

Permalink
add interval arithmetic for timestamp types (#7758)
Browse files Browse the repository at this point in the history
* add interval arithmetic for timestamp types

Timestamp types have custom arithmetic and need special handling
when attempting to determine potential interval ranges. Change the
processing of comparison operator propagation to convert timestamp
intervals into int64 intervals for processing. The results are
converted back to the correct datatype at the end of the process.

* apply review suggestion

The review from @berkaysynnada showed that it was not necessary to
have special handling for timestamp types; it is enough to make sure the
new_zero function for scalars of a duration type returns 0 rather
than a null value. Apply the suggested change, leaving the test to
ensure the functionality doesn't break in the future.

* clippy fixes

Apply a number of changes suggested by clippy.

* fix: edit clash

* Update datafusion/physical-expr/src/intervals/cp_solver.rs

* fmt

---------

Co-authored-by: Andrew Lamb <[email protected]>
  • Loading branch information
mhilton and alamb authored Oct 9, 2023
1 parent b0ea758 commit 96561a4
Show file tree
Hide file tree
Showing 2 changed files with 135 additions and 4 deletions.
8 changes: 4 additions & 4 deletions datafusion/common/src/scalar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -833,15 +833,15 @@ impl ScalarValue {
DataType::Interval(IntervalUnit::MonthDayNano) => {
ScalarValue::IntervalMonthDayNano(Some(0))
}
DataType::Duration(TimeUnit::Second) => ScalarValue::DurationSecond(None),
DataType::Duration(TimeUnit::Second) => ScalarValue::DurationSecond(Some(0)),
DataType::Duration(TimeUnit::Millisecond) => {
ScalarValue::DurationMillisecond(None)
ScalarValue::DurationMillisecond(Some(0))
}
DataType::Duration(TimeUnit::Microsecond) => {
ScalarValue::DurationMicrosecond(None)
ScalarValue::DurationMicrosecond(Some(0))
}
DataType::Duration(TimeUnit::Nanosecond) => {
ScalarValue::DurationNanosecond(None)
ScalarValue::DurationNanosecond(Some(0))
}
_ => {
return _not_impl_err!(
Expand Down
131 changes: 131 additions & 0 deletions datafusion/physical-expr/src/intervals/cp_solver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,7 @@ mod tests {

use crate::expressions::{BinaryExpr, Column};
use crate::intervals::test_utils::gen_conjunctive_numerical_expr;
use arrow::datatypes::TimeUnit;
use datafusion_common::ScalarValue;
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
Expand Down Expand Up @@ -1414,4 +1415,134 @@ mod tests {

Ok(())
}

#[test]
fn test_propagate_comparison() {
    // Each case pairs a data type with the scalar value 1000 expressed in
    // that type. For every case:
    //   * `left` is unbounded: [?, ?]
    //   * `right` is the single point [1000, 1000]
    // Propagating `left < right` teaches us nothing new about `right`, but
    // shrinks `left` to [?, 1000), i.e. an open upper bound at 1000.
    let cases = [
        (DataType::Int64, ScalarValue::Int64(Some(1000))),
        (
            DataType::Timestamp(TimeUnit::Nanosecond, None),
            ScalarValue::TimestampNanosecond(Some(1000), None),
        ),
        (
            DataType::Timestamp(TimeUnit::Nanosecond, Some("+05:00".into())),
            ScalarValue::TimestampNanosecond(Some(1000), Some("+05:00".into())),
        ),
    ];

    for (data_type, thousand) in cases {
        // Builders for the two kinds of bound used below.
        let unbounded = || IntervalBound::make_unbounded(data_type.clone()).unwrap();
        let closed_at_thousand = || IntervalBound::new(thousand.clone(), false);

        let left = Interval::new(unbounded(), unbounded());
        let right = Interval::new(closed_at_thousand(), closed_at_thousand());

        // `left` gains an open upper bound; `right` is returned unchanged.
        let expected = (
            Some(Interval::new(
                unbounded(),
                IntervalBound::new(thousand.clone(), true),
            )),
            Some(Interval::new(closed_at_thousand(), closed_at_thousand())),
        );

        assert_eq!(
            expected,
            propagate_comparison(&Operator::Lt, &left, &right).unwrap()
        );
    }
}
}

0 comments on commit 96561a4

Please sign in to comment.