From bbd7c8b6c3a192e503ae860e77dc18090161a128 Mon Sep 17 00:00:00 2001
From: nglime
Date: Mon, 18 Nov 2024 14:11:41 -0600
Subject: [PATCH 01/20] Added setup for the flattening example from PyArrow.

---
 arrow-array/src/record_batch.rs | 35 +++++++++++++++++++++++++++++++++
 1 file changed, 35 insertions(+)

diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index 372ca63f30a1..a9e410a872c0 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -1206,6 +1206,41 @@ mod tests {
         assert_ne!(batch1, batch2);
     }
 
+    #[test]
+    fn flattening() {
+        let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""]));
+        let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)]));
+        let year: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2022)]));
+
+        let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true));
+        let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true));
+        let year_field = Arc::new(Field::new("year", DataType::Int64, true));
+
+        let a = Arc::new(StructArray::from(vec![
+            (animals_field.clone(), Arc::new(animals) as ArrayRef),
+            (n_legs_field.clone(), Arc::new(n_legs) as ArrayRef),
+            (year_field.clone(), Arc::new(year) as ArrayRef),
+        ]));
+        let month = Arc::new(Int64Array::from(vec![Some(4), Some(6)]));
+
+        let schema = Schema::new(vec![
+            Field::new("a", DataType::Struct(Fields::from(vec![
+                animals_field,
+                n_legs_field,
+                year_field,
+            ])), false),
+            Field::new("month", DataType::Int64, true)
+        ]);
+
+        let record_batch =
+            RecordBatch::try_new(Arc::new(schema), vec![
+                a,
+                month,
+            ]).expect("valid conversion");
+
+        println!("{:?}", record_batch);
+    }
+
     #[test]
     fn project() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));

From 8abcd2542caa32fb2ca57142f1cfe4febea6cce5 Mon Sep 17 00:00:00 2001
From: nglime
Date: Tue, 19 Nov 2024 21:10:04 -0600
Subject: [PATCH 02/20] Logic for recursive normalizer with a base normalize function, based on pola-rs.

---
 arrow-array/src/record_batch.rs | 32 ++++++------
 arrow-schema/src/schema.rs      | 87 ++++++++++++++++++++++++++++++++-
 2 files changed, 104 insertions(+), 15 deletions(-)

diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index a9e410a872c0..b644a0d62312 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -19,8 +19,8 @@
 //! [schema](arrow_schema::Schema).
 
 use crate::{new_empty_array, Array, ArrayRef, StructArray};
-use arrow_schema::{ArrowError, DataType, Field, Schema, SchemaBuilder, SchemaRef};
-use std::ops::Index;
+use arrow_schema::{ArrowError, DataType, Field, Fields, Schema, SchemaBuilder, SchemaRef};
+use std::ops::{Deref, Index};
 use std::sync::Arc;
 
 /// Trait for types that can read `RecordBatch`'s.
@@ -1207,7 +1207,7 @@ mod tests {
     }
 
     #[test]
-    fn flattening() {
+    fn normalize() {
         let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""]));
         let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)]));
         let year: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2022)]));
@@ -1224,21 +1224,23 @@ mod tests {
         let month = Arc::new(Int64Array::from(vec![Some(4), Some(6)]));
 
         let schema = Schema::new(vec![
-            Field::new("a", DataType::Struct(Fields::from(vec![
-                animals_field,
-                n_legs_field,
-                year_field,
-            ])), false),
-            Field::new("month", DataType::Int64, true)
+            Field::new(
+                "a",
+                DataType::Struct(Fields::from(vec![animals_field, n_legs_field, year_field])),
+                false,
+            ),
+            Field::new("month", DataType::Int64, true),
         ]);
+        let normalized = schema.clone().normalize(".", 0).unwrap();
+        println!("{:?}", normalized);
 
         let record_batch =
-            RecordBatch::try_new(Arc::new(schema), vec![
-                a,
-                month,
-            ]).expect("valid conversion");
+            RecordBatch::try_new(Arc::new(schema), vec![a, month]).expect("valid conversion");
+
+        println!("Fields: {:?}", record_batch.schema().fields());
+        println!("Metadata{:?}", record_batch.columns());
 
-        println!("{:?}", record_batch);
+        //println!("{:?}", record_batch);
     }
 
     #[test]
@@ -1353,7 +1355,9 @@ mod tests {
         let metadata = vec![("foo".to_string(), "bar".to_string())]
             .into_iter()
             .collect();
+        println!("Metadata: {:?}", metadata);
         let metadata_schema = nullable_schema.as_ref().clone().with_metadata(metadata);
+        println!("Metadata schema: {:?}", metadata_schema);
         let batch = batch.with_schema(Arc::new(metadata_schema)).unwrap();
 
         // Cannot remove metadata
diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs
index cc3a8a308a83..9924f2b5be1e 100644
--- a/arrow-schema/src/schema.rs
+++ b/arrow-schema/src/schema.rs
@@ -22,7 +22,7 @@ use std::sync::Arc;
 
 use crate::error::ArrowError;
 use crate::field::Field;
-use crate::{FieldRef, Fields};
+use crate::{DataType, FieldRef, Fields};
 
 /// A builder to facilitate building a [`Schema`] from iteratively from [`FieldRef`]
 #[derive(Debug, Default)]
@@ -413,6 +413,91 @@ impl Schema {
         &self.metadata
     }
 
+    pub fn normalize(self, separator: &str, mut max_level: usize) -> Result<Self, ArrowError> {
+        if max_level == 0 {
+            max_level = usize::MAX;
+        }
+        let mut new_fields: Vec<Field> = vec![];
+        for field in self.fields() {
+            match field.data_type() {
+                //DataType::List(f) => field,
+                //DataType::ListView(_) => field,
+                //DataType::FixedSizeList(_, _) => field,
+                //DataType::LargeList(_) => field,
+                //DataType::LargeListView(_) => field,
+                DataType::Struct(nested_fields) => {
+                    let field_name = field.name().as_str();
+                    new_fields = [
+                        new_fields,
+                        Self::normalizer(
+                            nested_fields.to_vec(),
+                            field_name,
+                            separator,
+                            max_level - 1,
+                        ),
+                    ]
+                    .concat();
+                }
+                //DataType::Union(_, _) => field,
+                //DataType::Dictionary(_, _) => field,
+                //DataType::Map(_, _) => field,
+                //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field
+                _ => new_fields.push(Field::new(
+                    field.name(),
+                    field.data_type().clone(),
+                    field.is_nullable(),
+                )),
+            };
+        }
+        Ok(Self::new_with_metadata(new_fields, self.metadata.clone()))
+    }
+
+    fn normalizer(
+        fields: Vec<FieldRef>,
+        key_string: &str,
+        separator: &str,
+        max_level: usize,
+    ) -> Vec<Field> {
+        if max_level > 0 {
+            let mut new_fields: Vec<Field> = vec![];
+            for field in fields {
+                match field.data_type() {
+                    //DataType::List(f) => field,
+                    //DataType::ListView(_) => field,
+                    //DataType::FixedSizeList(_, _) => field,
+                    //DataType::LargeList(_) => field,
+                    //DataType::LargeListView(_) => field,
+                    DataType::Struct(nested_fields) => {
+                        let field_name = field.name().as_str();
+                        let new_key = format!("{key_string}{separator}{field_name}");
+                        new_fields = [
+                            new_fields,
+                            Self::normalizer(
+                                nested_fields.to_vec(),
+                                new_key.as_str(),
+                                separator,
+                                max_level - 1,
+                            ),
+                        ]
+                        .concat();
+                    }
+                    //DataType::Union(_, _) => field,
+                    //DataType::Dictionary(_, _) => field,
+                    //DataType::Map(_, _) => field,
+                    //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field
+                    _ => new_fields.push(Field::new(
+                        format!("{key_string}{separator}{}", field.name()),
+                        field.data_type().clone(),
+                        field.is_nullable(),
+                    )),
+                };
+            }
+            new_fields
+        } else {
+            todo!()
+        }
+    }
+
     /// Look up a column by name and return a immutable reference to the column along with
     /// its index.
     pub fn column_with_name(&self, name: &str) -> Option<(usize, &Field)> {

From 6bba7d39925f0f2f1e25f96dc3772e86005a8870 Mon Sep 17 00:00:00 2001
From: nglime
Date: Fri, 22 Nov 2024 22:37:01 -0600
Subject: [PATCH 03/20] Added recursive normalize function for `Schema`, and started building iterative function for `RecordBatch`. Not sure which one is better currently.

---
 arrow-array/src/record_batch.rs | 58 +++++++++++++++++++++++++++++++++-
 arrow-schema/src/schema.rs      | 14 ++++----
 2 files changed, 64 insertions(+), 8 deletions(-)

diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index b644a0d62312..2be5ca10903c 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -18,8 +18,9 @@
 //! A two-dimensional batch of column-oriented data with a defined
 //! [schema](arrow_schema::Schema).
 
+use std::collections::{BinaryHeap, VecDeque};
 use crate::{new_empty_array, Array, ArrayRef, StructArray};
-use arrow_schema::{ArrowError, DataType, Field, Fields, Schema, SchemaBuilder, SchemaRef};
+use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef};
 use std::ops::{Deref, Index};
 use std::sync::Arc;
 
 /// Trait for types that can read `RecordBatch`'s.
@@ -403,7 +404,59 @@ impl RecordBatch {
         )
     }
 
-    /// Returns the number of columns in the record batch.
+    /// Normalize a semi-structured RecordBatch into a flat table
+    /// If max_level is 0, normalizes all levels.
+    pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result<Self, ArrowError> {
+        if max_level == 0 {
+            max_level = usize::MAX;
+        }
+        if self.num_rows() == 0 {
+            // No data, only need to normalize the schema
+            return Ok(Self::new_empty(Arc::new(self.schema.normalize(separator, max_level)?)));
+        }
+        let mut queue: VecDeque<(usize, &Arc<dyn Array>, &FieldRef)> = VecDeque::new();
+
+        // push fields
+        for (c, f) in self.columns.iter().zip(self.schema().fields()) {
+            queue.push_front((0, c, f));
+        }
+
+        while !queue.is_empty() {
+            match queue.pop_front() {
+                Some((depth, c, f)) => {
+                    match f.data_type() {
+                        //DataType::List(f) => field,
+                        //DataType::ListView(_) => field,
+                        //DataType::FixedSizeList(_, _) => field,
+                        //DataType::LargeList(_) => field,
+                        //DataType::LargeListView(_) => field,
+                        DataType::Struct(nested_fields) => {
+                            let field_name = f.name().as_str();
+                            /*new_fields = [
+                                new_fields,
+                                Self::normalizer(
+                                    nested_fields.to_vec(),
+                                    field_name,
+                                    separator,
+                                    max_level - 1,
+                                ),
+                            ]
+                            .concat();*/
+                        }
+                        //DataType::Union(_, _) => field,
+                        //DataType::Dictionary(_, _) => field,
+                        //DataType::Map(_, _) => field,
+                        //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field
+                        _ => queue.push_front((0, c, f)),
+                    }
+                },
+                None => break,
+            };
+        }
+        todo!()
+    }
+
+    /// Returns the number of columns in the record batch.
     ///
     /// # Example
     ///
@@ -1216,6 +1269,7 @@ mod tests {
         let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true));
         let year_field = Arc::new(Field::new("year", DataType::Int64, true));
 
+
         let a = Arc::new(StructArray::from(vec![
             (animals_field.clone(), Arc::new(animals) as ArrayRef),
             (n_legs_field.clone(), Arc::new(n_legs) as ArrayRef),
diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs
index 9924f2b5be1e..62f74c8b435b 100644
--- a/arrow-schema/src/schema.rs
+++ b/arrow-schema/src/schema.rs
@@ -413,7 +413,9 @@ impl Schema {
         &self.metadata
     }
 
-    pub fn normalize(self, separator: &str, mut max_level: usize) -> Result<Self, ArrowError> {
+    /// Returns a new schema, normalized based on the max_level
+    /// This carries metadata from the parent schema over as well
+    pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result<Self, ArrowError> {
         if max_level == 0 {
             max_level = usize::MAX;
         }
@@ -462,11 +464,11 @@ impl Schema {
             let mut new_fields: Vec<Field> = vec![];
             for field in fields {
                 match field.data_type() {
-                    //DataType::List(f) => field,
-                    //DataType::ListView(_) => field,
-                    //DataType::FixedSizeList(_, _) => field,
-                    //DataType::LargeList(_) => field,
-                    //DataType::LargeListView(_) => field,
+                    //DataType::List(f) => ,
+                    //DataType::ListView(_) => ,
+                    //DataType::FixedSizeList(_, _) => ,
+                    //DataType::LargeList(_) => ,
+                    //DataType::LargeListView(_) => ,
                     DataType::Struct(nested_fields) => {
                         let field_name = field.name().as_str();
                         let new_key = format!("{key_string}{separator}{field_name}");

From 55eb9533f7fd0da4b6b36e9cc7d059191e90f250 Mon Sep 17 00:00:00 2001
From: nglime
Date: Sat, 23 Nov 2024 11:16:35 -0600
Subject: [PATCH 04/20] Built out a bit more of the iterative normalize.

---
 arrow-array/src/record_batch.rs | 72 +++++++++++++++++++++------------
 1 file changed, 42 insertions(+), 30 deletions(-)

diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index 2be5ca10903c..92cdb700b8d9 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -18,9 +18,11 @@
 //! A two-dimensional batch of column-oriented data with a defined
 //! [schema](arrow_schema::Schema).
 
-use std::collections::{BinaryHeap, VecDeque};
 use crate::{new_empty_array, Array, ArrayRef, StructArray};
-use arrow_schema::{ArrowError, DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef};
+use arrow_schema::{
+    ArrowError, DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
+};
+use std::collections::VecDeque;
 use std::ops::{Deref, Index};
 use std::sync::Arc;
@@ -412,51 +414,61 @@ impl RecordBatch {
         }
         if self.num_rows() == 0 {
             // No data, only need to normalize the schema
-            return Ok(Self::new_empty(Arc::new(self.schema.normalize(separator, max_level)?)));
+            return Ok(Self::new_empty(Arc::new(
+                self.schema.normalize(separator, max_level)?,
+            )));
         }
         let mut queue: VecDeque<(usize, &Arc<dyn Array>, &FieldRef)> = VecDeque::new();
 
         // push fields
-        for (c, f) in self.columns.iter().zip(self.schema().fields()) {
+        for (c, f) in self.columns.iter().zip(self.schema.fields()) {
             queue.push_front((0, c, f));
         }
 
         while !queue.is_empty() {
             match queue.pop_front() {
                 Some((depth, c, f)) => {
-                    match f.data_type() {
-                        //DataType::List(f) => field,
-                        //DataType::ListView(_) => field,
-                        //DataType::FixedSizeList(_, _) => field,
-                        //DataType::LargeList(_) => field,
-                        //DataType::LargeListView(_) => field,
-                        DataType::Struct(nested_fields) => {
-                            let field_name = f.name().as_str();
-                            /*new_fields = [
-                                new_fields,
-                                Self::normalizer(
-                                    nested_fields.to_vec(),
-                                    field_name,
-                                    separator,
-                                    max_level - 1,
-                                ),
-                            ]
-                            .concat();*/
+
+                    if depth < max_level {
+                        match (c.data_type(), f.data_type()) {
+                            //DataType::List(f) => field,
+                            //DataType::ListView(_) => field,
+                            //DataType::FixedSizeList(_, _) => field,
+                            //DataType::LargeList(_) => field,
+                            //DataType::LargeListView(_) => field,
+                            (DataType::Struct(cf), DataType::Struct(ff)) => {
+                                let field_name = f.name().as_str();
+                                let new_key = format!("{key_string}{separator}{field_name}");
+                                ff.iter().rev().zip(cf.iter().rev()).map(|(field, ())| {
+                                    let updated_field = Field::new(
+                                        format!("{key_string}{separator}{}", field.name()),
+                                        field.data_type().clone(),
+                                        field.is_nullable(),
+                                    );
+                                    queue.push_front((
+                                        depth + 1,
+                                        c, // TODO: need to modify c -- if it's a StructArray, it needs to have the fields modified.
+                                        &Arc::new(updated_field),
+                                    ))
+                                });
+                            }
+                            //DataType::Union(_, _) => field,
+                            //DataType::Dictionary(_, _) => field,
+                            //DataType::Map(_, _) => field,
+                            //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field
+                            _ => queue.push_front((depth, c, f)),
                         }
-                        //DataType::Union(_, _) => field,
-                        //DataType::Dictionary(_, _) => field,
-                        //DataType::Map(_, _) => field,
-                        //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field
-                        _ => queue.push_front((0, c, f)),
+                    } else {
+                        queue.push_front((depth, c, f));
                     }
-                },
+                }
                 None => break,
             };
         }
         todo!()
     }
 
-    /// Returns the number of columns in the record batch.
+    /// Returns the number of columns in the record batch.
     ///
     /// # Example
     ///
@@ -1269,12 +1281,12 @@ mod tests {
         let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true));
         let year_field = Arc::new(Field::new("year", DataType::Int64, true));
 
-
         let a = Arc::new(StructArray::from(vec![
             (animals_field.clone(), Arc::new(animals) as ArrayRef),
             (n_legs_field.clone(), Arc::new(n_legs) as ArrayRef),
             (year_field.clone(), Arc::new(year) as ArrayRef),
         ]));
+
         let month = Arc::new(Int64Array::from(vec![Some(4), Some(6)]));
 
         let schema = Schema::new(vec![

From 30d6294d1346dbbdf6ac9bfc7df847a8e05ccaec Mon Sep 17 00:00:00 2001
From: nglime
Date: Sat, 23 Nov 2024 21:03:20 -0600
Subject: [PATCH 05/20] Fixed normalize function for `RecordBatch`. Adjusted test case to match the example from PyArrow.

---
 arrow-array/src/record_batch.rs | 108 ++++++++++++++++----------------
 arrow-schema/src/schema.rs      |  42 +++++--------
 2 files changed, 68 insertions(+), 82 deletions(-)

diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index 92cdb700b8d9..1ba8402b1bd8 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -18,12 +18,11 @@
 //! A two-dimensional batch of column-oriented data with a defined
 //! [schema](arrow_schema::Schema).
 
+use crate::cast::AsArray;
 use crate::{new_empty_array, Array, ArrayRef, StructArray};
-use arrow_schema::{
-    ArrowError, DataType, Field, FieldRef, Fields, Schema, SchemaBuilder, SchemaRef,
-};
+use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaBuilder, SchemaRef};
 use std::collections::VecDeque;
-use std::ops::{Deref, Index};
+use std::ops::Index;
 use std::sync::Arc;
@@ -406,7 +405,8 @@ impl RecordBatch {
         )
     }
 
-    /// Normalize a semi-structured RecordBatch into a flat table
+    /// Normalize a semi-structured [`RecordBatch`] into a flat table.
+    ///
     /// If max_level is 0, normalizes all levels.
     pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result<Self, ArrowError> {
         if max_level == 0 {
@@ -418,54 +418,47 @@ impl RecordBatch {
                 self.schema.normalize(separator, max_level)?,
             )));
         }
-        let mut queue: VecDeque<(usize, &Arc<dyn Array>, &FieldRef)> = VecDeque::new();
+        let mut queue: VecDeque<(usize, (ArrayRef, FieldRef))> = VecDeque::new();
 
-        // push fields
         for (c, f) in self.columns.iter().zip(self.schema.fields()) {
-            queue.push_front((0, c, f));
+            queue.push_back((0, ((*c).clone(), (*f).clone())));
         }
 
-        while !queue.is_empty() {
-            match queue.pop_front() {
-                Some((depth, c, f)) => {
-
-                    if depth < max_level {
-                        match (c.data_type(), f.data_type()) {
-                            //DataType::List(f) => field,
-                            //DataType::ListView(_) => field,
-                            //DataType::FixedSizeList(_, _) => field,
-                            //DataType::LargeList(_) => field,
-                            //DataType::LargeListView(_) => field,
-                            (DataType::Struct(cf), DataType::Struct(ff)) => {
-                                let field_name = f.name().as_str();
-                                let new_key = format!("{key_string}{separator}{field_name}");
-                                ff.iter().rev().zip(cf.iter().rev()).map(|(field, ())| {
-                                    let updated_field = Field::new(
-                                        format!("{key_string}{separator}{}", field.name()),
-                                        field.data_type().clone(),
-                                        field.is_nullable(),
-                                    );
-                                    queue.push_front((
-                                        depth + 1,
-                                        c, // TODO: need to modify c -- if it's a StructArray, it needs to have the fields modified.
-                                        &Arc::new(updated_field),
-                                    ))
-                                });
-                            }
-                            //DataType::Union(_, _) => field,
-                            //DataType::Dictionary(_, _) => field,
-                            //DataType::Map(_, _) => field,
-                            //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field
-                            _ => queue.push_front((depth, c, f)),
+        let mut columns: Vec<ArrayRef> = Vec::new();
+        let mut fields: Vec<FieldRef> = Vec::new();
+
+        while let Some((depth, (c, f))) = queue.pop_front() {
+            if depth < max_level {
+                match f.data_type() {
+                    DataType::Struct(ff) => {
+                        // Need to zip these in reverse to maintain original order
+                        for (cff, fff) in c
+                            .as_struct()
+                            .columns()
+                            .iter()
+                            .rev()
+                            .zip(ff.into_iter().rev())
+                        {
+                            let new_key = format!("{}{separator}{}", f.name(), fff.name());
+                            let updated_field = Field::new(
+                                new_key.as_str(),
+                                fff.data_type().clone(),
+                                fff.is_nullable(),
+                            );
+                            queue.push_front((depth + 1, (cff.clone(), Arc::new(updated_field))))
                         }
-                    } else {
-                        queue.push_front((depth, c, f));
+                    }
+                    _ => {
+                        columns.push(c);
+                        fields.push(f);
                     }
                 }
-                None => break,
-            };
+            } else {
+                columns.push(c);
+                fields.push(f);
+            }
         }
-        todo!()
+        RecordBatch::try_new(Arc::new(Schema::new(fields)), columns)
     }
 
     /// Returns the number of columns in the record batch.
@@ -1282,9 +1275,9 @@ mod tests {
         let year_field = Arc::new(Field::new("year", DataType::Int64, true));
 
         let a = Arc::new(StructArray::from(vec![
-            (animals_field.clone(), Arc::new(animals) as ArrayRef),
-            (n_legs_field.clone(), Arc::new(n_legs) as ArrayRef),
-            (year_field.clone(), Arc::new(year) as ArrayRef),
+            (animals_field.clone(), Arc::new(animals.clone()) as ArrayRef),
+            (n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef),
+            (year_field.clone(), Arc::new(year.clone()) as ArrayRef),
         ]));
 
         let month = Arc::new(Int64Array::from(vec![Some(4), Some(6)]));
@@ -1297,16 +1290,21 @@ mod tests {
             ),
             Field::new("month", DataType::Int64, true),
         ]);
-        let normalized = schema.clone().normalize(".", 0).unwrap();
-        println!("{:?}", normalized);
 
-        let record_batch =
-            RecordBatch::try_new(Arc::new(schema), vec![a, month]).expect("valid conversion");
+        let record_batch = RecordBatch::try_new(Arc::new(schema), vec![a, month.clone()])
+            .expect("valid conversion");
+
+        let normalized = record_batch.normalize(".", 0).expect("valid normalization");
 
-        println!("Fields: {:?}", record_batch.schema().fields());
-        println!("Metadata{:?}", record_batch.columns());
+        let expected = RecordBatch::try_from_iter_with_nullable(vec![
+            ("a.animals", animals.clone(), true),
+            ("a.n_legs", n_legs.clone(), true),
+            ("a.year", year.clone(), true),
+            ("month", month.clone(), true),
+        ])
+        .expect("valid conversion");
 
-        //println!("{:?}", record_batch);
+        assert_eq!(expected, normalized);
     }
 
     #[test]
diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs
index 62f74c8b435b..e8aff42db104 100644
--- a/arrow-schema/src/schema.rs
+++ b/arrow-schema/src/schema.rs
@@ -419,14 +419,9 @@ impl Schema {
         if max_level == 0 {
             max_level = usize::MAX;
         }
-        let mut new_fields: Vec<Field> = vec![];
+        let mut new_fields: Vec<FieldRef> = vec![];
         for field in self.fields() {
             match field.data_type() {
-                //DataType::List(f) => field,
-                //DataType::ListView(_) => field,
-                //DataType::FixedSizeList(_, _) => field,
-                //DataType::LargeList(_) => field,
-                //DataType::LargeListView(_) => field,
                 DataType::Struct(nested_fields) => {
                     let field_name = field.name().as_str();
                     new_fields = [
@@ -440,15 +435,11 @@ impl Schema {
                     ]
                     .concat();
                 }
-                //DataType::Union(_, _) => field,
-                //DataType::Dictionary(_, _) => field,
-                //DataType::Map(_, _) => field,
-                //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field
-                _ => new_fields.push(Field::new(
+                _ => new_fields.push(Arc::new(Field::new(
                     field.name(),
                     field.data_type().clone(),
                     field.is_nullable(),
-                )),
+                ))),
             };
         }
         Ok(Self::new_with_metadata(new_fields, self.metadata.clone()))
@@ -459,16 +450,11 @@ impl Schema {
         key_string: &str,
         separator: &str,
         max_level: usize,
-    ) -> Vec<Field> {
+    ) -> Vec<FieldRef> {
+        let mut new_fields: Vec<FieldRef> = vec![];
         if max_level > 0 {
-            let mut new_fields: Vec<Field> = vec![];
             for field in fields {
                 match field.data_type() {
-                    //DataType::List(f) => ,
-                    //DataType::ListView(_) => ,
-                    //DataType::FixedSizeList(_, _) => ,
-                    //DataType::LargeList(_) => ,
-                    //DataType::LargeListView(_) => ,
                     DataType::Struct(nested_fields) => {
                         let field_name = field.name().as_str();
                         let new_key = format!("{key_string}{separator}{field_name}");
@@ -483,21 +469,23 @@ impl Schema {
                         ]
                         .concat();
                     }
-                    //DataType::Union(_, _) => field,
-                    //DataType::Dictionary(_, _) => field,
-                    //DataType::Map(_, _) => field,
-                    //DataType::RunEndEncoded(_, _) => field, // not sure how to support this field
-                    _ => new_fields.push(Field::new(
+                    _ => new_fields.push(Arc::new(Field::new(
                         format!("{key_string}{separator}{}", field.name()),
                         field.data_type().clone(),
                         field.is_nullable(),
-                    )),
+                    ))),
                 };
             }
-            new_fields
         } else {
-            todo!()
+            for field in fields {
+                new_fields.push(Arc::new(Field::new(
+                    format!("{key_string}{separator}{}", field.name()),
+                    field.data_type().clone(),
+                    field.is_nullable(),
+                )));
+            }
         }
+        new_fields
     }
 
     /// Look up a column by name and return a immutable reference to the column along with

From 0ed979d0244d511282b3945e38704650be84cd63 Mon Sep 17 00:00:00 2001
From: nglime
Date: Sun, 24 Nov 2024 21:51:18 -0600
Subject: [PATCH 06/20] Added tests for `Schema` normalization. Partial tests for `RecordBatch`.
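
For reference, a hedged sketch of the behaviour these tests pin down, assuming
the `Schema::normalize(separator, max_level)` signature as it stands at this
point in the series; the field names below are illustrative only:

```rust
// Minimal sketch: `normalize(".", 0)` is treated as "flatten all levels".
use std::sync::Arc;
use arrow_schema::{DataType, Field, Fields, Schema};

fn main() {
    let nested = Schema::new(vec![Field::new(
        "a",
        DataType::Struct(Fields::from(vec![
            Arc::new(Field::new("animals", DataType::Utf8, true)),
            Arc::new(Field::new("n_legs", DataType::Int64, true)),
        ])),
        false,
    )]);
    // Struct children are promoted to top-level fields named "parent.child".
    let flat = nested.normalize(".", 0).expect("valid normalization");
    assert_eq!(flat.field(0).name(), "a.animals");
    assert_eq!(flat.field(1).name(), "a.n_legs");
}
```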
---
 arrow-array/src/record_batch.rs | 114 ++++++++++++++++++++++++++++----
 arrow-schema/src/schema.rs      |  81 +++++++++++++++++++++++
 2 files changed, 183 insertions(+), 12 deletions(-)

diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index 1ba8402b1bd8..b56e2138fa6a 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -432,14 +432,8 @@ impl RecordBatch {
                 match f.data_type() {
                     DataType::Struct(ff) => {
                         // Need to zip these in reverse to maintain original order
-                        for (cff, fff) in c
-                            .as_struct()
-                            .columns()
-                            .iter()
-                            .rev()
-                            .zip(ff.into_iter().rev())
-                        {
-                            let new_key = format!("{}{separator}{}", f.name(), fff.name());
+                        for (cff, fff) in c.as_struct().columns().iter().zip(ff.into_iter()).rev() {
+                            let new_key = format!("{}{}{}", f.name(), separator, fff.name());
                             let updated_field = Field::new(
                                 new_key.as_str(),
                                 fff.data_type().clone(),
@@ -1291,10 +1285,10 @@ mod tests {
             Field::new("month", DataType::Int64, true),
         ]);
 
-        let record_batch = RecordBatch::try_new(Arc::new(schema), vec![a, month.clone()])
-            .expect("valid conversion");
-
-        let normalized = record_batch.normalize(".", 0).expect("valid normalization");
+        let normalized = RecordBatch::try_new(Arc::new(schema), vec![a, month.clone()])
+            .expect("valid conversion")
+            .normalize(".", 0)
+            .expect("valid normalization");
 
         let expected = RecordBatch::try_from_iter_with_nullable(vec![
             ("a.animals", animals.clone(), true),
@@ -1307,6 +1301,102 @@ mod tests {
         assert_eq!(expected, normalized);
     }
 
+    #[test]
+    fn normalize_nested() {
+        // Initialize schema
+        let a = Arc::new(Field::new("a", DataType::Utf8, true));
+        let b = Arc::new(Field::new("b", DataType::Int64, false));
+        let c = Arc::new(Field::new("c", DataType::Int64, true));
+
+        let d = Arc::new(Field::new("d", DataType::Utf8, true));
+        let e = Arc::new(Field::new("e", DataType::Int64, false));
+        let f = Arc::new(Field::new("f", DataType::Int64, true));
+
+        let one = Arc::new(Field::new(
+            "1",
+            DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])),
+            false,
+        ));
+        let two = Arc::new(Field::new(
+            "2",
+            DataType::Struct(Fields::from(vec![d.clone(), e.clone(), f.clone()])),
+            true,
+        ));
+
+        let exclamation = Arc::new(Field::new(
+            "!",
+            DataType::Struct(Fields::from(vec![one, two])),
+            false,
+        ));
+
+        // Initialize fields
+        let a_field: ArrayRef = Arc::new(StringArray::from(vec!["a1_field_data", "a1_field_data"]));
+        let b_field: ArrayRef = Arc::new(Int64Array::from(vec![Some(0), Some(1)]));
+        let c_field: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2)]));
+
+        let d_field: ArrayRef = Arc::new(StringArray::from(vec!["d1_field_data", "d2_field_data"]));
+        let e_field: ArrayRef = Arc::new(Int64Array::from(vec![Some(3), Some(4)]));
+        let f_field: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(5)]));
+
+        let one_field = Arc::new(StructArray::from(vec![
+            (a.clone(), Arc::new(a_field.clone()) as ArrayRef),
+            (b.clone(), Arc::new(b_field.clone()) as ArrayRef),
+            (c.clone(), Arc::new(c_field.clone()) as ArrayRef),
+        ]));
+        let two_field = Arc::new(StructArray::from(vec![
+            (a.clone(), Arc::new(a_field.clone()) as ArrayRef),
+            (b.clone(), Arc::new(b_field.clone()) as ArrayRef),
+            (c.clone(), Arc::new(c_field.clone()) as ArrayRef),
+        ]));
+
+        /*let exclamation_field = Arc::new(StructArray::from(vec![
+            (one.clone(), Arc::new(one_field.clone()) as ArrayRef),
+            (two.clone(), Arc::new(two_field.clone()) as ArrayRef),
+        ]));*/
+
+        let schema = Schema::new(vec![exclamation.clone()]);
+        /*let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field])
+            .expect("valid conversion");*/
+        //.normalize(".", 0)
+        //.expect("valid normalization");
+
+        /*let expected = RecordBatch::try_from_iter_with_nullable(vec![
+            ("a.animals", animals.clone(), true),
+            ("a.n_legs", n_legs.clone(), true),
+            ("a.year", year.clone(), true),
+            ("month", month.clone(), true),
+        ])
+        .expect("valid conversion");*/
+
+        //assert_eq!(expected, normalized);
+    }
+
+    #[test]
+    fn normalize_empty() {
+        let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true));
+        let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true));
+        let year_field = Arc::new(Field::new("year", DataType::Int64, true));
+
+        let schema = Schema::new(vec![
+            Field::new(
+                "a",
+                DataType::Struct(Fields::from(vec![animals_field, n_legs_field, year_field])),
+                false,
+            ),
+            Field::new("month", DataType::Int64, true),
+        ]);
+
+        let normalized = RecordBatch::new_empty(Arc::new(schema.clone()))
+            .normalize(".", 0)
+            .expect("valid normalization");
+
+        let expected = RecordBatch::new_empty(Arc::new(
+            schema.normalize(".", 0).expect("valid normalization"),
+        ));
+
+        assert_eq!(expected, normalized);
+    }
+
     #[test]
     fn project() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs
index e8aff42db104..1832f97cf17f 100644
--- a/arrow-schema/src/schema.rs
+++ b/arrow-schema/src/schema.rs
@@ -772,6 +772,87 @@ mod tests {
         schema.index_of("nickname").unwrap();
     }
 
+    #[test]
+    fn normalize() {
+        let schema = Schema::new(vec![
+            Field::new(
+                "a",
+                DataType::Struct(Fields::from(vec![
+                    Arc::new(Field::new("animals", DataType::Utf8, true)),
+                    Arc::new(Field::new("n_legs", DataType::Int64, true)),
+                    Arc::new(Field::new("year", DataType::Int64, true)),
+                ])),
+                false,
+            ),
+            Field::new("month", DataType::Int64, true),
+        ])
+        .normalize(".", 0)
+        .expect("valid normalization");
+
+        let expected = Schema::new(vec![
+            Field::new("a.animals", DataType::Utf8, true),
+            Field::new("a.n_legs", DataType::Int64, true),
+            Field::new("a.year", DataType::Int64, true),
+            Field::new("month", DataType::Int64, true),
+        ]);
+
+        assert_eq!(schema, expected);
+    }
+
+    #[test]
+    fn normalize_nested() {
+        let a = Arc::new(Field::new("a", DataType::Utf8, true));
+        let b = Arc::new(Field::new("b", DataType::Int64, false));
+        let c = Arc::new(Field::new("c", DataType::Int64, true));
+
+        let d = Arc::new(Field::new("d", DataType::Utf8, true));
+        let e = Arc::new(Field::new("e", DataType::Int64, false));
+        let f = Arc::new(Field::new("f", DataType::Int64, true));
+
+        let one = Arc::new(Field::new(
+            "1",
+            DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])),
+            false,
+        ));
+        let two = Arc::new(Field::new(
+            "2",
+            DataType::Struct(Fields::from(vec![d.clone(), e.clone(), f.clone()])),
+            true,
+        ));
+
+        let exclamation = Arc::new(Field::new(
+            "!",
+            DataType::Struct(Fields::from(vec![one, two])),
+            false,
+        ));
+
+        let normalize_all = Schema::new(vec![exclamation.clone()])
+            .normalize(".", 0)
+            .expect("valid normalization");
+
+        let expected = Schema::new(vec![
+            Field::new("!.1.a", DataType::Utf8, true),
+            Field::new("!.1.b", DataType::Int64, false),
+            Field::new("!.1.c", DataType::Int64, true),
+            Field::new("!.2.d", DataType::Utf8, true),
+            Field::new("!.2.e", DataType::Int64, false),
+            Field::new("!.2.f", DataType::Int64, true),
+        ]);
+
+        assert_eq!(normalize_all, expected);
+
+        let normalize_depth_one = Schema::new(vec![exclamation])
+            .normalize(".", 1)
+            .expect("valid normalization");
+
+        let expected = Schema::new(vec![
+            Field::new("!.1", DataType::Struct(Fields::from(vec![a, b, c])), false),
+            Field::new("!.2", DataType::Struct(Fields::from(vec![d, e, f])), true),
+        ]);
+
+        assert_eq!(normalize_depth_one, expected);
+    }
+
     #[test]
     #[should_panic(
         expected = "Unable to get field named \\\"nickname\\\". Valid fields: [\\\"first_name\\\", \\\"last_name\\\", \\\"address\\\", \\\"interests\\\"]"
     )]

From d9d08cd63f4aae7d1208fd773fc8209f0493c00c Mon Sep 17 00:00:00 2001
From: nglime
Date: Sun, 24 Nov 2024 21:54:24 -0600
Subject: [PATCH 07/20] Removed stray comments.

---
 arrow-array/src/record_batch.rs | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index b56e2138fa6a..6b5b5af22a84 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -1349,10 +1349,10 @@ mod tests {
             (c.clone(), Arc::new(c_field.clone()) as ArrayRef),
         ]));
 
-        /*let exclamation_field = Arc::new(StructArray::from(vec![
+        let exclamation_field = Arc::new(StructArray::from(vec![
             (one.clone(), Arc::new(one_field.clone()) as ArrayRef),
             (two.clone(), Arc::new(two_field.clone()) as ArrayRef),
-        ]));*/
+        ]));
 
         let schema = Schema::new(vec![exclamation.clone()]);
         /*let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field])
@@ -1509,9 +1509,7 @@ mod tests {
         let metadata = vec![("foo".to_string(), "bar".to_string())]
             .into_iter()
             .collect();
-        println!("Metadata: {:?}", metadata);
         let metadata_schema = nullable_schema.as_ref().clone().with_metadata(metadata);
-        println!("Metadata schema: {:?}", metadata_schema);
         let batch = batch.with_schema(Arc::new(metadata_schema)).unwrap();
 
         // Cannot remove metadata

From d1b3260441271c848c2096c6843716bdbd8da5a9 Mon Sep 17 00:00:00 2001
From: nglime
Date: Sun, 24 Nov 2024 21:58:53 -0600
Subject: [PATCH 08/20] Commenting out exclamation field.

---
 arrow-array/src/record_batch.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index 6b5b5af22a84..beb4c0a8690a 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -1349,10 +1349,10 @@ mod tests {
             (c.clone(), Arc::new(c_field.clone()) as ArrayRef),
         ]));
 
-        let exclamation_field = Arc::new(StructArray::from(vec![
+        /*let exclamation_field = Arc::new(StructArray::from(vec![
             (one.clone(), Arc::new(one_field.clone()) as ArrayRef),
             (two.clone(), Arc::new(two_field.clone()) as ArrayRef),
-        ]));
+        ]));*/

From 7adda580776abf257378ebdd09b83d0961b9b7d4 Mon Sep 17 00:00:00 2001
From: nglime
Date: Wed, 4 Dec 2024 22:06:29 -0600
Subject: [PATCH 09/20] Fixed test for `RecordBatch`.
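
To make the intent of the fix concrete, a hedged standalone sketch of the
depth-limited round trip the test now covers, assuming the
`RecordBatch::normalize(separator, max_level)` API as of this patch; the field
names here are illustrative, not taken from the test:

```rust
// Sketch only: one struct level, so max_level = 1 already fully flattens it.
use std::sync::Arc;
use arrow_array::{ArrayRef, Int64Array, RecordBatch, StructArray};
use arrow_schema::{DataType, Field, Fields, Schema};

fn main() {
    let x = Arc::new(Field::new("x", DataType::Int64, true));
    let inner: ArrayRef = Arc::new(StructArray::from(vec![(
        x.clone(),
        Arc::new(Int64Array::from(vec![Some(1), Some(2)])) as ArrayRef,
    )]));
    let schema = Schema::new(vec![Field::new(
        "outer",
        DataType::Struct(Fields::from(vec![x])),
        false,
    )]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![inner]).expect("valid conversion");
    // "outer" is flattened into "outer.x"; anything deeper would stay intact.
    let flat = batch.normalize(".", 1).expect("valid normalization");
    assert_eq!(flat.schema().field(0).name(), "outer.x");
}
```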
--- arrow-array/src/record_batch.rs | 92 +++++++++++++++++++++------------ 1 file changed, 59 insertions(+), 33 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index b0557173ea56..88916aaf4487 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -1295,14 +1295,10 @@ mod tests { #[test] fn normalize_nested() { // Initialize schema - let a = Arc::new(Field::new("a", DataType::Utf8, true)); + let a = Arc::new(Field::new("a", DataType::Int64, true)); let b = Arc::new(Field::new("b", DataType::Int64, false)); let c = Arc::new(Field::new("c", DataType::Int64, true)); - let d = Arc::new(Field::new("d", DataType::Utf8, true)); - let e = Arc::new(Field::new("e", DataType::Int64, false)); - let f = Arc::new(Field::new("f", DataType::Int64, true)); - let one = Arc::new(Field::new( "1", DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])), @@ -1310,56 +1306,86 @@ mod tests { )); let two = Arc::new(Field::new( "2", - DataType::Struct(Fields::from(vec![d.clone(), e.clone(), f.clone()])), + DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])), true, )); let exclamation = Arc::new(Field::new( "!", - DataType::Struct(Fields::from(vec![one, two])), + DataType::Struct(Fields::from(vec![one.clone(), two.clone()])), false, )); - // Initialize fields - let a_field: ArrayRef = Arc::new(StringArray::from(vec!["a1_field_data", "a1_field_data"])); - let b_field: ArrayRef = Arc::new(Int64Array::from(vec![Some(0), Some(1)])); - let c_field: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2)])); + let schema = Schema::new(vec![exclamation.clone()]); - let d_field: ArrayRef = Arc::new(StringArray::from(vec!["d1_field_data", "d2_field_data"])); - let e_field: ArrayRef = Arc::new(Int64Array::from(vec![Some(3), Some(4)])); - let f_field: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(5)])); + // Initialize fields + let a_field = Int64Array::from(vec![Some(0), Some(1)]); + let b_field = Int64Array::from(vec![Some(2), Some(3)]); + let c_field = Int64Array::from(vec![None, Some(4)]); - let one_field = Arc::new(StructArray::from(vec![ + let one_field = StructArray::from(vec![ (a.clone(), Arc::new(a_field.clone()) as ArrayRef), (b.clone(), Arc::new(b_field.clone()) as ArrayRef), (c.clone(), Arc::new(c_field.clone()) as ArrayRef), - ])); - let two_field = Arc::new(StructArray::from(vec![ + ]); + let two_field = StructArray::from(vec![ (a.clone(), Arc::new(a_field.clone()) as ArrayRef), (b.clone(), Arc::new(b_field.clone()) as ArrayRef), (c.clone(), Arc::new(c_field.clone()) as ArrayRef), + ]); + + let exclamation_field = Arc::new(StructArray::from(vec![ + (one.clone(), Arc::new(one_field) as ArrayRef), + (two.clone(), Arc::new(two_field) as ArrayRef), ])); - /*let exclamation_field = Arc::new(StructArray::from(vec![ - (one.clone(), Arc::new(one_field.clone()) as ArrayRef), - (two.clone(), Arc::new(two_field.clone()) as ArrayRef), - ]));*/ + // Normalize top level + let normalized = RecordBatch::try_new(Arc::new(schema.clone()), vec![exclamation_field.clone()]) + .expect("valid conversion") + .normalize(".", 1) + .expect("valid normalization"); - let schema = Schema::new(vec![exclamation.clone()]); - /*let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field]) - .expect("valid conversion");*/ - //.normalize(".", 0) - //.expect("valid normalization"); + let expected = RecordBatch::try_from_iter_with_nullable(vec![ + ( + "!.1", + Arc::new(StructArray::from(vec![ + (a.clone(), 
Arc::new(a_field.clone()) as ArrayRef), + (b.clone(), Arc::new(b_field.clone()) as ArrayRef), + (c.clone(), Arc::new(c_field.clone()) as ArrayRef), + ])) as ArrayRef, + false, + ), + ( + "!.2", + Arc::new(StructArray::from(vec![ + (a.clone(), Arc::new(a_field.clone()) as ArrayRef), + (b.clone(), Arc::new(b_field.clone()) as ArrayRef), + (c.clone(), Arc::new(c_field.clone()) as ArrayRef), + ])) as ArrayRef, + true, + ), + ]) + .expect("valid conversion"); - /*let expected = RecordBatch::try_from_iter_with_nullable(vec![ - ("a.animals", animals.clone(), true), - ("a.n_legs", n_legs.clone(), true), - ("a.year", year.clone(), true), - ("month", month.clone(), true), + assert_eq!(expected, normalized); + + // Normalize all levels + let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field]) + .expect("valid conversion") + .normalize(".", 0) + .expect("valid normalization"); + + let expected = RecordBatch::try_from_iter_with_nullable(vec![ + ("!.1.a", Arc::new(a_field.clone()) as ArrayRef, true), + ("!.1.b", Arc::new(b_field.clone()) as ArrayRef, false), + ("!.1.c", Arc::new(c_field.clone()) as ArrayRef, true), + ("!.2.a", Arc::new(a_field.clone()) as ArrayRef, true), + ("!.2.b", Arc::new(b_field.clone()) as ArrayRef, false), + ("!.2.c", Arc::new(c_field.clone()) as ArrayRef, true), ]) - .expect("valid conversion");*/ + .expect("valid conversion"); - //assert_eq!(expected, normalized); + assert_eq!(expected, normalized); } #[test] From 9c9c69952d53bc236c5f8ee078b6b607a515f595 Mon Sep 17 00:00:00 2001 From: nglime Date: Wed, 4 Dec 2024 22:07:59 -0600 Subject: [PATCH 10/20] Formatting. --- arrow-array/src/record_batch.rs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 88916aaf4487..ec2b974fdf9e 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -1340,10 +1340,11 @@ mod tests { ])); // Normalize top level - let normalized = RecordBatch::try_new(Arc::new(schema.clone()), vec![exclamation_field.clone()]) - .expect("valid conversion") - .normalize(".", 1) - .expect("valid normalization"); + let normalized = + RecordBatch::try_new(Arc::new(schema.clone()), vec![exclamation_field.clone()]) + .expect("valid conversion") + .normalize(".", 1) + .expect("valid normalization"); let expected = RecordBatch::try_from_iter_with_nullable(vec![ ( From 4422add041b6decabe7455a4986362f0c1cc01e6 Mon Sep 17 00:00:00 2001 From: nglime Date: Mon, 30 Dec 2024 22:38:05 -0600 Subject: [PATCH 11/20] Additional documentation for `normalize` functions. Switched `Schema` normalization to iterative approach. --- arrow-array/src/record_batch.rs | 84 +++++++++++++++------ arrow-schema/src/schema.rs | 129 ++++++++++++++++---------------- 2 files changed, 123 insertions(+), 90 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index ec2b974fdf9e..230bdc2f3ad3 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -397,50 +397,86 @@ impl RecordBatch { } /// Normalize a semi-structured [`RecordBatch`] into a flat table. + /// If `max_level` is 0, normalizes all levels. /// - /// If max_level is 0, normalizes all levels. 
+ /// # Example + /// + /// ``` + /// # use std::sync::Arc; + /// # use arrow_array::{ArrayRef, Int64Array, StringArray, StructArray, RecordBatch}; + /// # use arrow_schema::{DataType, Field, Fields, Schema}; + /// + /// let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""])); + /// let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)])); + /// + /// let animals_field = Arc::new(Field::new("animals", DataType::Utf8, true)); + /// let n_legs_field = Arc::new(Field::new("n_legs", DataType::Int64, true)); + /// + /// let a = Arc::new(StructArray::from(vec![ + /// (animals_field.clone(), Arc::new(animals.clone()) as ArrayRef), + /// (n_legs_field.clone(), Arc::new(n_legs.clone()) as ArrayRef), + /// ])); + /// + /// let schema = Schema::new(vec![ + /// Field::new( + /// "a", + /// DataType::Struct(Fields::from(vec![animals_field, n_legs_field])), + /// false, + /// ) + /// ]); + /// + /// let normalized = RecordBatch::try_new(Arc::new(schema), vec![a]) + /// .expect("valid conversion") + /// .normalize(".", 0) + /// .expect("valid normalization"); + /// + /// let expected = RecordBatch::try_from_iter_with_nullable(vec![ + /// ("a.animals", animals.clone(), true), + /// ("a.n_legs", n_legs.clone(), true), + /// ]) + /// .expect("valid conversion"); + /// + /// assert_eq!(expected, normalized); + /// ``` pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result { if max_level == 0 { max_level = usize::MAX; } - if self.num_rows() == 0 { - // No data, only need to normalize the schema - return Ok(Self::new_empty(Arc::new( - self.schema.normalize(separator, max_level)?, - ))); - } - let mut queue: VecDeque<(usize, (ArrayRef, FieldRef))> = VecDeque::new(); - + let mut queue: VecDeque<(usize, &ArrayRef, Vec<&str>, &DataType, bool)> = VecDeque::new(); for (c, f) in self.columns.iter().zip(self.schema.fields()) { - queue.push_back((0, ((*c).clone(), (*f).clone()))); + let name_vec: Vec<&str> = vec![f.name()]; + queue.push_back((0, c, name_vec, f.data_type(), f.is_nullable())); } - let mut columns: Vec = Vec::new(); let mut fields: Vec = Vec::new(); - while let Some((depth, (c, f))) = queue.pop_front() { + while let Some((depth, c, name, data_type, nullable)) = queue.pop_front() { if depth < max_level { - match f.data_type() { + match data_type { DataType::Struct(ff) => { // Need to zip these in reverse to maintain original order for (cff, fff) in c.as_struct().columns().iter().zip(ff.into_iter()).rev() { - let new_key = format!("{}{}{}", f.name(), separator, fff.name()); - let updated_field = Field::new( - new_key.as_str(), - fff.data_type().clone(), + let mut name = name.clone(); + name.push(separator); + name.push(fff.name().as_str()); + queue.push_front(( + depth + 1, + cff, + name.clone(), + fff.data_type(), fff.is_nullable(), - ); - queue.push_front((depth + 1, (cff.clone(), Arc::new(updated_field)))) + )) } } _ => { - columns.push(c); - fields.push(f); + let updated_field = Field::new(name.concat(), data_type.clone(), nullable); + columns.push(c.clone()); + fields.push(Arc::new(updated_field)); } } } else { - columns.push(c); - fields.push(f); + let updated_field = Field::new(name.concat(), data_type.clone(), nullable); + fields.push(Arc::new(updated_field)); } } RecordBatch::try_new(Arc::new(Schema::new(fields)), columns) @@ -1250,7 +1286,7 @@ mod tests { } #[test] - fn normalize() { + fn normalize_simple() { let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""])); let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), 
Some(4)])); let year: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(2022)])); diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs index ca0532cecdc7..36ee496bf604 100644 --- a/arrow-schema/src/schema.rs +++ b/arrow-schema/src/schema.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::collections::HashMap; +use std::collections::{HashMap, VecDeque}; use std::fmt; use std::hash::Hash; use std::sync::Arc; @@ -413,79 +413,76 @@ impl Schema { &self.metadata } - /// Returns a new schema, normalized based on the max_level - /// This carries metadata from the parent schema over as well + /// Returns a new schema, normalized based on the max_level field. + /// If `max_level` is 0, normalizes all levels. + /// + /// This carries metadata from the parent schema over. + /// + /// # Example + /// + /// ``` + /// # use std::sync::Arc; + /// # use arrow_schema::{DataType, Field, Fields, Schema}; + /// + /// let schema = Schema::new(vec![ + /// Field::new( + /// "a", + /// DataType::Struct(Fields::from(vec![ + /// Arc::new(Field::new("animals", DataType::Utf8, true)), + /// Arc::new(Field::new("n_legs", DataType::Int64, true)), + /// ])), + /// false, + /// ), + /// ]) + /// .normalize(".", 0) + /// .expect("valid normalization"); + /// + /// let expected = Schema::new(vec![ + /// Field::new("a.animals", DataType::Utf8, true), + /// Field::new("a.n_legs", DataType::Int64, true), + /// ]); + /// + /// assert_eq!(schema, expected); + /// ``` pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result { if max_level == 0 { max_level = usize::MAX; } - let mut new_fields: Vec = vec![]; - for field in self.fields() { - match field.data_type() { - DataType::Struct(nested_fields) => { - let field_name = field.name().as_str(); - new_fields = [ - new_fields, - Self::normalizer( - nested_fields.to_vec(), - field_name, - separator, - max_level - 1, - ), - ] - .concat(); - } - _ => new_fields.push(Arc::new(Field::new( - field.name(), - field.data_type().clone(), - field.is_nullable(), - ))), - }; + let mut queue: VecDeque<(usize, Vec<&str>, &DataType, bool)> = VecDeque::new(); + for f in self.fields() { + let name_vec: Vec<&str> = vec![f.name()]; + queue.push_back((0, name_vec, f.data_type(), f.is_nullable())); } - Ok(Self::new_with_metadata(new_fields, self.metadata.clone())) - } - - fn normalizer( - fields: Vec, - key_string: &str, - separator: &str, - max_level: usize, - ) -> Vec { - let mut new_fields: Vec = vec![]; - if max_level > 0 { - for field in fields { - match field.data_type() { - DataType::Struct(nested_fields) => { - let field_name = field.name().as_str(); - let new_key = format!("{key_string}{separator}{field_name}"); - new_fields = [ - new_fields, - Self::normalizer( - nested_fields.to_vec(), - new_key.as_str(), - separator, - max_level - 1, - ), - ] - .concat(); + let mut fields: Vec = Vec::new(); + + while let Some((depth, name, data_type, nullable)) = queue.pop_front() { + if depth < max_level { + match data_type { + DataType::Struct(ff) => { + // Need to zip these in reverse to maintain original order + for fff in ff.into_iter().rev() { + let mut name = name.clone(); + name.push(separator); + name.push(fff.name().as_str()); + queue.push_front(( + depth + 1, + name.clone(), + fff.data_type(), + fff.is_nullable(), + )) + } } - _ => new_fields.push(Arc::new(Field::new( - format!("{key_string}{separator}{}", field.name()), - field.data_type().clone(), - field.is_nullable(), - ))), - }; - } - } else { - for 
field in fields { - new_fields.push(Arc::new(Field::new( - format!("{key_string}{separator}{}", field.name()), - field.data_type().clone(), - field.is_nullable(), - ))); + _ => { + let updated_field = Field::new(name.concat(), data_type.clone(), nullable); + fields.push(Arc::new(updated_field)); + } + } + } else { + let updated_field = Field::new(name.concat(), data_type.clone(), nullable); + fields.push(Arc::new(updated_field)); } } - new_fields + Ok(Schema::new(fields)) } /// Look up a column by name and return a immutable reference to the column along with From d0dc5a7feeedd94c4a09052b02be1337f615a63b Mon Sep 17 00:00:00 2001 From: nglime Date: Mon, 30 Dec 2024 22:44:36 -0600 Subject: [PATCH 12/20] Forgot to push to the columns in the else case. --- arrow-array/src/record_batch.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 230bdc2f3ad3..9c6e2004dfe3 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -476,6 +476,7 @@ impl RecordBatch { } } else { let updated_field = Field::new(name.concat(), data_type.clone(), nullable); + columns.push(c.clone()); fields.push(Arc::new(updated_field)); } } From 1e40c984c2ddce1a577e9112fd368a9fd5c86254 Mon Sep 17 00:00:00 2001 From: nglime Date: Tue, 31 Dec 2024 00:18:28 -0600 Subject: [PATCH 13/20] Adjusted the documentation to include the parameters. --- arrow-array/src/record_batch.rs | 13 ++++++++++++- arrow-schema/src/schema.rs | 21 ++++++++++++++------- 2 files changed, 26 insertions(+), 8 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index 9c6e2004dfe3..facbbeb4b449 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -397,7 +397,18 @@ impl RecordBatch { } /// Normalize a semi-structured [`RecordBatch`] into a flat table. - /// If `max_level` is 0, normalizes all levels. + /// + /// `separator`: Nested [`Field`]s will generate names separated by `separator`, e.g. for + /// separator= "." and the schema: + /// + /// "foo": StructArray<"bar": Utf8> + /// + /// will generate: + /// + /// "foo.bar": Utf8 + /// + /// `max_level`: The maximum number of levels (depth of the `Schema` and `Columns`) to + /// normalize. If `0`, normalizes all levels. /// /// # Example /// diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs index 36ee496bf604..6ba42b0f2c38 100644 --- a/arrow-schema/src/schema.rs +++ b/arrow-schema/src/schema.rs @@ -414,16 +414,25 @@ impl Schema { } /// Returns a new schema, normalized based on the max_level field. - /// If `max_level` is 0, normalizes all levels. - /// + /// `separator`: Nested [`Field`]s will generate names separated by `separator`, e.g. for + /// separator= "." and the schema: + /// + /// "foo": StructArray<"bar": Utf8> + /// + /// will generate: + /// + /// "foo.bar": Utf8 + /// + /// `max_level`: The maximum number of levels (depth of the `Schema`) to normalize. If `0`, + /// normalizes all levels. + /// /// This carries metadata from the parent schema over. 
- /// + /// /// # Example - /// + /// /// ``` /// # use std::sync::Arc; /// # use arrow_schema::{DataType, Field, Fields, Schema}; - /// /// let schema = Schema::new(vec![ /// Field::new( /// "a", @@ -436,12 +445,10 @@ impl Schema { /// ]) /// .normalize(".", 0) /// .expect("valid normalization"); - /// /// let expected = Schema::new(vec![ /// Field::new("a.animals", DataType::Utf8, true), /// Field::new("a.n_legs", DataType::Int64, true), /// ]); - /// /// assert_eq!(schema, expected); /// ``` pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result { From 3c424d127f684f732cfb79e4366e4d2fc7a5478a Mon Sep 17 00:00:00 2001 From: nglime Date: Tue, 31 Dec 2024 00:18:47 -0600 Subject: [PATCH 14/20] Formatting. --- arrow-array/src/record_batch.rs | 8 ++++---- arrow-schema/src/schema.rs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index facbbeb4b449..e2e03daff9b6 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -397,16 +397,16 @@ impl RecordBatch { } /// Normalize a semi-structured [`RecordBatch`] into a flat table. - /// + /// /// `separator`: Nested [`Field`]s will generate names separated by `separator`, e.g. for /// separator= "." and the schema: - /// + /// /// "foo": StructArray<"bar": Utf8> - /// + /// /// will generate: /// /// "foo.bar": Utf8 - /// + /// /// `max_level`: The maximum number of levels (depth of the `Schema` and `Columns`) to /// normalize. If `0`, normalizes all levels. /// diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs index 6ba42b0f2c38..fc2c93cc349e 100644 --- a/arrow-schema/src/schema.rs +++ b/arrow-schema/src/schema.rs @@ -423,7 +423,7 @@ impl Schema { /// /// "foo.bar": Utf8 /// - /// `max_level`: The maximum number of levels (depth of the `Schema`) to normalize. If `0`, + /// `max_level`: The maximum number of levels (depth of the `Schema`) to normalize. If `0`, /// normalizes all levels. /// /// This carries metadata from the parent schema over. From 6d6b0269b732b81498a8adefa9f3c29ecfe54858 Mon Sep 17 00:00:00 2001 From: nglime Date: Tue, 31 Dec 2024 00:24:25 -0600 Subject: [PATCH 15/20] Edited examples to not be ran as tests. --- arrow-array/src/record_batch.rs | 8 ++++---- arrow-schema/src/schema.rs | 7 ++++--- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index e2e03daff9b6..f43de029e8da 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -400,13 +400,13 @@ impl RecordBatch { /// /// `separator`: Nested [`Field`]s will generate names separated by `separator`, e.g. for /// separator= "." and the schema: - /// + /// ```text /// "foo": StructArray<"bar": Utf8> - /// + /// ``` /// will generate: - /// + /// ```text /// "foo.bar": Utf8 - /// + /// ``` /// `max_level`: The maximum number of levels (depth of the `Schema` and `Columns`) to /// normalize. If `0`, normalizes all levels. /// diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs index fc2c93cc349e..0252c9ac2464 100644 --- a/arrow-schema/src/schema.rs +++ b/arrow-schema/src/schema.rs @@ -416,12 +416,13 @@ impl Schema { /// Returns a new schema, normalized based on the max_level field. /// `separator`: Nested [`Field`]s will generate names separated by `separator`, e.g. for /// separator= "." 
and the schema: - /// + /// ```text /// "foo": StructArray<"bar": Utf8> - /// + /// ``` /// will generate: - /// + /// ```text /// "foo.bar": Utf8 + /// ``` /// /// `max_level`: The maximum number of levels (depth of the `Schema`) to normalize. If `0`, /// normalizes all levels. From 71380b67bc795d2e0e4be76e314e41760ed747df Mon Sep 17 00:00:00 2001 From: nglime Date: Sun, 5 Jan 2025 00:10:01 -0600 Subject: [PATCH 16/20] Adjusted based on some of the suggestions. Simplified the matching and if statements, simplified the VecDeque fields. --- arrow-array/src/record_batch.rs | 51 ++++++++++++++------------------- arrow-schema/src/schema.rs | 44 ++++++++++++---------------- 2 files changed, 40 insertions(+), 55 deletions(-) diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs index f43de029e8da..bb0c1193eb4a 100644 --- a/arrow-array/src/record_batch.rs +++ b/arrow-array/src/record_batch.rs @@ -453,42 +453,34 @@ impl RecordBatch { if max_level == 0 { max_level = usize::MAX; } - let mut queue: VecDeque<(usize, &ArrayRef, Vec<&str>, &DataType, bool)> = VecDeque::new(); + let mut queue: VecDeque<(usize, &ArrayRef, Vec<&str>, &FieldRef)> = VecDeque::new(); for (c, f) in self.columns.iter().zip(self.schema.fields()) { let name_vec: Vec<&str> = vec![f.name()]; - queue.push_back((0, c, name_vec, f.data_type(), f.is_nullable())); + queue.push_back((0, c, name_vec, f)); } let mut columns: Vec = Vec::new(); let mut fields: Vec = Vec::new(); - while let Some((depth, c, name, data_type, nullable)) = queue.pop_front() { - if depth < max_level { - match data_type { - DataType::Struct(ff) => { - // Need to zip these in reverse to maintain original order - for (cff, fff) in c.as_struct().columns().iter().zip(ff.into_iter()).rev() { - let mut name = name.clone(); - name.push(separator); - name.push(fff.name().as_str()); - queue.push_front(( - depth + 1, - cff, - name.clone(), - fff.data_type(), - fff.is_nullable(), - )) - } - } - _ => { - let updated_field = Field::new(name.concat(), data_type.clone(), nullable); - columns.push(c.clone()); - fields.push(Arc::new(updated_field)); + while let Some((depth, c, name, field_ref)) = queue.pop_front() { + match field_ref.data_type() { + DataType::Struct(ff) if depth < max_level => { + // Need to zip these in reverse to maintain original order + for (cff, fff) in c.as_struct().columns().iter().zip(ff.into_iter()).rev() { + let mut name = name.clone(); + name.push(separator); + name.push(fff.name()); + queue.push_front((depth + 1, cff, name, fff)) } } - } else { - let updated_field = Field::new(name.concat(), data_type.clone(), nullable); - columns.push(c.clone()); - fields.push(Arc::new(updated_field)); + _ => { + let updated_field = Field::new( + name.concat(), + field_ref.data_type().clone(), + field_ref.is_nullable(), + ); + columns.push(c.clone()); + fields.push(Arc::new(updated_field)); + } } } RecordBatch::try_new(Arc::new(Schema::new(fields)), columns) @@ -868,8 +860,6 @@ where #[cfg(test)] mod tests { - use std::collections::HashMap; - use super::*; use crate::{ BooleanArray, Int32Array, Int64Array, Int8Array, ListArray, StringArray, StringViewArray, @@ -877,6 +867,7 @@ mod tests { use arrow_buffer::{Buffer, ToByteSlice}; use arrow_data::{ArrayData, ArrayDataBuilder}; use arrow_schema::Fields; + use std::collections::HashMap; #[test] fn create_record_batch() { diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs index 0252c9ac2464..7a829878d700 100644 --- a/arrow-schema/src/schema.rs +++ b/arrow-schema/src/schema.rs @@ 
@@ -456,38 +456,32 @@ impl Schema {
         if max_level == 0 {
             max_level = usize::MAX;
         }
-        let mut queue: VecDeque<(usize, Vec<&str>, &DataType, bool)> = VecDeque::new();
+        let mut queue: VecDeque<(usize, Vec<&str>, &FieldRef)> = VecDeque::new();
         for f in self.fields() {
             let name_vec: Vec<&str> = vec![f.name()];
-            queue.push_back((0, name_vec, f.data_type(), f.is_nullable()));
+            queue.push_back((0, name_vec, f));
         }
         let mut fields: Vec<FieldRef> = Vec::new();
 
-        while let Some((depth, name, data_type, nullable)) = queue.pop_front() {
-            if depth < max_level {
-                match data_type {
-                    DataType::Struct(ff) => {
-                        // Need to zip these in reverse to maintain original order
-                        for fff in ff.into_iter().rev() {
-                            let mut name = name.clone();
-                            name.push(separator);
-                            name.push(fff.name().as_str());
-                            queue.push_front((
-                                depth + 1,
-                                name.clone(),
-                                fff.data_type(),
-                                fff.is_nullable(),
-                            ))
-                        }
-                    }
-                    _ => {
-                        let updated_field = Field::new(name.concat(), data_type.clone(), nullable);
-                        fields.push(Arc::new(updated_field));
+        while let Some((depth, name, field_ref)) = queue.pop_front() {
+            match field_ref.data_type() {
+                DataType::Struct(ff) if depth < max_level => {
+                    // Need to zip these in reverse to maintain original order
+                    for fff in ff.into_iter().rev() {
+                        let mut name = name.clone();
+                        name.push(separator);
+                        name.push(fff.name());
+                        queue.push_front((depth + 1, name, fff))
                     }
                 }
-            } else {
-                let updated_field = Field::new(name.concat(), data_type.clone(), nullable);
-                fields.push(Arc::new(updated_field));
+                _ => {
+                    let updated_field = Field::new(
+                        name.concat(),
+                        field_ref.data_type().clone(),
+                        field_ref.is_nullable(),
+                    );
+                    fields.push(Arc::new(updated_field));
+                }
             }
         }
         Ok(Schema::new(fields))

From af7946bf7a48f0393f32998b5de4588a126c9257 Mon Sep 17 00:00:00 2001
From: nglime
Date: Fri, 10 Jan 2025 22:23:35 -0600
Subject: [PATCH 17/20] Additional test cases for List and FixedSizeList in
 Schema.
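
For reference, a minimal sketch of the call shape after this change (the field
names here are illustrative; per the new signature, `Some(0)` and `None` are
both treated as "no depth limit"):

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field, Fields, Schema};

fn main() {
    // "a" is a struct holding another struct "inner", two levels deep.
    let inner = Field::new(
        "inner",
        DataType::Struct(Fields::from(vec![Arc::new(Field::new(
            "x",
            DataType::Int64,
            true,
        ))])),
        true,
    );
    let schema = Schema::new(vec![Field::new(
        "a",
        DataType::Struct(Fields::from(vec![Arc::new(inner)])),
        false,
    )]);

    // Some(1) flattens only the top level: "a.inner" keeps its struct type.
    let depth_one = schema.normalize(".", Some(1)).unwrap();
    assert_eq!(depth_one.field(0).name(), "a.inner");

    // None flattens every level, down to "a.inner.x".
    let flat = schema.normalize(".", None).unwrap();
    assert_eq!(flat.field(0).name(), "a.inner.x");
}
```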
---
 arrow-array/src/record_batch.rs |  42 +++++----
 arrow-schema/src/schema.rs      | 158 +++++++++++++++++++++++++++++---
 2 files changed, 168 insertions(+), 32 deletions(-)

diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index bb0c1193eb4a..644962d84776 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -21,7 +21,6 @@
 use crate::cast::AsArray;
 use crate::{new_empty_array, Array, ArrayRef, StructArray};
 use arrow_schema::{ArrowError, DataType, Field, FieldRef, Schema, SchemaBuilder, SchemaRef};
-use std::collections::VecDeque;
 use std::ops::Index;
 use std::sync::Arc;
 
@@ -438,7 +437,7 @@ impl RecordBatch {
     ///
     /// let normalized = RecordBatch::try_new(Arc::new(schema), vec![a])
     ///     .expect("valid conversion")
-    ///     .normalize(".", 0)
+    ///     .normalize(".", None)
     ///     .expect("valid normalization");
     ///
     /// let expected = RecordBatch::try_from_iter_with_nullable(vec![
@@ -449,19 +448,20 @@
     ///
     /// assert_eq!(expected, normalized);
     /// ```
-    pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result<Self, ArrowError> {
-        if max_level == 0 {
-            max_level = usize::MAX;
-        }
-        let mut queue: VecDeque<(usize, &ArrayRef, Vec<&str>, &FieldRef)> = VecDeque::new();
-        for (c, f) in self.columns.iter().zip(self.schema.fields()) {
+    pub fn normalize(&self, separator: &str, max_level: Option<usize>) -> Result<Self, ArrowError> {
+        let max_level = match max_level.unwrap_or(usize::MAX) {
+            0 => usize::MAX,
+            val => val,
+        };
+        let mut stack: Vec<(usize, &ArrayRef, Vec<&str>, &FieldRef)> = Vec::new();
+        for (c, f) in self.columns.iter().zip(self.schema.fields()).rev() {
             let name_vec: Vec<&str> = vec![f.name()];
-            queue.push_back((0, c, name_vec, f));
+            stack.push((0, c, name_vec, f));
         }
         let mut columns: Vec<ArrayRef> = Vec::new();
         let mut fields: Vec<FieldRef> = Vec::new();
 
-        while let Some((depth, c, name, field_ref)) = queue.pop_front() {
+        while let Some((depth, c, name, field_ref)) = stack.pop() {
             match field_ref.data_type() {
                 DataType::Struct(ff) if depth < max_level => {
                     // Need to zip these in reverse to maintain original order
@@ -469,7 +469,7 @@ impl RecordBatch {
                         let mut name = name.clone();
                         name.push(separator);
                         name.push(fff.name());
-                        queue.push_front((depth + 1, cff, name, fff))
+                        stack.push((depth + 1, cff, name, fff))
                     }
                 }
                 _ => {
@@ -1315,9 +1315,9 @@
             Field::new("month", DataType::Int64, true),
         ]);
 
-        let normalized = RecordBatch::try_new(Arc::new(schema), vec![a, month.clone()])
+        let normalized = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, month.clone()])
             .expect("valid conversion")
-            .normalize(".", 0)
+            .normalize(".", Some(0))
             .expect("valid normalization");
 
         let expected = RecordBatch::try_from_iter_with_nullable(vec![
@@ -1329,6 +1329,14 @@
         .expect("valid conversion");
 
         assert_eq!(expected, normalized);
+
+        // check 0 and None have the same effect
+        let normalized = RecordBatch::try_new(Arc::new(schema), vec![a, month.clone()])
+            .expect("valid conversion")
+            .normalize(".", None)
+            .expect("valid normalization");
+
+        assert_eq!(expected, normalized);
     }
 
     #[test]
@@ -1382,7 +1390,7 @@
         let normalized =
             RecordBatch::try_new(Arc::new(schema.clone()), vec![exclamation_field.clone()])
                 .expect("valid conversion")
-                .normalize(".", 1)
+                .normalize(".", Some(1))
                 .expect("valid normalization");
 
         let expected = RecordBatch::try_from_iter_with_nullable(vec![
@@ -1412,7 +1420,7 @@
         // Normalize all levels
         let normalized = RecordBatch::try_new(Arc::new(schema), vec![exclamation_field])
             .expect("valid conversion")
-            .normalize(".", 0)
.normalize(".", 0) + .normalize(".", None) .expect("valid normalization"); let expected = RecordBatch::try_from_iter_with_nullable(vec![ @@ -1444,11 +1452,11 @@ mod tests { ]); let normalized = RecordBatch::new_empty(Arc::new(schema.clone())) - .normalize(".", 0) + .normalize(".", Some(0)) .expect("valid normalization"); let expected = RecordBatch::new_empty(Arc::new( - schema.normalize(".", 0).expect("valid normalization"), + schema.normalize(".", Some(0)).expect("valid normalization"), )); assert_eq!(expected, normalized); diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs index 7a829878d700..09795c135589 100644 --- a/arrow-schema/src/schema.rs +++ b/arrow-schema/src/schema.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use std::collections::{HashMap, VecDeque}; +use std::collections::HashMap; use std::fmt; use std::hash::Hash; use std::sync::Arc; @@ -444,7 +444,7 @@ impl Schema { /// false, /// ), /// ]) - /// .normalize(".", 0) + /// .normalize(".", None) /// .expect("valid normalization"); /// let expected = Schema::new(vec![ /// Field::new("a.animals", DataType::Utf8, true), @@ -452,18 +452,23 @@ impl Schema { /// ]); /// assert_eq!(schema, expected); /// ``` - pub fn normalize(&self, separator: &str, mut max_level: usize) -> Result { - if max_level == 0 { - max_level = usize::MAX; - } - let mut queue: VecDeque<(usize, Vec<&str>, &FieldRef)> = VecDeque::new(); - for f in self.fields() { + pub fn normalize( + &self, + separator: &str, + max_level: Option, + ) -> Result { + let max_level = match max_level.unwrap_or(usize::MAX) { + 0 => usize::MAX, + val => val, + }; + let mut stack: Vec<(usize, Vec<&str>, &FieldRef)> = Vec::new(); + for f in self.fields().iter().rev() { let name_vec: Vec<&str> = vec![f.name()]; - queue.push_back((0, name_vec, f)); + stack.push((0, name_vec, f)); } let mut fields: Vec = Vec::new(); - while let Some((depth, name, field_ref)) = queue.pop_front() { + while let Some((depth, name, field_ref)) = stack.pop() { match field_ref.data_type() { DataType::Struct(ff) if depth < max_level => { // Need to zip these in reverse to maintain original order @@ -471,7 +476,7 @@ impl Schema { let mut name = name.clone(); name.push(separator); name.push(fff.name()); - queue.push_front((depth + 1, name, fff)) + stack.push((depth + 1, name, fff)) } } _ => { @@ -772,7 +777,7 @@ mod tests { } #[test] - fn normalize() { + fn normalize_simple() { let schema = Schema::new(vec![ Field::new( "a", @@ -785,7 +790,7 @@ mod tests { ), Field::new("month", DataType::Int64, true), ]) - .normalize(".", 0) + .normalize(".", Some(0)) .expect("valid normalization"); let expected = Schema::new(vec![ @@ -796,6 +801,24 @@ mod tests { ]); assert_eq!(schema, expected); + + // Check that 0, None have the same result + let schema = Schema::new(vec![ + Field::new( + "a", + DataType::Struct(Fields::from(vec![ + Arc::new(Field::new("animals", DataType::Utf8, true)), + Arc::new(Field::new("n_legs", DataType::Int64, true)), + Arc::new(Field::new("year", DataType::Int64, true)), + ])), + false, + ), + Field::new("month", DataType::Int64, true), + ]) + .normalize(".", None) + .expect("valid normalization"); + + assert_eq!(schema, expected); } #[test] @@ -826,7 +849,7 @@ mod tests { )); let normalize_all = Schema::new(vec![exclamation.clone()]) - .normalize(".", 0) + .normalize(".", Some(0)) .expect("valid normalization"); let expected = Schema::new(vec![ @@ -841,7 +864,7 @@ mod tests { assert_eq!(normalize_all, expected); let 
-            .normalize(".", 1)
+            .normalize(".", Some(1))
             .expect("valid normalization");
 
         let expected = Schema::new(vec![
@@ -852,6 +875,111 @@
 
         assert_eq!(normalize_depth_one, expected);
     }
 
+    #[test]
+    fn normalize_list() {
+        // Only the Struct type field should be unwrapped
+        let a = Arc::new(Field::new("a", DataType::Utf8, true));
+        let b = Arc::new(Field::new("b", DataType::Int64, false));
+        let c = Arc::new(Field::new("c", DataType::Int64, true));
+        let d = Arc::new(Field::new("d", DataType::Utf8, true));
+        let e = Arc::new(Field::new("e", DataType::Int64, false));
+        let f = Arc::new(Field::new("f", DataType::Int64, true));
+
+        let one = Arc::new(Field::new(
+            "1",
+            DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])),
+            true,
+        ));
+
+        let two = Arc::new(Field::new(
+            "2",
+            DataType::List(Arc::new(Field::new_list_field(
+                DataType::Struct(Fields::from(vec![d.clone(), e.clone(), f.clone()])),
+                true,
+            ))),
+            false,
+        ));
+
+        let exclamation = Arc::new(Field::new(
+            "!",
+            DataType::Struct(Fields::from(vec![one.clone(), two.clone()])),
+            false,
+        ));
+
+        let normalize_all = Schema::new(vec![exclamation.clone()])
+            .normalize(".", Some(0))
+            .expect("valid normalization");
+
+        let expected = Schema::new(vec![
+            Field::new("!.1.a", DataType::Utf8, true),
+            Field::new("!.1.b", DataType::Int64, false),
+            Field::new("!.1.c", DataType::Int64, true),
+            Field::new(
+                "!.2",
+                DataType::List(Arc::new(Field::new_list_field(
+                    DataType::Struct(Fields::from(vec![d.clone(), e.clone(), f.clone()])),
+                    true,
+                ))),
+                false,
+            ),
+        ]);
+
+        assert_eq!(normalize_all, expected);
+
+        // LargeList
+        let two = Arc::new(Field::new(
+            "2",
+            DataType::FixedSizeList(
+                Arc::new(Field::new_fixed_size_list(
+                    "3",
+                    Arc::new(Field::new_list_field(
+                        DataType::Struct(Fields::from(vec![d.clone(), e.clone(), f.clone()])),
+                        true,
+                    )),
+                    1,
+                    true,
+                )),
+                1,
+            ),
+            false,
+        ));
+
+        let exclamation = Arc::new(Field::new(
+            "!",
+            DataType::Struct(Fields::from(vec![one, two])),
+            false,
+        ));
+
+        let normalize_all = Schema::new(vec![exclamation.clone()])
+            .normalize(".", Some(0))
+            .expect("valid normalization");
+
+        // List shouldn't be affected
+        let expected = Schema::new(vec![
+            Field::new("!.1.a", DataType::Utf8, true),
+            Field::new("!.1.b", DataType::Int64, false),
+            Field::new("!.1.c", DataType::Int64, true),
+            Field::new(
+                "!.2",
+                DataType::FixedSizeList(
+                    Arc::new(Field::new_fixed_size_list(
+                        "3",
+                        Arc::new(Field::new_list_field(
+                            DataType::Struct(Fields::from(vec![d.clone(), e.clone(), f.clone()])),
+                            true,
+                        )),
+                        1,
+                        true,
+                    )),
+                    1,
+                ),
+                false,
+            ),
+        ]);
+
+        assert_eq!(normalize_all, expected);
+    }
+
     #[test]
     #[should_panic(
         expected = "Unable to get field named \\\"nickname\\\". Valid fields: [\\\"first_name\\\", \\\"last_name\\\", \\\"address\\\", \\\"interests\\\"]"
     )]

From e97cc9cbc1f4cf3243f10c12704e40574837e53d Mon Sep 17 00:00:00 2001
From: nglime
Date: Sun, 19 Jan 2025 22:37:35 -0600
Subject: [PATCH 18/20] Additional test cases for deeply nested normalization.
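
These cases pin down that only struct-typed fields are flattened; a struct
wrapped in a List (or any other non-struct nesting) is left in place. A
standalone sketch of that behaviour (illustrative names, same API as in the
previous patch):

```rust
use std::sync::Arc;
use arrow_schema::{DataType, Field, Fields, Schema};

fn main() {
    // A struct hidden inside a List: normalize must not reach through it.
    let inner = Fields::from(vec![Arc::new(Field::new("x", DataType::Int64, true))]);
    let wrapped = Field::new(
        "wrapped",
        DataType::List(Arc::new(Field::new_list_field(
            DataType::Struct(inner),
            true,
        ))),
        false,
    );

    let schema = Schema::new(vec![wrapped.clone()]);
    let normalized = schema.normalize(".", None).unwrap();

    // The list column survives unchanged; no "wrapped.x" field is produced.
    assert_eq!(normalized, Schema::new(vec![wrapped]));
}
```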
---
 arrow-array/src/record_batch.rs |   9 +-
 arrow-schema/src/schema.rs      | 216 ++++++++++++++++++++++++++++++--
 2 files changed, 211 insertions(+), 14 deletions(-)

diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index 644962d84776..ab05a855ca0a 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -1315,10 +1315,11 @@
             Field::new("month", DataType::Int64, true),
         ]);
 
-        let normalized = RecordBatch::try_new(Arc::new(schema.clone()), vec![a, month.clone()])
-            .expect("valid conversion")
-            .normalize(".", Some(0))
-            .expect("valid normalization");
+        let normalized =
+            RecordBatch::try_new(Arc::new(schema.clone()), vec![a.clone(), month.clone()])
+                .expect("valid conversion")
+                .normalize(".", Some(0))
+                .expect("valid normalization");
 
         let expected = RecordBatch::try_from_iter_with_nullable(vec![
             ("a.animals", animals.clone(), true),
diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs
index 09795c135589..291f8e88d993 100644
--- a/arrow-schema/src/schema.rs
+++ b/arrow-schema/src/schema.rs
@@ -452,11 +452,7 @@ impl Schema {
     /// ]);
     /// assert_eq!(schema, expected);
     /// ```
-    pub fn normalize(
-        &self,
-        separator: &str,
-        max_level: Option<usize>,
-    ) -> Result<Self, ArrowError> {
+    pub fn normalize(&self, separator: &str, max_level: Option<usize>) -> Result<Self, ArrowError> {
         let max_level = match max_level.unwrap_or(usize::MAX) {
             0 => usize::MAX,
             val => val,
         };
@@ -907,9 +903,10 @@
         ));
 
         let normalize_all = Schema::new(vec![exclamation.clone()])
-            .normalize(".", Some(0))
+            .normalize(".", None)
             .expect("valid normalization");
 
+        // List shouldn't be affected
         let expected = Schema::new(vec![
             Field::new("!.1.a", DataType::Utf8, true),
             Field::new("!.1.b", DataType::Int64, false),
@@ -925,8 +922,9 @@
         ]);
 
         assert_eq!(normalize_all, expected);
+        assert_eq!(normalize_all.fields().len(), 4);
 
-        // LargeList
+        // FixedSizeList
         let two = Arc::new(Field::new(
             "2",
             DataType::FixedSizeList(
@@ -946,15 +944,15 @@
 
         let exclamation = Arc::new(Field::new(
             "!",
-            DataType::Struct(Fields::from(vec![one, two])),
+            DataType::Struct(Fields::from(vec![one.clone(), two])),
             false,
         ));
 
         let normalize_all = Schema::new(vec![exclamation.clone()])
-            .normalize(".", Some(0))
+            .normalize(".", None)
             .expect("valid normalization");
 
-        // List shouldn't be affected
+        // FixedSizeList shouldn't be affected
         let expected = Schema::new(vec![
             Field::new("!.1.a", DataType::Utf8, true),
             Field::new("!.1.b", DataType::Int64, false),
@@ -977,6 +975,204 @@
             ),
         ]);
 
+        assert_eq!(normalize_all, expected);
+        assert_eq!(normalize_all.fields().len(), 4);
+
+        // LargeList
+        let two = Arc::new(Field::new(
+            "2",
+            DataType::FixedSizeList(
+                Arc::new(Field::new_large_list(
+                    "3",
+                    Arc::new(Field::new_list_field(
+                        DataType::Struct(Fields::from(vec![d.clone(), e.clone(), f.clone()])),
+                        true,
+                    )),
+                    true,
+                )),
+                1,
+            ),
+            false,
+        ));
+
+        let exclamation = Arc::new(Field::new(
+            "!",
+            DataType::Struct(Fields::from(vec![one.clone(), two])),
+            false,
+        ));
+
+        let normalize_all = Schema::new(vec![exclamation.clone()])
+            .normalize(".", None)
+            .expect("valid normalization");
+
+        // LargeList shouldn't be affected
+        let expected = Schema::new(vec![
+            Field::new("!.1.a", DataType::Utf8, true),
+            Field::new("!.1.b", DataType::Int64, false),
+            Field::new("!.1.c", DataType::Int64, true),
+            Field::new(
+                "!.2",
+                DataType::FixedSizeList(
+                    Arc::new(Field::new_large_list(
+                        "3",
+                        Arc::new(Field::new_list_field(
+                            DataType::Struct(Fields::from(vec![d.clone(), e.clone(), f.clone()])),
+                            true,
+                        )),
+                        true,
+                    )),
+                    1,
+                ),
+                false,
+            ),
+        ]);
+
+        assert_eq!(normalize_all, expected);
+        assert_eq!(normalize_all.fields().len(), 4);
+    }
+
+    #[test]
+    fn normalize_deep_nested() {
+        // No unwrapping expected
+        let a = Arc::new(Field::new("a", DataType::Utf8, true));
+        let b = Arc::new(Field::new("b", DataType::Int64, false));
+        let c = Arc::new(Field::new("c", DataType::Int64, true));
+        let d = Arc::new(Field::new("d", DataType::Utf8, true));
+        let e = Arc::new(Field::new("e", DataType::Int64, false));
+        let f = Arc::new(Field::new("f", DataType::Int64, true));
+
+        let one = Arc::new(Field::new(
+            "1",
+            DataType::Struct(Fields::from(vec![a.clone(), b.clone(), c.clone()])),
+            true,
+        ));
+
+        let two = Arc::new(Field::new(
+            "2",
+            DataType::List(Arc::new(Field::new_list_field(
+                DataType::Struct(Fields::from(vec![d.clone(), e.clone(), f.clone()])),
+                true,
+            ))),
+            false,
+        ));
+
+        let l10 = Arc::new(Field::new(
+            "l10",
+            DataType::List(Arc::new(Field::new_list_field(
+                DataType::Struct(Fields::from(vec![one, two])),
+                true,
+            ))),
+            false,
+        ));
+
+        let l9 = Arc::new(Field::new(
+            "l9",
+            DataType::List(Arc::new(Field::new_list_field(
+                DataType::Struct(Fields::from(vec![l10])),
+                true,
+            ))),
+            false,
+        ));
+
+        let l8 = Arc::new(Field::new(
+            "l8",
+            DataType::List(Arc::new(Field::new_list_field(
+                DataType::Struct(Fields::from(vec![l9])),
+                true,
+            ))),
+            false,
+        ));
+        let l7 = Arc::new(Field::new(
+            "l7",
+            DataType::List(Arc::new(Field::new_list_field(
+                DataType::Struct(Fields::from(vec![l8])),
+                true,
+            ))),
+            false,
+        ));
+        let l6 = Arc::new(Field::new(
+            "l6",
+            DataType::List(Arc::new(Field::new_list_field(
+                DataType::Struct(Fields::from(vec![l7])),
+                true,
+            ))),
+            false,
+        ));
+        let l5 = Arc::new(Field::new(
+            "l5",
+            DataType::List(Arc::new(Field::new_list_field(
+                DataType::Struct(Fields::from(vec![l6])),
+                true,
+            ))),
+            false,
+        ));
+        let l4 = Arc::new(Field::new(
+            "l4",
+            DataType::List(Arc::new(Field::new_list_field(
+                DataType::Struct(Fields::from(vec![l5])),
+                true,
+            ))),
+            false,
+        ));
+        let l3 = Arc::new(Field::new(
+            "l3",
+            DataType::List(Arc::new(Field::new_list_field(
+                DataType::Struct(Fields::from(vec![l4])),
+                true,
+            ))),
+            false,
+        ));
+        let l2 = Arc::new(Field::new(
+            "l2",
+            DataType::List(Arc::new(Field::new_list_field(
+                DataType::Struct(Fields::from(vec![l3])),
+                true,
+            ))),
+            false,
+        ));
+        let l1 = Arc::new(Field::new(
+            "l1",
+            DataType::List(Arc::new(Field::new_list_field(
+                DataType::Struct(Fields::from(vec![l2])),
+                true,
+            ))),
+            false,
+        ));
+
+        let normalize_all = Schema::new(vec![l1])
+            .normalize(".", None)
+            .expect("valid normalization");
+
+        assert_eq!(normalize_all.fields().len(), 1);
+    }
+
+    #[test]
+    fn normalize_dictionary() {
+        let a = Arc::new(Field::new("a", DataType::Utf8, true));
+        let b = Arc::new(Field::new("b", DataType::Int64, false));
+
+        let one = Arc::new(Field::new(
+            "1",
+            DataType::Dictionary(
+                Box::new(DataType::Int32),
+                Box::new(DataType::Struct(Fields::from(vec![a.clone(), b.clone()]))),
+            ),
+            false,
+        ));
+
+        let normalize_all = Schema::new(vec![one.clone()])
+            .normalize(".", None)
+            .expect("valid normalization");
+
+        let expected = Schema::new(vec![Field::new(
+            "1",
+            DataType::Dictionary(
+                Box::new(DataType::Int32),
+                Box::new(DataType::Struct(Fields::from(vec![a.clone(), b.clone()]))),
+            ),
+            false,
+        )]);
+
         assert_eq!(normalize_all, expected);
     }

From b90e8f5b97d7a22a4a672f5bc7420ecb176c043a Mon Sep 17 00:00:00 2001
From: nglime
Date: Mon, 20 Jan 2025 14:34:58 -0600
Subject: [PATCH 19/20] Suggestions from Jefffrey on the descriptions and
 stack initialization.
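
The iterator form keeps the existing reversal: a Vec used as a stack pops the
most recently pushed element first, so pushing the fields in reverse makes
them come back out in their original order. A tiny standalone illustration of
that invariant (plain strings instead of the arrow types):

```rust
fn main() {
    let fields = ["a", "b", "c"];

    // Push in reverse, as the normalize code does...
    let mut stack: Vec<&str> = fields.iter().rev().copied().collect();

    // ...so popping restores the original left-to-right field order.
    let mut popped = Vec::new();
    while let Some(f) = stack.pop() {
        popped.push(f);
    }
    assert_eq!(popped, fields);
}
```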
---
 arrow-array/src/record_batch.rs | 33 +++++++++++++++++++++----------
 arrow-schema/src/schema.rs      | 34 ++++++++++++++++++++------------
 2 files changed, 43 insertions(+), 24 deletions(-)

diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index ab05a855ca0a..bb17dd4c1e7b 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -397,17 +397,23 @@ impl RecordBatch {
 
     /// Normalize a semi-structured [`RecordBatch`] into a flat table.
     ///
-    /// `separator`: Nested [`Field`]s will generate names separated by `separator`, e.g. for
-    /// separator= "." and the schema:
+    /// Nested [`Field`]s will generate names separated by `separator`, up to a depth of `max_level`
+    /// (unlimited if `None`).
+    ///
+    /// e.g. given a [`RecordBatch`] with schema:
+    ///
     /// ```text
     /// "foo": StructArray<"bar": Utf8>
     /// ```
-    /// will generate:
+    ///
+    /// A separator of `"."` would generate a batch with the schema:
+    ///
     /// ```text
     /// "foo.bar": Utf8
     /// ```
-    /// `max_level`: The maximum number of levels (depth of the `Schema` and `Columns`) to
-    /// normalize. If `0`, normalizes all levels.
+    ///
+    /// Note that giving a depth of `Some(0)` to `max_level` is the same as passing in `None`;
+    /// it will be treated as unlimited.
     ///
     /// # Example
     ///
@@ -415,7 +421,7 @@
     /// # use std::sync::Arc;
     /// # use arrow_array::{ArrayRef, Int64Array, StringArray, StructArray, RecordBatch};
     /// # use arrow_schema::{DataType, Field, Fields, Schema};
-    ///
+    /// #
     /// let animals: ArrayRef = Arc::new(StringArray::from(vec!["Parrot", ""]));
     /// let n_legs: ArrayRef = Arc::new(Int64Array::from(vec![Some(2), Some(4)]));
     ///
@@ -453,11 +459,16 @@
             0 => usize::MAX,
             val => val,
         };
-        let mut stack: Vec<(usize, &ArrayRef, Vec<&str>, &FieldRef)> = Vec::new();
-        for (c, f) in self.columns.iter().zip(self.schema.fields()).rev() {
-            let name_vec: Vec<&str> = vec![f.name()];
-            stack.push((0, c, name_vec, f));
-        }
+        let mut stack: Vec<(usize, &ArrayRef, Vec<&str>, &FieldRef)> = self
+            .columns
+            .iter()
+            .zip(self.schema.fields())
+            .rev()
+            .map(|c, f| {
+                let name_vec: Vec<&str> = vec![f.name()];
+                (0, c, name_vec, f)
+            })
+            .collect();
         let mut columns: Vec<ArrayRef> = Vec::new();
         let mut fields: Vec<FieldRef> = Vec::new();
 
diff --git a/arrow-schema/src/schema.rs b/arrow-schema/src/schema.rs
index 291f8e88d993..6e66ed080e58 100644
--- a/arrow-schema/src/schema.rs
+++ b/arrow-schema/src/schema.rs
@@ -413,21 +413,25 @@ impl Schema {
         &self.metadata
     }
 
-    /// Returns a new schema, normalized based on the max_level field.
-    /// `separator`: Nested [`Field`]s will generate names separated by `separator`, e.g. for
-    /// separator= "." and the schema:
+    /// Normalize a [`Schema`] into a flat table.
+    ///
+    /// Nested [`Field`]s will generate names separated by `separator`, up to a depth of `max_level`
+    /// (unlimited if `None`).
+    ///
+    /// e.g. given a [`Schema`]:
+    ///
     /// ```text
     /// "foo": StructArray<"bar": Utf8>
     /// ```
-    /// will generate:
+    ///
+    /// A separator of `"."` would generate a schema:
+    ///
     /// ```text
     /// "foo.bar": Utf8
     /// ```
     ///
-    /// `max_level`: The maximum number of levels (depth of the `Schema`) to normalize. If `0`,
-    /// normalizes all levels.
-    ///
-    /// This carries metadata from the parent schema over.
+    /// Note that giving a depth of `Some(0)` to `max_level` is the same as passing in `None`;
+    /// it will be treated as unlimited.
     ///
     /// # Example
     ///
@@ -457,11 +461,15 @@
             0 => usize::MAX,
             val => val,
         };
-        let mut stack: Vec<(usize, Vec<&str>, &FieldRef)> = Vec::new();
-        for f in self.fields().iter().rev() {
-            let name_vec: Vec<&str> = vec![f.name()];
-            stack.push((0, name_vec, f));
-        }
+        let mut stack: Vec<(usize, Vec<&str>, &FieldRef)> = self
+            .fields()
+            .iter()
+            .rev()
+            .map(|f| {
+                let name_vec: Vec<&str> = vec![f.name()];
+                (0, name_vec, f)
+            })
+            .collect();
         let mut fields: Vec<FieldRef> = Vec::new();
 
         while let Some((depth, name, field_ref)) = stack.pop() {

From 6a2e3cac863a27d830e3b1ea1168b07b97c4d47b Mon Sep 17 00:00:00 2001
From: nglime
Date: Mon, 20 Jan 2025 14:36:59 -0600
Subject: [PATCH 20/20] Forgot parentheses.

---
 arrow-array/src/record_batch.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/arrow-array/src/record_batch.rs b/arrow-array/src/record_batch.rs
index bb17dd4c1e7b..a6c2aee7cbc6 100644
--- a/arrow-array/src/record_batch.rs
+++ b/arrow-array/src/record_batch.rs
@@ -464,7 +464,7 @@ impl RecordBatch {
             .iter()
             .zip(self.schema.fields())
             .rev()
-            .map(|c, f| {
+            .map(|(c, f)| {
                 let name_vec: Vec<&str> = vec![f.name()];
                 (0, c, name_vec, f)
             })
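
For context on this fix: `Iterator::map` passes exactly one argument to its
closure, and `zip` yields that argument as a tuple, so the closure has to
destructure it with a tuple pattern. `|c, f|` declares a two-argument closure
and fails to compile. A standalone illustration:

```rust
fn main() {
    let cols = ["a", "b"];
    let vals = [1, 2];

    // `zip` produces one (col, val) tuple per step; `|(c, v)|` destructures it.
    let pairs: Vec<String> = cols
        .iter()
        .zip(vals.iter())
        .map(|(c, v)| format!("{c}={v}"))
        .collect();

    assert_eq!(pairs, ["a=1", "b=2"]);
}
```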