feat(parquet): coerce_types flag for date64 (apache#1938)
dsgibbons committed Aug 27, 2024
1 parent b711f23 commit 5566ca6
Showing 8 changed files with 303 additions and 55 deletions.
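The change in brief: WriterProperties gains a coerce_types flag, and when it is enabled the Arrow writer coerces Date64 columns toward Parquet's 32-bit DATE logical type instead of writing raw INT64 milliseconds; the reader now maps INT64 DATE columns to Date64 directly. A minimal usage sketch, not part of the diff — the column name "d" and the buffer handling are illustrative, while set_coerce_types comes from the changes below:

use std::sync::Arc;

use arrow_array::{Date64Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};
use parquet::arrow::ArrowWriter;
use parquet::file::properties::WriterProperties;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let schema = Arc::new(Schema::new(vec![Field::new("d", DataType::Date64, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Date64Array::from(vec![0, 86_400_000]))], // 1970-01-01, 1970-01-02
    )?;

    // Default (no coercion): Date64 round-trips losslessly as physical INT64.
    // With coerce_types enabled, Date64 is coerced to DATE, which per the
    // tests below is lossy for values that are not whole days in i32 range.
    let props = WriterProperties::builder().set_coerce_types(true).build();

    let mut buf = Vec::new();
    let mut writer = ArrowWriter::try_new(&mut buf, schema, Some(props))?;
    writer.write(&batch)?;
    writer.close()?;
    Ok(())
}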
120 changes: 111 additions & 9 deletions parquet/src/arrow/array_reader/primitive_array.rs
@@ -208,14 +208,8 @@ where
// As there is not always a 1:1 mapping between Arrow and Parquet, there
// are datatypes which we must convert explicitly.
// These are:
- // - date64: we should cast int32 to date32, then date32 to date64.
- // - decimal: cast in32 to decimal, int64 to decimal
+ // - decimal: cast int32 to decimal, int64 to decimal
let array = match target_type {
- ArrowType::Date64 => {
- // this is cheap as it internally reinterprets the data
- let a = arrow_cast::cast(&array, &ArrowType::Date32)?;
- arrow_cast::cast(&a, target_type)?
- }
ArrowType::Decimal128(p, s) => {
// Apply conversion to all elements regardless of null slots as the conversion
// to `i128` is infallible. This improves performance by avoiding a branch in
@@ -305,9 +299,9 @@ mod tests {
use crate::util::test_common::rand_gen::make_pages;
use crate::util::InMemoryPageIterator;
use arrow::datatypes::ArrowPrimitiveType;
- use arrow_array::{Array, PrimitiveArray};
+ use arrow_array::{Array, Date32Array, Date64Array, PrimitiveArray};

- use arrow::datatypes::DataType::Decimal128;
+ use arrow::datatypes::DataType::{Date32, Date64, Decimal128};
use rand::distributions::uniform::SampleUniform;
use std::collections::VecDeque;

@@ -545,6 +539,14 @@ mod tests {
arrow::datatypes::Int32Type,
i32
);
test_primitive_array_reader_one_type!(
crate::data_type::Int64Type,
PhysicalType::INT64,
"DATE",
arrow::datatypes::Date64Type,
arrow::datatypes::Int64Type,
i64
);
test_primitive_array_reader_one_type!(
crate::data_type::Int32Type,
PhysicalType::INT32,
@@ -783,4 +785,104 @@ mod tests {
assert_ne!(array, &data_decimal_array)
}
}

#[test]
fn test_primitive_array_reader_date32_type() {
// parquet `INT32` to date
let message_type = "
message test_schema {
REQUIRED INT32 date1 (DATE);
}
";
let schema = parse_message_type(message_type)
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
.unwrap();
let column_desc = schema.column(0);

// create the array reader
{
let mut data = Vec::new();
let mut page_lists = Vec::new();
make_column_chunks::<Int32Type>(
column_desc.clone(),
Encoding::PLAIN,
100,
-99999999,
99999999,
&mut Vec::new(),
&mut Vec::new(),
&mut data,
&mut page_lists,
true,
2,
);
let page_iterator = InMemoryPageIterator::new(page_lists);

let mut array_reader =
PrimitiveArrayReader::<Int32Type>::new(Box::new(page_iterator), column_desc, None)
.unwrap();

// read data from the reader
// the data type is date
let array = array_reader.next_batch(50).unwrap();
assert_eq!(array.data_type(), &Date32);
let array = array.as_any().downcast_ref::<Date32Array>().unwrap();
let data_date_array = data[0..50]
.iter()
.copied()
.map(Some)
.collect::<Date32Array>();
assert_eq!(array, &data_date_array);
}
}

#[test]
fn test_primitive_array_reader_date64_type() {
// parquet `INT64` to date
let message_type = "
message test_schema {
REQUIRED INT64 date1 (DATE);
}
";
let schema = parse_message_type(message_type)
.map(|t| Arc::new(SchemaDescriptor::new(Arc::new(t))))
.unwrap();
let column_desc = schema.column(0);

// create the array reader
{
let mut data = Vec::new();
let mut page_lists = Vec::new();
make_column_chunks::<Int64Type>(
column_desc.clone(),
Encoding::PLAIN,
100,
-999999999999999999,
999999999999999999,
&mut Vec::new(),
&mut Vec::new(),
&mut data,
&mut page_lists,
true,
2,
);
let page_iterator = InMemoryPageIterator::new(page_lists);

let mut array_reader =
PrimitiveArrayReader::<Int64Type>::new(Box::new(page_iterator), column_desc, None)
.unwrap();

// read data from the reader
// the data type is date
let array = array_reader.next_batch(50).unwrap();
assert_eq!(array.data_type(), &Date64);
let array = array.as_any().downcast_ref::<Date64Array>().unwrap();
let data_date_array = data[0..50]
.iter()
.copied()
.map(Some)
.collect::<Date64Array>();
assert_eq!(array, &data_date_array);
}
}
}
115 changes: 113 additions & 2 deletions parquet/src/arrow/arrow_reader/mod.rs
@@ -948,8 +948,8 @@ mod tests {
use arrow_array::builder::*;
use arrow_array::cast::AsArray;
use arrow_array::types::{
- Decimal128Type, Decimal256Type, DecimalType, Float16Type, Float32Type, Float64Type,
- Time32MillisecondType, Time64MicrosecondType,
+ Date32Type, Date64Type, Decimal128Type, Decimal256Type, DecimalType, Float16Type,
+ Float32Type, Float64Type, Time32MillisecondType, Time64MicrosecondType,
};
use arrow_array::*;
use arrow_buffer::{i256, ArrowNativeType, Buffer, IntervalDayTime};
@@ -1288,6 +1288,117 @@
Ok(())
}

#[test]
fn test_date32_roundtrip() -> Result<()> {
use arrow_array::Date32Array;

let schema = Arc::new(Schema::new(vec![Field::new(
"date32",
ArrowDataType::Date32,
false,
)]));

let mut buf = Vec::with_capacity(1024);

let mut writer = ArrowWriter::try_new(&mut buf, schema.clone(), None)?;

let original = RecordBatch::try_new(
schema,
vec![Arc::new(Date32Array::from(vec![
-1_000_000, -100_000, -10_000, -1_000, 0, 1_000, 10_000, 100_000, 1_000_000,
]))],
)?;

writer.write(&original)?;
writer.close()?;

let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
let ret = reader.next().unwrap()?;
assert_eq!(ret, original);

// Ensure can be downcast to the correct type
ret.column(0).as_primitive::<Date32Type>();

Ok(())
}

#[test]
fn test_date64_roundtrip() -> Result<()> {
use arrow_array::Date64Array;

let schema = Arc::new(Schema::new(vec![
Field::new("small-date64", ArrowDataType::Date64, false),
Field::new("big-date64", ArrowDataType::Date64, false),
Field::new("invalid-date64", ArrowDataType::Date64, false),
]));

let mut default_buf = Vec::with_capacity(1024);
let mut coerce_buf = Vec::with_capacity(1024);

let coerce_props = WriterProperties::builder().set_coerce_types(true).build();

let mut default_writer = ArrowWriter::try_new(&mut default_buf, schema.clone(), None)?;
let mut coerce_writer =
ArrowWriter::try_new(&mut coerce_buf, schema.clone(), Some(coerce_props))?;

static NUM_MILLISECONDS_IN_DAY: i64 = 1000 * 60 * 60 * 24;

let original = RecordBatch::try_new(
schema,
vec![
// small-date64
Arc::new(Date64Array::from(vec![
-1_000_000 * NUM_MILLISECONDS_IN_DAY,
-1_000 * NUM_MILLISECONDS_IN_DAY,
0,
1_000 * NUM_MILLISECONDS_IN_DAY,
1_000_000 * NUM_MILLISECONDS_IN_DAY,
])),
// big-date64
Arc::new(Date64Array::from(vec![
-10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
-1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
0,
1_000_000_000 * NUM_MILLISECONDS_IN_DAY,
10_000_000_000 * NUM_MILLISECONDS_IN_DAY,
])),
// invalid-date64
Arc::new(Date64Array::from(vec![
-1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
-1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1,
1_000 * NUM_MILLISECONDS_IN_DAY + 1,
1_000_000 * NUM_MILLISECONDS_IN_DAY + 1,
])),
],
)?;

default_writer.write(&original)?;
coerce_writer.write(&original)?;

default_writer.close()?;
coerce_writer.close()?;

let mut default_reader = ParquetRecordBatchReader::try_new(Bytes::from(default_buf), 1024)?;
let mut coerce_reader = ParquetRecordBatchReader::try_new(Bytes::from(coerce_buf), 1024)?;

let default_ret = default_reader.next().unwrap()?;
let coerce_ret = coerce_reader.next().unwrap()?;

// Roundtrip should be successful when the default writer is used
assert_eq!(default_ret, original);

// Only small-date64 should roundtrip successfully when the coerce_types writer is used
assert_eq!(coerce_ret.column(0), original.column(0));
assert_ne!(coerce_ret.column(1), original.column(1));
assert_ne!(coerce_ret.column(2), original.column(2));

// Ensure both can be downcast to the correct type
default_ret.column(0).as_primitive::<Date64Type>();
coerce_ret.column(0).as_primitive::<Date64Type>();

Ok(())
}
struct RandFixedLenGen {}

impl RandGen<FixedLenByteArrayType> for RandFixedLenGen {
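The three assertions above follow from day arithmetic: coercing a Date64 down to DATE keeps only a whole number of days that fits in 32 bits, so any sub-day remainder or out-of-range day count cannot survive the roundtrip. A sketch of that check, mirroring NUM_MILLISECONDS_IN_DAY from the test — survives_day_coercion is a hypothetical helper for illustration, not an API of this crate:

const NUM_MILLISECONDS_IN_DAY: i64 = 24 * 60 * 60 * 1000; // 86_400_000

fn survives_day_coercion(millis: i64) -> bool {
    let days = millis / NUM_MILLISECONDS_IN_DAY;
    // Must be a whole number of days ("invalid-date64" fails here) and the
    // day count must fit in an INT32 DATE column ("big-date64" fails here).
    millis % NUM_MILLISECONDS_IN_DAY == 0 && i32::try_from(days).is_ok()
}

fn main() {
    assert!(survives_day_coercion(1_000 * NUM_MILLISECONDS_IN_DAY)); // small-date64
    assert!(!survives_day_coercion(1_000 * NUM_MILLISECONDS_IN_DAY + 1)); // invalid-date64
    assert!(!survives_day_coercion(10_000_000_000 * NUM_MILLISECONDS_IN_DAY)); // big-date64
}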
17 changes: 2 additions & 15 deletions parquet/src/arrow/arrow_reader/statistics.rs
@@ -371,8 +371,7 @@ macro_rules! get_statistics {
[<$stat_type_prefix Int32StatsIterator>]::new($iterator).map(|x| x.copied()),
))),
DataType::Date64 => Ok(Arc::new(Date64Array::from_iter(
- [<$stat_type_prefix Int32StatsIterator>]::new($iterator)
- .map(|x| x.map(|x| i64::from(*x) * 24 * 60 * 60 * 1000)),
+ [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied()),
))),
DataType::Timestamp(unit, timezone) =>{
let iter = [<$stat_type_prefix Int64StatsIterator>]::new($iterator).map(|x| x.copied());
@@ -945,19 +944,7 @@ macro_rules! get_data_page_statistics {
})
},
DataType::Date32 => Ok(Arc::new(Date32Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator).flatten()))),
- DataType::Date64 => Ok(
- Arc::new(
- Date64Array::from_iter([<$stat_type_prefix Int32DataPageStatsIterator>]::new($iterator)
- .map(|x| {
- x.into_iter()
- .map(|x| {
- x.and_then(|x| i64::try_from(x).ok())
- })
- .map(|x| x.map(|x| x * 24 * 60 * 60 * 1000))
- }).flatten()
- )
- )
- ),
+ DataType::Date64 => Ok(Arc::new(Date64Array::from_iter([<$stat_type_prefix Int64DataPageStatsIterator>]::new($iterator).flatten()))),
DataType::Decimal128(precision, scale) => Ok(Arc::new(
Decimal128Array::from_iter([<$stat_type_prefix Decimal128DataPageStatsIterator>]::new($iterator).flatten()).with_precision_and_scale(*precision, *scale)?)),
DataType::Decimal256(precision, scale) => Ok(Arc::new(
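Both statistics hunks drop the old day-to-millisecond scaling: Date64 min/max now come straight from INT64 statistics, since Date64 columns are stored as INT64 milliseconds unless coerced. For reference, the scale factor the removed code applied to INT32 day counts — an illustrative check, not code from this commit:

const MILLIS_PER_DAY: i64 = 24 * 60 * 60 * 1000;

fn main() {
    assert_eq!(MILLIS_PER_DAY, 86_400_000);
    // Under the old path an INT32 day statistic of 19_000 was widened to this
    // many milliseconds; the new path reads the INT64 statistic as-is.
    assert_eq!(19_000i64 * MILLIS_PER_DAY, 1_641_600_000_000);
}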
14 changes: 10 additions & 4 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -180,11 +180,11 @@ impl<W: Write + Send> ArrowWriter<W> {
arrow_schema: SchemaRef,
options: ArrowWriterOptions,
) -> Result<Self> {
+ let mut props = options.properties;
let schema = match options.schema_root {
- Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s)?,
- None => arrow_to_parquet_schema(&arrow_schema)?,
+ Some(s) => arrow_to_parquet_schema_with_root(&arrow_schema, &s, props.coerce_types())?,
+ None => arrow_to_parquet_schema(&arrow_schema, props.coerce_types())?,
};
- let mut props = options.properties;
if !options.skip_arrow_metadata {
// add serialized arrow schema
add_encoded_arrow_schema_to_metadata(&arrow_schema, &mut props);
@@ -549,8 +549,8 @@ impl ArrowColumnChunk {
/// ]));
///
/// // Compute the parquet schema
- /// let parquet_schema = arrow_to_parquet_schema(schema.as_ref()).unwrap();
/// let props = Arc::new(WriterProperties::default());
+ /// let parquet_schema = arrow_to_parquet_schema(schema.as_ref(), props.coerce_types()).unwrap();
///
/// // Create writers for each of the leaf columns
/// let col_writers = get_column_writers(&parquet_schema, &props, &schema).unwrap();
@@ -858,6 +858,12 @@ fn write_leaf(writer: &mut ColumnWriter<'_>, levels: &ArrayLevels) -> Result<usi
}
ColumnWriter::Int64ColumnWriter(ref mut typed) => {
match column.data_type() {
ArrowDataType::Date64 => {
let array = arrow_cast::cast(column, &ArrowDataType::Int64)?;

let array = array.as_primitive::<Int64Type>();
write_primitive(typed, array.values(), levels)
}
ArrowDataType::Int64 => {
let array = column.as_primitive::<Int64Type>();
write_primitive(typed, array.values(), levels)
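Reading back needs no special handling: the new write_leaf arm reinterprets Date64 milliseconds as INT64, and the array reader returns Date64 directly (see the primitive_array.rs change above). A small reader-side sketch using the same APIs as the tests — first_date64_value is a hypothetical helper, and buf is assumed to hold a file produced by ArrowWriter:

use arrow_array::cast::AsArray;
use arrow_array::types::Date64Type;
use bytes::Bytes;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;

// Read the first batch and downcast the Date64 column, as in the roundtrip tests.
fn first_date64_value(buf: Vec<u8>) -> parquet::errors::Result<i64> {
    let mut reader = ParquetRecordBatchReader::try_new(Bytes::from(buf), 1024)?;
    let batch = reader.next().expect("at least one batch")?;
    let dates = batch.column(0).as_primitive::<Date64Type>();
    Ok(dates.value(0)) // milliseconds since the Unix epoch
}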