From 55f92ed353ebafb4d12b642f300fe5a31d4a083b Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 16 May 2024 11:30:36 -0400
Subject: [PATCH] thread through statistics analysis

---
 datafusion-examples/examples/parquet_index.rs | 184 ++++++++++++------
 1 file changed, 126 insertions(+), 58 deletions(-)

diff --git a/datafusion-examples/examples/parquet_index.rs b/datafusion-examples/examples/parquet_index.rs
index 0ac8192269d6e..fd2a6ebcdb04a 100644
--- a/datafusion-examples/examples/parquet_index.rs
+++ b/datafusion-examples/examples/parquet_index.rs
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::array::{ArrayRef, Int32Array, RecordBatch, StringArray, UInt64Array};
-use arrow_schema::SchemaRef;
+use arrow::array::{Array, ArrayRef, AsArray, Int32Array, RecordBatch, StringArray, UInt64Array};
+use arrow_schema::{SchemaRef};
 use async_trait::async_trait;
 use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::physical_plan::{
@@ -33,8 +33,7 @@ use datafusion::prelude::*;
 use datafusion::prelude::{SessionConfig, SessionContext};
 use datafusion_common::config::TableParquetOptions;
 use datafusion_common::{
-    internal_datafusion_err, DFSchema, DataFusionError, Result,
-    Statistics,
+    internal_datafusion_err, DFSchema, DataFusionError, Result, Statistics,
 };
 use datafusion_expr::utils::conjunction;
 use datafusion_expr::TableType;
@@ -44,6 +43,7 @@ use std::fs::{DirEntry, File};
 use std::ops::Range;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
+use arrow::datatypes::{Int32Type, UInt64Type};
 use tempfile::TempDir;
 use url::Url;
 
@@ -116,8 +116,6 @@ pub struct IndexTableProvider {
     files: Vec>,
 }
 
-
-
 impl IndexTableProvider {
     /// Create a new IndexTableProvider
     pub fn try_new(dir: impl Into<PathBuf>) -> Result<Self> {
@@ -133,7 +131,6 @@ impl IndexTableProvider {
         let index = index_builder.build()?;
 
-
         // todo make a nicer API for this ("partitioned files" from directory)
         let files = files
             .iter()
@@ -151,21 +148,6 @@ impl IndexTableProvider {
     }
 }
 
-/// Get the schema for the parquet files in the given directory, assuming they
-/// are all the same
-///
-/// Note this could be done with SchemaAdapter once DataFusion 39 is released
-///
-fn get_schema(file: &Path) -> Result<SchemaRef> {
-    let file = File::open(file).map_err(|e| {
-        DataFusionError::from(e).context(format!("Error opening file {file:?}"))
-    })?;
-
-    let reader = ParquetRecordBatchReaderBuilder::try_new(file)?;
-
-    Ok(reader.schema().clone())
-}
-
 #[async_trait]
 impl TableProvider for IndexTableProvider {
     fn as_any(&self) -> &dyn Any {
@@ -232,19 +214,20 @@ impl TableProvider for IndexTableProvider {
     }
 }
 
-/// This is very simple in memory index for a set of parquet files
+/// Simple in memory index for a set of parquet files
 ///
 /// The index is represented as an arrow `RecordBatch` that can be passed
 /// directly by the DataFusion [`PruningPredicate`] API
 ///
 /// The index looks like
-/// | file_name | file_size | row_count | value_column_null_count | value_column_min | value_column_max |
-/// |-----------|-----------|-----------|-------------------------|------------------|------------------|
-/// | file1.parquet | 1000 | 100 | 0 | 0 | 100 |
-/// | file2.parquet | 1000 | 100 | 0 | 100 | 200 |
-/// | file3.parquet | 1000 | 100 | 0 | 200 | 300 |
+/// | file_name | file_size | row_count | value_column_min | value_column_max |
+/// |-----------|-----------|-----------|------------------|------------------|
+/// | file1.parquet | 1000 | 100 | 0 | 100 |
+/// | file2.parquet | 1000 | 100 | 100 | 200 |
+/// | file3.parquet | 1000 | 100 | 200 | 300 |
 ///
-/// // TODO eventually store this information for each row group within a file
+/// Note: a more advanced index would store this information for each row group
+/// within a file
 struct ParquetMetadataIndex {
     file_schema: SchemaRef,
@@ -264,62 +247,151 @@ struct ParquetMetadataIndexBuilder {
     filenames: Vec<String>,
     file_sizes: Vec<u64>,
     row_counts: Vec<u64>,
-    value_column_null_counts: Vec<u64>,
-    /// Holds the min/max value of the value column for each file as a single
-    /// value array.
-    ///
-    /// In the more advanced parquet index, this will hold values for each row
-    /// group within a file
-    value_column_mins: Vec<ArrayRef>,
-    value_column_maxs: Vec<ArrayRef>,
+    /// Holds the min/max value of the value column
+    value_column_mins: Vec<i32>,
+    value_column_maxs: Vec<i32>,
 }
 
-
 impl ParquetMetadataIndexBuilder {
     fn new() -> Self {
         Self::default()
     }
 
     /// Add a file to the index
-    fn add_file(&mut self, filename: &Path) -> Result<()> {
-        println!("Adding file {:?}", filename);
+    fn add_file(&mut self, file: &Path) -> Result<()> {
+        let file_name = file
+            .file_name()
+            .ok_or_else(|| internal_datafusion_err!("No filename"))?
+            .to_str()
+            .ok_or_else(|| internal_datafusion_err!("Invalid filename"))?;
+        let file_size = file.metadata()?.len();
+
+        println!("Adding file {file_name}");
+
+        let file = File::open(file).map_err(|e| {
+            DataFusionError::from(e).context(format!("Error opening file {file:?}"))
+        })?;
+
+        let reader = ParquetRecordBatchReaderBuilder::try_new(file)?;
+
+        // Get the schema of the file. A real system might have to handle the
+        // case where the schema of the file is not the same as the schema of
+        // the other files, e.g. using SchemaAdapter.
         if self.file_schema.is_none() {
-            self.file_schema = Some(get_schema(filename)?);
+            self.file_schema = Some(reader.schema().clone());
         }
+
+        // extract the statistics from the file
+        let metadata = reader.metadata();
+
+        // TODO: extract the min/max values for each row group
+        let stats = parquet_stats_to_arrow("value", &reader)?;
+        // our example has no nulls, so this is a sanity check
+        assert_eq!(stats.row_count.null_count(), 0);
+        assert_eq!(stats.min.null_count(), 0);
+        assert_eq!(stats.max.null_count(), 0);
+
+        // total row count is the sum of the per row group counts
+        let row_count = stats.row_count.as_primitive::<UInt64Type>()
+            .iter().flatten().sum::<u64>();
+
+        // file wide min/max come from the per row group mins and maxes
+        let value_column_min = stats.min.as_primitive::<Int32Type>()
+            .iter().flatten().min().unwrap_or_default();
+        let value_column_max = stats.max.as_primitive::<Int32Type>()
+            .iter().flatten().max().unwrap_or_default();
+
+        // sanity check the statistics
+        assert_eq!(row_count, metadata.file_metadata().num_rows() as u64);
+
+        self.add_row(file_name, file_size, row_count, value_column_min, value_column_max);
-        todo!();
+        Ok(())
     }
 
+    /// Add the values for a single row to the in progress index
+    fn add_row(
+        &mut self,
+        file_name: impl Into<String>,
+        file_size: u64,
+        row_count: u64,
+        value_column_min: i32,
+        value_column_max: i32,
+    ) {
+        self.filenames.push(file_name.into());
+        self.file_sizes.push(file_size);
+        self.row_counts.push(row_count);
+        self.value_column_mins.push(value_column_min);
+        self.value_column_maxs.push(value_column_max);
+    }
+
     /// Build the index from the files added
     fn build(self) -> Result<ParquetMetadataIndex> {
         let Some(file_schema) = self.file_schema else {
             return Err(internal_datafusion_err!("No files added to index"));
         };
-        let value_column_mins = concat_arrays(&self.value_column_mins)?;
-        let value_column_maxs = concat_arrays(&self.value_column_maxs)?;
-
         let index = RecordBatch::try_from_iter(vec![
-            ("file_name", Arc::new(StringArray::from(self.filenames)) as ArrayRef),
-            ("file_size", Arc::new(UInt64Array::from(self.file_sizes)) as ArrayRef),
-            ("row_count", Arc::new(UInt64Array::from(self.row_counts)) as ArrayRef),
-            ("value_column_null_count", Arc::new(UInt64Array::from(self.value_column_null_counts)) as ArrayRef),
-            ("value_column_min", value_column_mins),
-            ("value_column_max", value_column_maxs),
+            (
+                "file_name",
+                Arc::new(StringArray::from(self.filenames)) as ArrayRef,
+            ),
+            (
+                "file_size",
+                Arc::new(UInt64Array::from(self.file_sizes)) as ArrayRef,
+            ),
+            (
+                "row_count",
+                Arc::new(UInt64Array::from(self.row_counts)) as ArrayRef,
+            ),
+            (
+                "value_column_min",
+                Arc::new(Int32Array::from(self.value_column_mins)) as ArrayRef,
+            ),
+            (
+                "value_column_max",
+                Arc::new(Int32Array::from(self.value_column_maxs)) as ArrayRef,
+            ),
         ])?;
 
         Ok(ParquetMetadataIndex { file_schema, index })
     }
 }
 
+/// TODO: replace with the new statistics API from
+/// <https://github.com/apache/datafusion/issues/10453> when it is available
+pub struct ArrowStatistics {
+    /// min values
+    min: ArrayRef,
+    /// max values
+    max: ArrayRef,
+    /// Row counts (UInt64Array)
+    row_count: ArrayRef,
+    /// Null Counts (UInt64Array)
+    null_count: ArrayRef,
+}
+
+/// Extract the min, max, row count, and null count statistics for the given
+/// column from the parquet metadata, one entry per row group
+pub fn parquet_stats_to_arrow(
+    column_name: &str,
+    // todo only take the fields of this we need
+    parquet_record_batch_reader_builder: &ParquetRecordBatchReaderBuilder<File>,
+) -> Result<ArrowStatistics> {
+    todo!();
+}
+
 /// Return a list of the directory entries in the given directory, sorted by name
-fn read_dir(dir: &Path) -> Result> {
+fn read_dir(dir: &Path) -> Result<Vec<DirEntry>> {
     let mut files = dir
         .read_dir()
         .map_err(|e| {
-            DataFusionError::from(e)
-                .context(format!("Error reading directory {dir:?}"))
+            DataFusionError::from(e).context(format!("Error reading directory {dir:?}"))
         })?
         .map(|entry| {
             entry.map_err(|e| {
@@ -332,11 +404,7 @@ fn read_dir(dir: &Path) -> Result> {
     Ok(files)
 }
 
-fn concat_arrays(arrays: &[ArrayRef]) -> Result<ArrayRef> {
-    // Need to use refs for some reason
-    let arrays = arrays.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
-    Ok(arrow::compute::concat(&arrays)?)
-}
+
 /// Demonstration Data
 ///
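
Until the statistics API tracked in https://github.com/apache/datafusion/issues/10453 is
available, one way to fill in the `todo!()` body of `parquet_stats_to_arrow` is to read the
per row group statistics directly from the parquet metadata. The sketch below is
illustrative only and is not part of the patch above: it assumes the indexed column is an
Int32 leaf column, reuses the `ArrowStatistics` struct added by the patch (so it must live
in the same module), and the `parquet_stats_to_arrow_sketch` name is made up for the
example.

use arrow::array::{ArrayRef, Int32Array, UInt64Array};
use datafusion_common::{internal_datafusion_err, Result};
use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
use parquet::file::statistics::Statistics as ParquetStatistics;
use std::fs::File;
use std::sync::Arc;

/// Illustrative sketch: build `ArrowStatistics` (one entry per row group) for
/// `column_name` from the parquet metadata already held by the reader builder.
pub fn parquet_stats_to_arrow_sketch(
    column_name: &str,
    builder: &ParquetRecordBatchReaderBuilder<File>,
) -> Result<ArrowStatistics> {
    // Locate the leaf column in the parquet schema by name (flat schema assumed)
    let column_index = builder
        .parquet_schema()
        .columns()
        .iter()
        .position(|c| c.name() == column_name)
        .ok_or_else(|| internal_datafusion_err!("column {column_name} not found"))?;

    // One entry per row group; `None` when a row group has no usable statistics
    let mut mins: Vec<Option<i32>> = vec![];
    let mut maxes: Vec<Option<i32>> = vec![];
    let mut row_counts: Vec<u64> = vec![];
    let mut null_counts: Vec<u64> = vec![];

    for row_group in builder.metadata().row_groups() {
        row_counts.push(row_group.num_rows() as u64);
        match row_group.column(column_index).statistics() {
            Some(ParquetStatistics::Int32(stats)) if stats.has_min_max_set() => {
                mins.push(Some(*stats.min()));
                maxes.push(Some(*stats.max()));
                null_counts.push(stats.null_count());
            }
            // no statistics recorded for this row group
            _ => {
                mins.push(None);
                maxes.push(None);
                null_counts.push(0);
            }
        }
    }

    Ok(ArrowStatistics {
        min: Arc::new(Int32Array::from(mins)) as ArrayRef,
        max: Arc::new(Int32Array::from(maxes)) as ArrayRef,
        row_count: Arc::new(UInt64Array::from(row_counts)) as ArrayRef,
        null_count: Arc::new(UInt64Array::from(null_counts)) as ArrayRef,
    })
}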