From 6cd1b8260f586bc69396ec877c1e636e387073ac Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 23 Oct 2023 08:29:54 -0700 Subject: [PATCH 1/6] feat(7181): provide slicing of CursorValues, and test for the various implementations --- datafusion/physical-plan/src/sorts/cursor.rs | 390 +++++++++++++++++-- datafusion/physical-plan/src/sorts/stream.rs | 2 +- 2 files changed, 361 insertions(+), 31 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index df90c97faf68..594835b33051 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -16,17 +16,17 @@ // under the License. use std::cmp::Ordering; +use std::sync::Arc; use arrow::buffer::ScalarBuffer; use arrow::compute::SortOptions; use arrow::datatypes::ArrowNativeTypeOp; -use arrow::row::Rows; +use arrow::row::{Row, Rows}; use arrow_array::types::ByteArrayType; use arrow_array::{ Array, ArrowPrimitiveType, GenericByteArray, OffsetSizeTrait, PrimitiveArray, }; use arrow_buffer::{Buffer, OffsetBuffer}; -use datafusion_execution::memory_pool::MemoryReservation; /// A comparable collection of values for use with [`Cursor`] /// @@ -40,6 +40,13 @@ pub trait CursorValues { /// Returns comparison of `l[l_idx]` and `r[r_idx]` fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering; + + /// Slice at a given row index, returning a new Self + /// + /// # Panics + /// + /// Panics if the slice is out of bounds, or memory is insufficient + fn slice(&self, offset: usize, length: usize) -> Self; } /// A comparable cursor, used by sort operations @@ -95,6 +102,12 @@ impl Cursor { self.offset += 1; t } + + /// Ref to underlying [`CursorValues`] + #[allow(dead_code)] + pub fn cursor_values(&self) -> &T { + &self.values + } } impl PartialEq for Cursor { @@ -122,42 +135,61 @@ impl Ord for Cursor { /// Used for sorting when there are multiple columns in the sort key #[derive(Debug)] pub struct RowValues { - rows: Rows, + rows: Arc, - /// Tracks for the memory used by in the `Rows` of this - /// cursor. Freed on drop - #[allow(dead_code)] - reservation: MemoryReservation, + /// Lower bound of windowed RowValues. + offset: usize, + /// Upper bound of windowed RowValues. + limit: usize, } impl RowValues { - /// Create a new [`RowValues`] from `rows` and a `reservation` - /// that tracks its memory. There must be at least one row + /// Create a new [`RowValues`] from `Arc`. /// - /// Panics if the reservation is not for exactly `rows.size()` - /// bytes or if `rows` is empty. - pub fn new(rows: Rows, reservation: MemoryReservation) -> Self { - assert_eq!( - rows.size(), - reservation.size(), - "memory reservation mismatch" - ); + /// Panics if `rows` is empty. + pub fn new(rows: Arc) -> Self { assert!(rows.num_rows() > 0); - Self { rows, reservation } + Self { + offset: 0, + limit: rows.num_rows(), + rows, + } + } + + /// Return value for idx. + fn get(&self, idx: usize) -> Row<'_> { + self.rows.row(idx + self.offset) } } impl CursorValues for RowValues { fn len(&self) -> usize { - self.rows.num_rows() + self.limit - self.offset } fn eq(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> bool { - l.rows.row(l_idx) == r.rows.row(r_idx) + l.get(l_idx) == r.get(r_idx) } fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering { - l.rows.row(l_idx).cmp(&r.rows.row(r_idx)) + l.get(l_idx).cmp(&r.get(r_idx)) + } + + fn slice(&self, offset: usize, length: usize) -> Self { + assert!( + offset >= self.offset && self.offset + offset <= self.limit, + "slice offset is out of bounds" + ); + assert!( + self.offset + offset + length <= self.limit, + "slice length is out of bounds" + ); + + Self { + rows: self.rows.clone(), + offset: self.offset + offset, + limit: self.offset + offset + length, + } } } @@ -191,6 +223,16 @@ impl CursorValues for PrimitiveValues { fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering { l.0[l_idx].compare(r.0[r_idx]) } + + fn slice(&self, offset: usize, length: usize) -> Self { + assert!(offset < self.0.len(), "slice offset is out of bounds"); + assert!( + offset + length - 1 < self.0.len(), + "slice length is out of bounds" + ); + + Self(self.0.slice(offset, length)) + } } pub struct ByteArrayValues { @@ -222,6 +264,31 @@ impl CursorValues for ByteArrayValues { fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering { l.value(l_idx).cmp(r.value(r_idx)) } + + fn slice(&self, offset: usize, length: usize) -> Self { + let start = self + .offsets + .get(offset) + .expect("slice offset is out of bounds") + .as_usize(); + let end = self + .offsets + .get(offset + length) + .expect("slice length is out of bounds") + .as_usize(); + + let offsets = self + .offsets + .slice(offset, length) + .iter() + .map(|o| T::usize_as(o.as_usize().wrapping_sub(start))) + .collect::>(); + + Self { + offsets: OffsetBuffer::new(ScalarBuffer::from(offsets)), + values: self.values.slice_with_length(start, end - start), + } + } } impl CursorArray for GenericByteArray { @@ -301,13 +368,31 @@ impl CursorValues for ArrayValues { }, } } + + fn slice(&self, offset: usize, length: usize) -> Self { + assert!(offset < self.values.len(), "slice offset is out of bounds"); + assert!( + offset + length - 1 < self.values.len(), + "slice length is out of bounds" + ); + + Self { + values: self.values.slice(offset, length), + null_threshold: self.null_threshold.saturating_sub(offset), + options: self.options, + } + } } #[cfg(test)] mod tests { + use arrow::row::{RowConverter, SortField}; + use arrow_array::{ArrayRef, Float32Array, Int16Array}; + use arrow_schema::DataType; + use super::*; - fn new_primitive( + fn new_primitive_cursor( options: SortOptions, values: ScalarBuffer, null_count: usize, @@ -334,9 +419,9 @@ mod tests { }; let buffer = ScalarBuffer::from(vec![i32::MAX, 1, 2, 3]); - let mut a = new_primitive(options, buffer, 1); + let mut a = new_primitive_cursor(options, buffer, 1); let buffer = ScalarBuffer::from(vec![1, 2, -2, -1, 1, 9]); - let mut b = new_primitive(options, buffer, 2); + let mut b = new_primitive_cursor(options, buffer, 2); // NULL == NULL assert_eq!(a.cmp(&b), Ordering::Equal); @@ -378,9 +463,9 @@ mod tests { }; let buffer = ScalarBuffer::from(vec![0, 1, i32::MIN, i32::MAX]); - let mut a = new_primitive(options, buffer, 2); + let mut a = new_primitive_cursor(options, buffer, 2); let buffer = ScalarBuffer::from(vec![-1, i32::MAX, i32::MIN]); - let mut b = new_primitive(options, buffer, 2); + let mut b = new_primitive_cursor(options, buffer, 2); // 0 > -1 assert_eq!(a.cmp(&b), Ordering::Greater); @@ -404,9 +489,9 @@ mod tests { }; let buffer = ScalarBuffer::from(vec![6, 1, i32::MIN, i32::MAX]); - let mut a = new_primitive(options, buffer, 3); + let mut a = new_primitive_cursor(options, buffer, 3); let buffer = ScalarBuffer::from(vec![67, -3, i32::MAX, i32::MIN]); - let mut b = new_primitive(options, buffer, 2); + let mut b = new_primitive_cursor(options, buffer, 2); // 6 > 67 assert_eq!(a.cmp(&b), Ordering::Greater); @@ -434,9 +519,9 @@ mod tests { }; let buffer = ScalarBuffer::from(vec![i32::MIN, i32::MAX, 6, 3]); - let mut a = new_primitive(options, buffer, 2); + let mut a = new_primitive_cursor(options, buffer, 2); let buffer = ScalarBuffer::from(vec![i32::MAX, 4546, -3]); - let mut b = new_primitive(options, buffer, 1); + let mut b = new_primitive_cursor(options, buffer, 1); // NULL == NULL assert_eq!(a.cmp(&b), Ordering::Equal); @@ -459,4 +544,249 @@ mod tests { b.advance(); assert_eq!(a.cmp(&b), Ordering::Less); } + + #[test] + fn test_slice_primitive() { + let options = SortOptions { + descending: false, + nulls_first: true, + }; + + let buffer = ScalarBuffer::from(vec![0, 1, 2]); + let mut cursor = new_primitive_cursor(options, buffer, 0); + + // from start + let sliced = Cursor::new(cursor.cursor_values().slice(0, 1)); + let expected = new_primitive_cursor(options, ScalarBuffer::from(vec![0]), 0); + assert_eq!( + sliced.cmp(&expected), + Ordering::Equal, + "should slice from start" + ); + + // with offset + let sliced = Cursor::new(cursor.cursor_values().slice(1, 2)); + let expected = new_primitive_cursor(options, ScalarBuffer::from(vec![1]), 0); + assert_eq!( + sliced.cmp(&expected), + Ordering::Equal, + "should slice with offset" + ); + + // cursor current position != start + cursor.advance(); + let sliced = Cursor::new(cursor.cursor_values().slice(0, 1)); + let expected = new_primitive_cursor(options, ScalarBuffer::from(vec![0]), 0); + assert_eq!( + sliced.cmp(&expected), + Ordering::Equal, + "should ignore current cursor position when sliced" + ); + } + + #[test] + fn test_slice_primitive_nulls_first() { + let options = SortOptions { + descending: false, + nulls_first: true, + }; + + let is_min = new_primitive_cursor(options, ScalarBuffer::from(vec![i32::MIN]), 0); + let is_null = + new_primitive_cursor(options, ScalarBuffer::from(vec![i32::MIN]), 1); + + let buffer = ScalarBuffer::from(vec![i32::MIN, 79, 2, i32::MIN]); + let mut a = new_primitive_cursor(options, buffer, 2); + let buffer = ScalarBuffer::from(vec![i32::MIN, -284, 3, i32::MIN, 2]); + let mut b = new_primitive_cursor(options, buffer, 2); + + // NULL == NULL + assert_eq!(a, is_null); + assert_eq!(a.cmp(&b), Ordering::Equal); + + // i32::MIN > NULL + a = Cursor::new(a.cursor_values().slice(3, 1)); + assert_eq!(a, is_min); + assert_eq!(a.cmp(&b), Ordering::Greater); + + // i32::MIN == i32::MIN + b = Cursor::new(b.cursor_values().slice(3, 2)); + assert_eq!(b, is_min); + assert_eq!(a.cmp(&b), Ordering::Equal); + + // i32::MIN < 2 + b = Cursor::new(b.cursor_values().slice(1, 1)); + assert_eq!(a.cmp(&b), Ordering::Less); + } + + #[test] + fn test_slice_primitive_nulls_last() { + let options = SortOptions { + descending: false, + nulls_first: false, + }; + + let is_min = new_primitive_cursor(options, ScalarBuffer::from(vec![i32::MIN]), 0); + let is_null = + new_primitive_cursor(options, ScalarBuffer::from(vec![i32::MIN]), 1); + + let buffer = ScalarBuffer::from(vec![i32::MIN, 79, 2, i32::MIN]); + let mut a = new_primitive_cursor(options, buffer, 2); + let buffer = ScalarBuffer::from(vec![i32::MIN, -284, 3, i32::MIN, 2]); + let mut b = new_primitive_cursor(options, buffer, 2); + + // i32::MIN == i32::MIN + assert_eq!(a, is_min); + assert_eq!(a.cmp(&b), Ordering::Equal); + + // i32::MIN < -284 + b = Cursor::new(b.cursor_values().slice(1, 3)); // slice to full length + assert_eq!(a.cmp(&b), Ordering::Less); + + // 79 > -284 + a = Cursor::new(a.cursor_values().slice(1, 2)); // slice to shorter than full length + assert_ne!(a, is_null); + assert_eq!(a.cmp(&b), Ordering::Greater); + + // NULL == NULL + a = Cursor::new(a.cursor_values().slice(1, 1)); + b = Cursor::new(b.cursor_values().slice(2, 1)); + assert_eq!(a, is_null); + assert_eq!(b, is_null); + assert_eq!(a.cmp(&b), Ordering::Equal); + } + + #[test] + #[should_panic(expected = "slice offset is out of bounds")] + fn test_slice_primitive_can_panic() { + let options = SortOptions { + descending: false, + nulls_first: true, + }; + + let buffer = ScalarBuffer::from(vec![0, 1, 2]); + let cursor = new_primitive_cursor(options, buffer, 0); + + cursor.cursor_values().slice(42, 1); + } + + #[test] + fn test_slice_rows() { + // rows + let cols = [ + Arc::new(Int16Array::from_iter([Some(1), Some(2), Some(3)])) as ArrayRef, + Arc::new(Float32Array::from_iter([Some(1.3), Some(2.5), Some(4.)])) + as ArrayRef, + ]; + let converter = RowConverter::new(vec![ + SortField::new(DataType::Int16), + SortField::new(DataType::Float32), + ]) + .unwrap(); + + let mut a = Cursor::new(RowValues::new(Arc::new( + converter.convert_columns(&cols).unwrap(), + ))); + let mut b = Cursor::new(RowValues::new(Arc::new( + converter.convert_columns(&cols).unwrap(), + ))); + assert_eq!(a.cursor_values().len(), 3); + + // 1,1.3 == 1,1.3 + assert_eq!(a.cmp(&b), Ordering::Equal); + + // advance A. slice B full length. + // 2,2.5 > 1,1.3 + a.advance(); + b = Cursor::new(b.cursor_values().slice(0, 3)); + assert_eq!(a.cmp(&b), Ordering::Greater); + + // slice B ahead by 2. + // 2,2.5 < 3,4.0 + b = Cursor::new(b.cursor_values().slice(2, 1)); + assert_eq!(a.cmp(&b), Ordering::Less); + + // advanced cursor vs sliced cursor + assert_eq!(a.cursor_values().len(), 3); + assert_eq!(b.cursor_values().len(), 1); + } + + #[test] + #[should_panic(expected = "slice offset is out of bounds")] + fn test_slice_rows_can_panic() { + let cols = [ + Arc::new(Int16Array::from_iter([Some(1)])) as ArrayRef, + Arc::new(Float32Array::from_iter([Some(1.3)])) as ArrayRef, + ]; + let converter = RowConverter::new(vec![ + SortField::new(DataType::Int16), + SortField::new(DataType::Float32), + ]) + .unwrap(); + + let rows = Arc::new(converter.convert_columns(&cols).unwrap()); + let cursor = Cursor::new(RowValues::new(rows)); + + cursor.cursor_values().slice(42, 1); + } + + fn new_bytearray_cursor( + values_str: &str, + offsets: Vec, + ) -> Cursor>> { + let values = Buffer::from_slice_ref(values_str); + let offsets = OffsetBuffer::new(offsets.into()); + + let options = SortOptions { + descending: false, + nulls_first: true, + }; + + Cursor::new(ArrayValues { + values: ByteArrayValues { + offsets: offsets.clone(), + values: values.clone(), + }, + null_threshold: 0, + options, + }) + } + + #[test] + fn test_slice_bytes() { + let mut a = new_bytearray_cursor("hellorainbowworld", vec![0, 5, 12, 17]); + let mut b = new_bytearray_cursor("hellorainbowworld", vec![0, 5, 12, 17]); + + let is_hello = new_bytearray_cursor("hello", vec![0, 5]); + let is_rainbow = new_bytearray_cursor("rainbow", vec![0, 7]); + let is_world = new_bytearray_cursor("world", vec![0, 5]); + + // hello == hello + assert_eq!(a.cmp(&b), Ordering::Equal); + + // advance A. slice B full length. + // rainbow > hello + a.advance(); + b = Cursor::new(b.cursor_values().slice(0, 3)); + assert_eq!(a.cmp(&is_rainbow), Ordering::Equal); + assert_eq!(b.cmp(&is_hello), Ordering::Equal); + assert_eq!(a.cmp(&b), Ordering::Greater); + + // slice B ahead by 2. + // rainbow < world + b = Cursor::new(b.cursor_values().slice(2, 1)); + assert_eq!(b.cmp(&is_world), Ordering::Equal); + assert_eq!(a.cmp(&b), Ordering::Less); + + // advanced cursor vs sliced cursor + assert_eq!(a.cursor_values().len(), 3); + assert_eq!(b.cursor_values().len(), 1); + } + + #[test] + #[should_panic(expected = "slice offset is out of bounds")] + fn test_slice_bytes_should_panic() { + let cursor = new_bytearray_cursor("hellorainbowworld", vec![0, 5, 12, 17]); + cursor.cursor_values().slice(42, 1); + } } diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 4cabdc6e178c..2e7ec17edfde 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -127,7 +127,7 @@ impl RowCursorStream { // track the memory in the newly created Rows. let mut rows_reservation = self.reservation.new_empty(); rows_reservation.try_grow(rows.size())?; - Ok(RowValues::new(rows, rows_reservation)) + Ok(RowValues::new(Arc::new(rows))) } } From 95100720ce278d81a7db0e8c795eb05d66f3588f Mon Sep 17 00:00:00 2001 From: wiedld Date: Mon, 23 Oct 2023 20:08:34 -0700 Subject: [PATCH 2/6] fix(7181): memory reservation is required, but pass by rc'ed --- datafusion/physical-plan/src/sorts/cursor.rs | 57 +++++++++++++------- datafusion/physical-plan/src/sorts/stream.rs | 2 +- 2 files changed, 38 insertions(+), 21 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index 594835b33051..031dc4a50fd6 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -27,6 +27,7 @@ use arrow_array::{ Array, ArrowPrimitiveType, GenericByteArray, OffsetSizeTrait, PrimitiveArray, }; use arrow_buffer::{Buffer, OffsetBuffer}; +use datafusion_execution::memory_pool::MemoryReservation; /// A comparable collection of values for use with [`Cursor`] /// @@ -141,18 +142,29 @@ pub struct RowValues { offset: usize, /// Upper bound of windowed RowValues. limit: usize, + + /// Tracks for the memory used by in the `Rows` of this + /// cursor. Freed on drop + #[allow(dead_code)] + reservation: Arc, } impl RowValues { /// Create a new [`RowValues`] from `Arc`. /// /// Panics if `rows` is empty. - pub fn new(rows: Arc) -> Self { + pub fn new(rows: Rows, reservation: MemoryReservation) -> Self { + assert_eq!( + rows.size(), + reservation.size(), + "memory reservation mismatch" + ); assert!(rows.num_rows() > 0); Self { offset: 0, limit: rows.num_rows(), - rows, + rows: Arc::new(rows), + reservation: Arc::new(reservation), } } @@ -189,6 +201,7 @@ impl CursorValues for RowValues { rows: self.rows.clone(), offset: self.offset + offset, limit: self.offset + offset + length, + reservation: self.reservation.clone(), } } } @@ -389,6 +402,9 @@ mod tests { use arrow::row::{RowConverter, SortField}; use arrow_array::{ArrayRef, Float32Array, Int16Array}; use arrow_schema::DataType; + use datafusion_execution::memory_pool::{ + GreedyMemoryPool, MemoryConsumer, MemoryPool, + }; use super::*; @@ -670,6 +686,22 @@ mod tests { cursor.cursor_values().slice(42, 1); } + fn new_row_cursor(cols: &[Arc; 2]) -> Cursor { + let converter = RowConverter::new(vec![ + SortField::new(DataType::Int16), + SortField::new(DataType::Float32), + ]) + .unwrap(); + + let rows = converter.convert_columns(cols).unwrap(); + + let pool: Arc = Arc::new(GreedyMemoryPool::new(rows.size())); + let mut reservation = MemoryConsumer::new("test").register(&pool); + reservation.try_grow(rows.size()).unwrap(); + + Cursor::new(RowValues::new(rows, reservation)) + } + #[test] fn test_slice_rows() { // rows @@ -678,18 +710,9 @@ mod tests { Arc::new(Float32Array::from_iter([Some(1.3), Some(2.5), Some(4.)])) as ArrayRef, ]; - let converter = RowConverter::new(vec![ - SortField::new(DataType::Int16), - SortField::new(DataType::Float32), - ]) - .unwrap(); - let mut a = Cursor::new(RowValues::new(Arc::new( - converter.convert_columns(&cols).unwrap(), - ))); - let mut b = Cursor::new(RowValues::new(Arc::new( - converter.convert_columns(&cols).unwrap(), - ))); + let mut a = new_row_cursor(&cols); + let mut b = new_row_cursor(&cols); assert_eq!(a.cursor_values().len(), 3); // 1,1.3 == 1,1.3 @@ -718,14 +741,8 @@ mod tests { Arc::new(Int16Array::from_iter([Some(1)])) as ArrayRef, Arc::new(Float32Array::from_iter([Some(1.3)])) as ArrayRef, ]; - let converter = RowConverter::new(vec![ - SortField::new(DataType::Int16), - SortField::new(DataType::Float32), - ]) - .unwrap(); - let rows = Arc::new(converter.convert_columns(&cols).unwrap()); - let cursor = Cursor::new(RowValues::new(rows)); + let cursor = new_row_cursor(&cols); cursor.cursor_values().slice(42, 1); } diff --git a/datafusion/physical-plan/src/sorts/stream.rs b/datafusion/physical-plan/src/sorts/stream.rs index 2e7ec17edfde..4cabdc6e178c 100644 --- a/datafusion/physical-plan/src/sorts/stream.rs +++ b/datafusion/physical-plan/src/sorts/stream.rs @@ -127,7 +127,7 @@ impl RowCursorStream { // track the memory in the newly created Rows. let mut rows_reservation = self.reservation.new_empty(); rows_reservation.try_grow(rows.size())?; - Ok(RowValues::new(Arc::new(rows))) + Ok(RowValues::new(rows, rows_reservation)) } } From 6156a57571ac32e660c2d998215ad3eb25db3a30 Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 24 Oct 2023 12:23:39 -0700 Subject: [PATCH 3/6] fix(7181): upper bounds check should be non-inclusive for RowValues --- datafusion/physical-plan/src/sorts/cursor.rs | 20 +++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index 031dc4a50fd6..779d1ae13c5c 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -140,7 +140,7 @@ pub struct RowValues { /// Lower bound of windowed RowValues. offset: usize, - /// Upper bound of windowed RowValues. + /// Upper bound of windowed RowValues (not inclusive). limit: usize, /// Tracks for the memory used by in the `Rows` of this @@ -189,7 +189,7 @@ impl CursorValues for RowValues { fn slice(&self, offset: usize, length: usize) -> Self { assert!( - offset >= self.offset && self.offset + offset <= self.limit, + offset >= self.offset && self.offset + offset < self.limit, "slice offset is out of bounds" ); assert!( @@ -683,7 +683,9 @@ mod tests { let buffer = ScalarBuffer::from(vec![0, 1, 2]); let cursor = new_primitive_cursor(options, buffer, 0); - cursor.cursor_values().slice(42, 1); + cursor + .cursor_values() + .slice(cursor.cursor_values().len(), 1); } fn new_row_cursor(cols: &[Arc; 2]) -> Cursor { @@ -744,7 +746,9 @@ mod tests { let cursor = new_row_cursor(&cols); - cursor.cursor_values().slice(42, 1); + cursor + .cursor_values() + .slice(cursor.cursor_values().len(), 1); } fn new_bytearray_cursor( @@ -770,7 +774,7 @@ mod tests { } #[test] - fn test_slice_bytes() { + fn test_slice_bytearray() { let mut a = new_bytearray_cursor("hellorainbowworld", vec![0, 5, 12, 17]); let mut b = new_bytearray_cursor("hellorainbowworld", vec![0, 5, 12, 17]); @@ -802,8 +806,10 @@ mod tests { #[test] #[should_panic(expected = "slice offset is out of bounds")] - fn test_slice_bytes_should_panic() { + fn test_slice_bytearray_should_panic() { let cursor = new_bytearray_cursor("hellorainbowworld", vec![0, 5, 12, 17]); - cursor.cursor_values().slice(42, 1); + cursor + .cursor_values() + .slice(cursor.cursor_values().len(), 1); } } From af1a55456618f7b936b7d8bc08788f5ecb3bbd2f Mon Sep 17 00:00:00 2001 From: wiedld Date: Tue, 24 Oct 2023 13:25:23 -0700 Subject: [PATCH 4/6] fix(7181): fix bounds check on RowValues::slice, and add regression tests to add CursorValue implementations --- datafusion/physical-plan/src/sorts/cursor.rs | 58 +++++++++++++++++--- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index 779d1ae13c5c..8f3d8525aed9 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -189,7 +189,7 @@ impl CursorValues for RowValues { fn slice(&self, offset: usize, length: usize) -> Self { assert!( - offset >= self.offset && self.offset + offset < self.limit, + self.offset + offset < self.limit, "slice offset is out of bounds" ); assert!( @@ -568,7 +568,7 @@ mod tests { nulls_first: true, }; - let buffer = ScalarBuffer::from(vec![0, 1, 2]); + let buffer = ScalarBuffer::from(vec![0, 1, 2, 3]); let mut cursor = new_primitive_cursor(options, buffer, 0); // from start @@ -598,6 +598,16 @@ mod tests { Ordering::Equal, "should ignore current cursor position when sliced" ); + + // slicing on a slice (a.k.a. combining offsets) + let sliced = Cursor::new(cursor.cursor_values().slice(1, 3)); + let sliced = Cursor::new(sliced.cursor_values().slice(2, 1)); + let expected = new_primitive_cursor(options, ScalarBuffer::from(vec![3]), 0); + assert_eq!( + sliced.cmp(&expected), + Ordering::Equal, + "should respect previous slice/windowed boundaries, when re-slicing" + ); } #[test] @@ -708,14 +718,19 @@ mod tests { fn test_slice_rows() { // rows let cols = [ - Arc::new(Int16Array::from_iter([Some(1), Some(2), Some(3)])) as ArrayRef, - Arc::new(Float32Array::from_iter([Some(1.3), Some(2.5), Some(4.)])) + Arc::new(Int16Array::from_iter([Some(1), Some(2), Some(3), Some(4)])) as ArrayRef, + Arc::new(Float32Array::from_iter([ + Some(1.3), + Some(2.5), + Some(4.), + Some(4.2), + ])) as ArrayRef, ]; let mut a = new_row_cursor(&cols); let mut b = new_row_cursor(&cols); - assert_eq!(a.cursor_values().len(), 3); + assert_eq!(a.cursor_values().len(), 4); // 1,1.3 == 1,1.3 assert_eq!(a.cmp(&b), Ordering::Equal); @@ -732,8 +747,22 @@ mod tests { assert_eq!(a.cmp(&b), Ordering::Less); // advanced cursor vs sliced cursor - assert_eq!(a.cursor_values().len(), 3); + assert_eq!(a.cursor_values().len(), 4); assert_eq!(b.cursor_values().len(), 1); + + // slicing on a slice (a.k.a. combining offsets) + let cursor = new_row_cursor(&cols); + let sliced = Cursor::new(cursor.cursor_values().slice(1, 3)); + let sliced = Cursor::new(sliced.cursor_values().slice(2, 1)); + let mut expected = new_row_cursor(&cols); + expected.advance(); + expected.advance(); + expected.advance(); + assert_eq!( + sliced.cmp(&expected), + Ordering::Equal, + "should respect previous slice/windowed boundaries, when re-slicing" + ); } #[test] @@ -775,12 +804,13 @@ mod tests { #[test] fn test_slice_bytearray() { - let mut a = new_bytearray_cursor("hellorainbowworld", vec![0, 5, 12, 17]); - let mut b = new_bytearray_cursor("hellorainbowworld", vec![0, 5, 12, 17]); + let mut a = new_bytearray_cursor("hellorainbowworldzoo", vec![0, 5, 12, 17, 20]); + let mut b = new_bytearray_cursor("hellorainbowworldzoo", vec![0, 5, 12, 17, 20]); let is_hello = new_bytearray_cursor("hello", vec![0, 5]); let is_rainbow = new_bytearray_cursor("rainbow", vec![0, 7]); let is_world = new_bytearray_cursor("world", vec![0, 5]); + let is_zoo = new_bytearray_cursor("zoo", vec![0, 3]); // hello == hello assert_eq!(a.cmp(&b), Ordering::Equal); @@ -800,8 +830,18 @@ mod tests { assert_eq!(a.cmp(&b), Ordering::Less); // advanced cursor vs sliced cursor - assert_eq!(a.cursor_values().len(), 3); + assert_eq!(a.cursor_values().len(), 4); assert_eq!(b.cursor_values().len(), 1); + + // slicing on a slice (a.k.a. combining offsets) + let cursor = new_bytearray_cursor("hellorainbowworldzoo", vec![0, 5, 12, 17, 20]); + let sliced = Cursor::new(cursor.cursor_values().slice(1, 3)); + let sliced = Cursor::new(sliced.cursor_values().slice(2, 1)); + assert_eq!( + sliced.cmp(&is_zoo), + Ordering::Equal, + "should respect previous slice/windowed boundaries, when re-slicing" + ); } #[test] From 6f8420ebef67ec524a637a8d3ddc0b707df64c9c Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 26 Oct 2023 10:41:14 -0700 Subject: [PATCH 5/6] chore(7181): update doc comments --- datafusion/physical-plan/src/sorts/cursor.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index 8f3d8525aed9..a898501e02e5 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -42,11 +42,11 @@ pub trait CursorValues { /// Returns comparison of `l[l_idx]` and `r[r_idx]` fn compare(l: &Self, l_idx: usize, r: &Self, r_idx: usize) -> Ordering; - /// Slice at a given row index, returning a new Self + /// Returns a zero-copy slice of this [`CursorValues`] with the indicated offset and length. /// /// # Panics /// - /// Panics if the slice is out of bounds, or memory is insufficient + /// Panics if the slice is out of bounds fn slice(&self, offset: usize, length: usize) -> Self; } @@ -138,9 +138,9 @@ impl Ord for Cursor { pub struct RowValues { rows: Arc, - /// Lower bound of windowed RowValues. + /// Lower bound within `rows`. offset: usize, - /// Upper bound of windowed RowValues (not inclusive). + /// Upper bound within `rows` (not inclusive). limit: usize, /// Tracks for the memory used by in the `Rows` of this @@ -150,7 +150,7 @@ pub struct RowValues { } impl RowValues { - /// Create a new [`RowValues`] from `Arc`. + /// Create a new [`RowValues`] from [`Rows`]. /// /// Panics if `rows` is empty. pub fn new(rows: Rows, reservation: MemoryReservation) -> Self { From 164e7bc0bc483bff1da0dcb9bce9824df4ec8d4a Mon Sep 17 00:00:00 2001 From: wiedld Date: Thu, 26 Oct 2023 12:36:28 -0700 Subject: [PATCH 6/6] test(7181): refactor null handling tests, to make behavior differences more apparent --- datafusion/physical-plan/src/sorts/cursor.rs | 282 +++++++++++-------- 1 file changed, 171 insertions(+), 111 deletions(-) diff --git a/datafusion/physical-plan/src/sorts/cursor.rs b/datafusion/physical-plan/src/sorts/cursor.rs index a898501e02e5..3a0cb61398fc 100644 --- a/datafusion/physical-plan/src/sorts/cursor.rs +++ b/datafusion/physical-plan/src/sorts/cursor.rs @@ -427,6 +427,47 @@ mod tests { Cursor::new(values) } + #[test] + fn test_ord_nulls() { + // nulls first + let options = SortOptions { + descending: false, + nulls_first: true, + }; + let is_min = new_primitive_cursor(options, ScalarBuffer::from(vec![i32::MIN]), 0); + let is_null = + new_primitive_cursor(options, ScalarBuffer::from(vec![i32::MIN]), 1); + assert_eq!( + is_min.cmp(&is_null), + Ordering::Greater, + "should have MIN > NULL, when nulls first" + ); + assert_eq!( + is_null.cmp(&is_min), + Ordering::Less, + "should have NULL < MIN, when nulls first" + ); + + // nulls last + let options = SortOptions { + descending: false, + nulls_first: false, + }; + let is_min = new_primitive_cursor(options, ScalarBuffer::from(vec![i32::MIN]), 0); + let is_null = + new_primitive_cursor(options, ScalarBuffer::from(vec![i32::MIN]), 1); + assert_eq!( + is_min.cmp(&is_null), + Ordering::Less, + "should have MIN < NULL, when nulls last" + ); + assert_eq!( + is_null.cmp(&is_min), + Ordering::Greater, + "should have NULL > MIN, when nulls last" + ); + } + #[test] fn test_primitive_nulls_first() { let options = SortOptions { @@ -563,125 +604,144 @@ mod tests { #[test] fn test_slice_primitive() { - let options = SortOptions { - descending: false, - nulls_first: true, - }; - - let buffer = ScalarBuffer::from(vec![0, 1, 2, 3]); - let mut cursor = new_primitive_cursor(options, buffer, 0); - - // from start - let sliced = Cursor::new(cursor.cursor_values().slice(0, 1)); - let expected = new_primitive_cursor(options, ScalarBuffer::from(vec![0]), 0); - assert_eq!( - sliced.cmp(&expected), - Ordering::Equal, - "should slice from start" - ); + fn run_test(options: SortOptions, num_nulls: usize, scenario: &str) { + let is_min = Arc::new(new_primitive_cursor( + options, + ScalarBuffer::from(vec![i32::MIN]), + 0, + )); + let is_null = Arc::new(new_primitive_cursor( + options, + ScalarBuffer::from(vec![i32::MIN]), + 1, + )); + + let buffer = ScalarBuffer::from(vec![i32::MIN, 79, 2, i32::MIN]); + let mut a = new_primitive_cursor(options, buffer, num_nulls); + let buffer = ScalarBuffer::from(vec![i32::MIN, -284, 3, i32::MIN, 2]); + let mut b = new_primitive_cursor(options, buffer.clone(), num_nulls); + + // start + // NULL == NULL, or i32::MIN == i32::MIN + assert_eq!(a.cmp(&b), Ordering::Equal); + let expected_val = match (options.nulls_first, num_nulls > 0) { + (true, true) => is_null.clone(), // nulls_first + (false, true) => is_min.clone(), // nulls_last + (_, false) => is_min.clone(), // no_nulls + }; + assert_eq!( + a.cmp(&expected_val), + Ordering::Equal, + "{scenario}: should have null mask applied properly, at start of values" + ); + + // start + // NULL == NULL, or i32::MIN == i32::MIN + a.advance(); + a.advance(); + a = Cursor::new(a.cursor_values().slice(0, 4)); + assert_eq!( + a.cmp(&b), + Ordering::Equal, + "{scenario}: should ignore cursor position when sliced" + ); + assert_eq!( + a.cursor_values().len(), + 4, + "{scenario}: should be able to slice to the full length" + ); + + // slice a + // i32::MIN > NULL (nulls first), or NULL > i32::MIN (nulls last), or i32::MIN == i32::MIN (no nulls) + a = Cursor::new(a.cursor_values().slice(3, 1)); + let (expected_val, expected_order) = + match (options.nulls_first, num_nulls > 0) { + (true, true) => (is_min.clone(), Ordering::Greater), // nulls_first + (false, true) => (is_null, Ordering::Greater), // nulls_last, so nulls are considered greater + (_, false) => (is_min, Ordering::Equal), // no_nulls + }; + assert_eq!( + a.cmp(&expected_val), + Ordering::Equal, + "{scenario}: should have null mask applied properly, at end of values" + ); + assert_eq!( + a.cmp(&b), + expected_order, + "{scenario}: should be able to slice with offset, with null mask" + ); + assert_eq!( + a.cursor_values().len(), + 1, + "{scenario}: should be able to slice to shorten length" + ); + + // slice b, compare sliced_a to sliced_b + // i32::MIN == i32::MIN, or NULL == NULL + b = Cursor::new(b.cursor_values().slice(3, 2)); + assert_eq!( + a.cmp(&b), + Ordering::Equal, + "{scenario}: should be able to slice with offset" + ); + assert_eq!(b.cursor_values().len(), 2, "{scenario}: should have a smaller apparent length for the underlying cursor values"); + + // re-slice b + // i32::MIN < 2 (nulls first), or NULL == NULL (nulls last), or i32::MIN < 2 (no nulls) + b = Cursor::new(b.cursor_values().slice(1, 1)); + let expected_order = match (options.nulls_first, num_nulls > 0) { + (true, true) => Ordering::Less, // nulls_first + (false, true) => Ordering::Equal, // nulls_last + (_, false) => Ordering::Less, // no_nulls + }; + assert_eq!(a.cmp(&b), expected_order, "{scenario}: should respect previous slice/windowed boundaries, when re-slicing"); + + // length change: on slice vs advance + let mut cursor = new_primitive_cursor(options, buffer, num_nulls); + assert_eq!( + cursor.cursor_values().len(), + 5, + "{scenario}: expect initial length" + ); + cursor.advance(); + cursor.advance(); + assert_eq!(cursor.cursor_values().len(), 5, "{scenario}: expect advancing cursor does not impact cursor_values length"); + cursor = Cursor::new(cursor.cursor_values().slice(2, 2)); + assert_eq!( + cursor.cursor_values().len(), + 2, + "{scenario}: expect cursor_values slicing to impact length" + ); + } - // with offset - let sliced = Cursor::new(cursor.cursor_values().slice(1, 2)); - let expected = new_primitive_cursor(options, ScalarBuffer::from(vec![1]), 0); - assert_eq!( - sliced.cmp(&expected), - Ordering::Equal, - "should slice with offset" + run_test( + SortOptions { + descending: false, + nulls_first: true, + }, + 2, + "nulls_first", ); - // cursor current position != start - cursor.advance(); - let sliced = Cursor::new(cursor.cursor_values().slice(0, 1)); - let expected = new_primitive_cursor(options, ScalarBuffer::from(vec![0]), 0); - assert_eq!( - sliced.cmp(&expected), - Ordering::Equal, - "should ignore current cursor position when sliced" + run_test( + SortOptions { + descending: false, + nulls_first: false, + }, + 2, + "nulls_last", ); - // slicing on a slice (a.k.a. combining offsets) - let sliced = Cursor::new(cursor.cursor_values().slice(1, 3)); - let sliced = Cursor::new(sliced.cursor_values().slice(2, 1)); - let expected = new_primitive_cursor(options, ScalarBuffer::from(vec![3]), 0); - assert_eq!( - sliced.cmp(&expected), - Ordering::Equal, - "should respect previous slice/windowed boundaries, when re-slicing" + run_test( + SortOptions { + descending: false, + nulls_first: false, + }, + 0, + "no_nulls", ); } - #[test] - fn test_slice_primitive_nulls_first() { - let options = SortOptions { - descending: false, - nulls_first: true, - }; - - let is_min = new_primitive_cursor(options, ScalarBuffer::from(vec![i32::MIN]), 0); - let is_null = - new_primitive_cursor(options, ScalarBuffer::from(vec![i32::MIN]), 1); - - let buffer = ScalarBuffer::from(vec![i32::MIN, 79, 2, i32::MIN]); - let mut a = new_primitive_cursor(options, buffer, 2); - let buffer = ScalarBuffer::from(vec![i32::MIN, -284, 3, i32::MIN, 2]); - let mut b = new_primitive_cursor(options, buffer, 2); - - // NULL == NULL - assert_eq!(a, is_null); - assert_eq!(a.cmp(&b), Ordering::Equal); - - // i32::MIN > NULL - a = Cursor::new(a.cursor_values().slice(3, 1)); - assert_eq!(a, is_min); - assert_eq!(a.cmp(&b), Ordering::Greater); - - // i32::MIN == i32::MIN - b = Cursor::new(b.cursor_values().slice(3, 2)); - assert_eq!(b, is_min); - assert_eq!(a.cmp(&b), Ordering::Equal); - - // i32::MIN < 2 - b = Cursor::new(b.cursor_values().slice(1, 1)); - assert_eq!(a.cmp(&b), Ordering::Less); - } - - #[test] - fn test_slice_primitive_nulls_last() { - let options = SortOptions { - descending: false, - nulls_first: false, - }; - - let is_min = new_primitive_cursor(options, ScalarBuffer::from(vec![i32::MIN]), 0); - let is_null = - new_primitive_cursor(options, ScalarBuffer::from(vec![i32::MIN]), 1); - - let buffer = ScalarBuffer::from(vec![i32::MIN, 79, 2, i32::MIN]); - let mut a = new_primitive_cursor(options, buffer, 2); - let buffer = ScalarBuffer::from(vec![i32::MIN, -284, 3, i32::MIN, 2]); - let mut b = new_primitive_cursor(options, buffer, 2); - - // i32::MIN == i32::MIN - assert_eq!(a, is_min); - assert_eq!(a.cmp(&b), Ordering::Equal); - - // i32::MIN < -284 - b = Cursor::new(b.cursor_values().slice(1, 3)); // slice to full length - assert_eq!(a.cmp(&b), Ordering::Less); - - // 79 > -284 - a = Cursor::new(a.cursor_values().slice(1, 2)); // slice to shorter than full length - assert_ne!(a, is_null); - assert_eq!(a.cmp(&b), Ordering::Greater); - - // NULL == NULL - a = Cursor::new(a.cursor_values().slice(1, 1)); - b = Cursor::new(b.cursor_values().slice(2, 1)); - assert_eq!(a, is_null); - assert_eq!(b, is_null); - assert_eq!(a.cmp(&b), Ordering::Equal); - } - #[test] #[should_panic(expected = "slice offset is out of bounds")] fn test_slice_primitive_can_panic() {