Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support StringViewArray interop with python: fix lingering C Data Interface issues for *ViewArray #6368

Merged
merged 8 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions .github/workflows/integration.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ on:
- arrow/**

jobs:

integration:
name: Archery test With other arrows
runs-on: ubuntu-latest
Expand Down Expand Up @@ -118,9 +117,9 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
rust: [ stable ]
# PyArrow 13 was the last version prior to introduction to Arrow PyCapsules
pyarrow: [ "13", "14" ]
rust: [stable]
# PyArrow 15 was the first version to introduce StringView/BinaryView support
pyarrow: ["15", "16", "17"]
steps:
- uses: actions/checkout@v4
with:
Expand Down
59 changes: 49 additions & 10 deletions arrow-array/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,13 @@ fn bit_width(data_type: &DataType, i: usize) -> Result<usize> {
"The datatype \"{data_type:?}\" expects 3 buffers, but requested {i}. Please verify that the C data interface is correctly implemented."
)))
}
// Variable-sized views: have 3 or more buffers.
// Buffer 1 holds the u128 views
// Buffers 2..N-1 are u8 byte buffers
(DataType::Utf8View, 1) | (DataType::BinaryView,1) => u128::BITS as _,
(DataType::Utf8View, _) | (DataType::BinaryView, _) => {
u8::BITS as _
}
// type ids. UnionArray doesn't have null bitmap so buffer index begins with 0.
(DataType::Union(_, _), 0) => i8::BITS as _,
// Only DenseUnion has 2nd buffer
Expand Down Expand Up @@ -300,7 +307,7 @@ impl<'a> ImportedArrowArray<'a> {
};

let data_layout = layout(&self.data_type);
let buffers = self.buffers(data_layout.can_contain_null_mask)?;
let buffers = self.buffers(data_layout.can_contain_null_mask, data_layout.variadic)?;

let null_bit_buffer = if data_layout.can_contain_null_mask {
self.null_bit_buffer()
Expand Down Expand Up @@ -373,13 +380,30 @@ impl<'a> ImportedArrowArray<'a> {

/// returns all buffers, as organized by Rust (i.e. null buffer is skipped if it's present
/// in the spec of the type)
fn buffers(&self, can_contain_null_mask: bool) -> Result<Vec<Buffer>> {
fn buffers(&self, can_contain_null_mask: bool, variadic: bool) -> Result<Vec<Buffer>> {
// + 1: skip null buffer
let buffer_begin = can_contain_null_mask as usize;
(buffer_begin..self.array.num_buffers())
.map(|index| {
let len = self.buffer_len(index, &self.data_type)?;
let buffer_end = self.array.num_buffers() - usize::from(variadic);

let variadic_buffer_lens = if variadic {
// Each views array has 1 (optional) null buffer, 1 views buffer, and 1 lengths
// buffer (always last). The remaining buffers in between are variadic.
let num_variadic_buffers =
self.array.num_buffers() - (2 + usize::from(can_contain_null_mask));
if num_variadic_buffers == 0 {
&[]
} else {
let lengths = self.array.buffer(self.array.num_buffers() - 1);
// SAFETY: if lengths is non-null, it must point to at least num_variadic_buffers valid i64 values.
unsafe { std::slice::from_raw_parts(lengths.cast::<i64>(), num_variadic_buffers) }
}
} else {
&[]
};

(buffer_begin..buffer_end)
.map(|index| {
let len = self.buffer_len(index, variadic_buffer_lens, &self.data_type)?;
match unsafe { create_buffer(self.owner.clone(), self.array, index, len) } {
Some(buf) => Ok(buf),
None if len == 0 => {
Expand All @@ -399,7 +423,12 @@ impl<'a> ImportedArrowArray<'a> {
/// Rust implementation uses fixed-sized buffers, which require knowledge of their `len`.
/// For variable-sized buffers, such as the second buffer of a `StringArray`, we need
/// to fetch the offset buffer's len to build the second buffer.
fn buffer_len(&self, i: usize, dt: &DataType) -> Result<usize> {
fn buffer_len(
&self,
i: usize,
variadic_buffer_lengths: &[i64],
dt: &DataType,
) -> Result<usize> {
// Special handling for dictionary type as we only care about the key type in the case.
let data_type = match dt {
DataType::Dictionary(key_data_type, _) => key_data_type.as_ref(),
Expand Down Expand Up @@ -430,7 +459,7 @@ impl<'a> ImportedArrowArray<'a> {
}

// the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
let len = self.buffer_len(1, dt)?;
let len = self.buffer_len(1, variadic_buffer_lengths, dt)?;
// first buffer is the null buffer => add(1)
// we assume that pointer is aligned for `i32`, as Utf8 uses `i32` offsets.
#[allow(clippy::cast_ptr_alignment)]
Expand All @@ -444,14 +473,24 @@ impl<'a> ImportedArrowArray<'a> {
}

// the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
let len = self.buffer_len(1, dt)?;
let len = self.buffer_len(1, variadic_buffer_lengths, dt)?;
// first buffer is the null buffer => add(1)
// we assume that pointer is aligned for `i64`, as Large uses `i64` offsets.
#[allow(clippy::cast_ptr_alignment)]
let offset_buffer = self.array.buffer(1) as *const i64;
// get last offset
(unsafe { *offset_buffer.add(len / size_of::<i64>() - 1) }) as usize
}
// View types: these have variadic buffers.
// Buffer 1 is the views buffer, which stores 1 u128 per length of the array.
// Buffers 2..N-1 are the buffers holding the byte data. Their lengths are variable.
// Buffer N is of length (N - 2) and stores i64 containing the lengths of buffers 2..N-1
(DataType::Utf8View, 1) | (DataType::BinaryView, 1) => {
std::mem::size_of::<u128>() * length
}
(DataType::Utf8View, i) | (DataType::BinaryView, i) => {
variadic_buffer_lengths[i - 2] as usize
}
// buffer len of primitive types
_ => {
let bits = bit_width(data_type, i)?;
Expand Down Expand Up @@ -1453,8 +1492,8 @@ mod tests_from_ffi {
owner: &array,
};

let offset_buf_len = imported_array.buffer_len(1, &imported_array.data_type)?;
let data_buf_len = imported_array.buffer_len(2, &imported_array.data_type)?;
let offset_buf_len = imported_array.buffer_len(1, &[], &imported_array.data_type)?;
let data_buf_len = imported_array.buffer_len(2, &[], &imported_array.data_type)?;

assert_eq!(offset_buf_len, 4);
assert_eq!(data_buf_len, 0);
Expand Down
7 changes: 5 additions & 2 deletions arrow-buffer/src/buffer/immutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,9 @@ impl Buffer {
pub fn advance(&mut self, offset: usize) {
assert!(
offset <= self.length,
"the offset of the new Buffer cannot exceed the existing length"
"the offset of the new Buffer cannot exceed the existing length: offset={} length={}",
offset,
self.length
);
self.length -= offset;
// Safety:
Expand All @@ -221,7 +223,8 @@ impl Buffer {
pub fn slice_with_length(&self, offset: usize, length: usize) -> Self {
assert!(
offset.saturating_add(length) <= self.length,
"the offset of the new Buffer cannot exceed the existing length"
"the offset of the new Buffer cannot exceed the existing length: slice offset={offset} length={length} selflen={}",
self.length
);
// Safety:
// offset + length <= self.length
Expand Down
20 changes: 16 additions & 4 deletions arrow-data/src/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use crate::bit_mask::set_bits;
use crate::{layout, ArrayData};
use arrow_buffer::buffer::NullBuffer;
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_buffer::{Buffer, MutableBuffer, ScalarBuffer};
use arrow_schema::DataType;
use std::ffi::c_void;

Expand Down Expand Up @@ -121,7 +121,7 @@ impl FFI_ArrowArray {
pub fn new(data: &ArrayData) -> Self {
let data_layout = layout(data.data_type());

let buffers = if data_layout.can_contain_null_mask {
let mut buffers = if data_layout.can_contain_null_mask {
// * insert the null buffer at the start
// * make all others `Option<Buffer>`.
std::iter::once(align_nulls(data.offset(), data.nulls()))
Expand All @@ -132,7 +132,7 @@ impl FFI_ArrowArray {
};

// `n_buffers` is the number of buffers by the spec.
let n_buffers = {
let mut n_buffers = {
data_layout.buffers.len() + {
// If the layout has a null buffer by Arrow spec.
// Note that even the array doesn't have a null buffer because it has
Expand All @@ -141,10 +141,22 @@ impl FFI_ArrowArray {
}
} as i64;

if data_layout.variadic {
// Save the lengths of all variadic buffers into a new buffer.
// The first buffer is `views`, and the rest are variadic.
let mut data_buffers_lengths = Vec::new();
for buffer in data.buffers().iter().skip(1) {
data_buffers_lengths.push(buffer.len() as i64);
n_buffers += 1;
}

buffers.push(Some(ScalarBuffer::from(data_buffers_lengths).into_inner()));
n_buffers += 1;
}

let buffers_ptr = buffers
.iter()
.flat_map(|maybe_buffer| match maybe_buffer {
// note that `raw_data` takes into account the buffer's offset
Some(b) => Some(b.as_ptr() as *const c_void),
// This is for null buffer. We only put a null pointer for
// null buffer if by spec it can contain null mask.
Expand Down
2 changes: 1 addition & 1 deletion arrow/src/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ impl FromPyArrow for RecordBatch {
validate_pycapsule(array_capsule, "arrow_array")?;

let schema_ptr = unsafe { schema_capsule.reference::<FFI_ArrowSchema>() };
let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer() as _) };
let ffi_array = unsafe { FFI_ArrowArray::from_raw(array_capsule.pointer().cast()) };
let array_data = unsafe { ffi::from_ffi(ffi_array, schema_ptr) }.map_err(to_py_err)?;
if !matches!(array_data.data_type(), DataType::Struct(_)) {
return Err(PyTypeError::new_err(
Expand Down
5 changes: 4 additions & 1 deletion arrow/tests/pyarrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
use arrow::array::{ArrayRef, Int32Array, StringArray};
use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use arrow::record_batch::RecordBatch;
use arrow_array::StringViewArray;
use pyo3::Python;
use std::sync::Arc;

Expand All @@ -27,7 +28,9 @@ fn test_to_pyarrow() {

let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
let b: ArrayRef = Arc::new(StringArray::from(vec!["a", "b"]));
let input = RecordBatch::try_from_iter(vec![("a", a), ("b", b)]).unwrap();
// The "very long string" will not be inlined, and force the creation of a data buffer.
let c: ArrayRef = Arc::new(StringViewArray::from(vec!["short", "a very long string"]));
let input = RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap();
println!("input: {:?}", input);

let res = Python::with_gil(|py| {
Expand Down
Loading