From 8180c3aee4facec2b4c25fcc5ddd0afaabd70bff Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 18 Oct 2022 12:50:20 -0400
Subject: [PATCH 1/2] Consolidate remaining parquet config options into ConfigOptions

---
 benchmarks/src/bin/tpch.rs                    |   3 +-
 datafusion-examples/examples/flight_server.rs |   5 +-
 .../examples/parquet_sql_multiple_files.rs    |   2 +-
 datafusion/core/src/config.rs                 |  33 +++++
 .../src/datasource/file_format/parquet.rs     | 134 +++++++++++-------
 .../core/src/datasource/listing/table.rs      |  16 ++-
 datafusion/core/src/execution/context.rs      |  41 ++++--
 datafusion/core/src/execution/options.rs      |  39 ++---
 .../src/physical_plan/file_format/parquet.rs  |  12 +-
 datafusion/core/tests/path_partition.rs       |   3 +-
 datafusion/core/tests/row.rs                  |   9 +-
 .../core/tests/sql/information_schema.rs      |   3 +
 datafusion/core/tests/sql/parquet_schema.rs   |  11 +-
 datafusion/proto/src/logical_plan.rs          |   3 +-
 14 files changed, 196 insertions(+), 118 deletions(-)

diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs
index b2f2bf181787..71257d7b9647 100644
--- a/benchmarks/src/bin/tpch.rs
+++ b/benchmarks/src/bin/tpch.rs
@@ -431,7 +431,8 @@ async fn get_table(
         }
         "parquet" => {
             let path = format!("{}/{}", path, table);
-            let format = ParquetFormat::default().with_enable_pruning(true);
+            let format = ParquetFormat::new(ctx.config.config_options())
+                .with_enable_pruning(true);
 
             (Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
         }
diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index 88cc6f1c23be..16463185fab2 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -67,11 +67,12 @@ impl FlightService for FlightServiceImpl {
     ) -> Result<Response<SchemaResult>, Status> {
         let request = request.into_inner();
 
-        let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        let ctx = SessionContext::new();
+        let format = Arc::new(ParquetFormat::new(ctx.config_options()));
+        let listing_options = ListingOptions::new(format);
         let table_path =
             ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?;
 
-        let ctx = SessionContext::new();
         let schema = listing_options
             .infer_schema(&ctx.state(), &table_path)
             .await
diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/parquet_sql_multiple_files.rs
index 6004ce67df29..8bf2ea70891f 100644
--- a/datafusion-examples/examples/parquet_sql_multiple_files.rs
+++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -32,7 +32,7 @@ async fn main() -> Result<()> {
     let testdata = datafusion::test_util::parquet_test_data();
 
     // Configure listing options
-    let file_format = ParquetFormat::default().with_enable_pruning(true);
+    let file_format = ParquetFormat::new(ctx.config_options());
     let listing_options = ListingOptions {
         file_extension: FileType::PARQUET.get_ext(),
         format: Arc::new(file_format),
diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index e058f5c72bd4..b7a328983ea2 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -60,6 +60,16 @@ pub const OPT_PARQUET_REORDER_FILTERS: &str =
 pub const OPT_PARQUET_ENABLE_PAGE_INDEX: &str =
     "datafusion.execution.parquet.enable_page_index";
 
+/// Configuration option "datafusion.execution.parquet.pruning"
+pub const OPT_PARQUET_ENABLE_PRUNING: &str = "datafusion.execution.parquet.pruning";
+
+/// Configuration option "datafusion.execution.parquet.skip_metadata"
+pub const OPT_PARQUET_SKIP_METADATA: &str =
"datafusion.execution.parquet.skip_metadata"; + +/// Configuration option "datafusion.execution.parquet.metadata_size_hint" +pub const OPT_PARQUET_METADATA_SIZE_HINT: &str = + "datafusion.execution.parquet.metadata_size_hint"; + /// Configuration option "datafusion.optimizer.skip_failed_rules" pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str = "datafusion.optimizer.skip_failed_rules"; @@ -237,6 +247,29 @@ impl BuiltInConfigs { to reduce the number of rows decoded.", false, ), + ConfigDefinition::new_bool( + OPT_PARQUET_ENABLE_PRUNING, + "If true, the parquet reader attempts to skip entire row groups based \ + on the predicate in the query and the metadata (min/max values) stored in \ + the parquet file.", + true, + ), + ConfigDefinition::new_bool( + OPT_PARQUET_SKIP_METADATA, + "If true, the parquet reader skip the optional embedded metadata that may be in \ + the file Schema. This setting can help avoid schema conflicts when querying \ + multiple parquet files with schemas containing compatible types but different metadata.", + true, + ), + ConfigDefinition::new( + OPT_PARQUET_METADATA_SIZE_HINT, + "If specified, the parquet reader will try and fetch the last `size_hint` \ + bytes of the parquet file optimistically. If not specified, two read are required: \ + One read to fetch the 8-byte parquet footer and \ + another to fetch the metadata length encoded in the footer.", + DataType::Boolean, + ScalarValue::Boolean(None), + ), ConfigDefinition::new_bool( OPT_OPTIMIZER_SKIP_FAILED_RULES, "When set to true, the logical plan optimizer will produce warning \ diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs index 07819bdf52cb..668aeea609ea 100644 --- a/datafusion/core/src/datasource/file_format/parquet.rs +++ b/datafusion/core/src/datasource/file_format/parquet.rs @@ -28,6 +28,7 @@ use datafusion_common::DataFusionError; use datafusion_optimizer::utils::conjunction; use hashbrown::HashMap; use object_store::{ObjectMeta, ObjectStore}; +use parking_lot::RwLock; use parquet::arrow::parquet_to_arrow_schema; use parquet::file::footer::{decode_footer, decode_metadata}; use parquet::file::metadata::ParquetMetaData; @@ -39,6 +40,10 @@ use crate::arrow::array::{ BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array, }; use crate::arrow::datatypes::{DataType, Field}; +use crate::config::ConfigOptions; +use crate::config::OPT_PARQUET_ENABLE_PRUNING; +use crate::config::OPT_PARQUET_METADATA_SIZE_HINT; +use crate::config::OPT_PARQUET_SKIP_METADATA; use crate::datasource::{create_max_min_accs, get_col_stats}; use crate::error::Result; use crate::logical_expr::Expr; @@ -52,26 +57,23 @@ pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet"; /// The Apache Parquet `FileFormat` implementation #[derive(Debug)] pub struct ParquetFormat { - enable_pruning: bool, - metadata_size_hint: Option, - skip_metadata: bool, + config_options: Arc>, } -impl Default for ParquetFormat { - fn default() -> Self { - Self { - enable_pruning: true, - metadata_size_hint: None, - skip_metadata: true, - } +impl ParquetFormat { + /// construct a new Format with the specified `ConfigOptions` + pub fn new(config_options: Arc>) -> Self { + Self { config_options } } } impl ParquetFormat { /// Activate statistics based row group level pruning /// - defaults to true - pub fn with_enable_pruning(mut self, enable: bool) -> Self { - self.enable_pruning = enable; + pub fn with_enable_pruning(self, enable: bool) -> Self { + self.config_options + .write() + 
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 07819bdf52cb..668aeea609ea 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -28,6 +28,7 @@ use datafusion_common::DataFusionError;
 use datafusion_optimizer::utils::conjunction;
 use hashbrown::HashMap;
 use object_store::{ObjectMeta, ObjectStore};
+use parking_lot::RwLock;
 use parquet::arrow::parquet_to_arrow_schema;
 use parquet::file::footer::{decode_footer, decode_metadata};
 use parquet::file::metadata::ParquetMetaData;
@@ -39,6 +40,10 @@ use crate::arrow::array::{
     BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
 };
 use crate::arrow::datatypes::{DataType, Field};
+use crate::config::ConfigOptions;
+use crate::config::OPT_PARQUET_ENABLE_PRUNING;
+use crate::config::OPT_PARQUET_METADATA_SIZE_HINT;
+use crate::config::OPT_PARQUET_SKIP_METADATA;
 use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::logical_expr::Expr;
@@ -52,26 +57,23 @@ pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
 /// The Apache Parquet `FileFormat` implementation
 #[derive(Debug)]
 pub struct ParquetFormat {
-    enable_pruning: bool,
-    metadata_size_hint: Option<usize>,
-    skip_metadata: bool,
+    config_options: Arc<RwLock<ConfigOptions>>,
 }
 
-impl Default for ParquetFormat {
-    fn default() -> Self {
-        Self {
-            enable_pruning: true,
-            metadata_size_hint: None,
-            skip_metadata: true,
-        }
+impl ParquetFormat {
+    /// Construct a new `ParquetFormat` with the specified `ConfigOptions`
+    pub fn new(config_options: Arc<RwLock<ConfigOptions>>) -> Self {
+        Self { config_options }
     }
 }
 
 impl ParquetFormat {
     /// Activate statistics based row group level pruning
     /// - defaults to true
-    pub fn with_enable_pruning(mut self, enable: bool) -> Self {
-        self.enable_pruning = enable;
+    pub fn with_enable_pruning(self, enable: bool) -> Self {
+        self.config_options
+            .write()
+            .set_bool(OPT_PARQUET_ENABLE_PRUNING, enable);
         self
     }
 
@@ -79,32 +81,46 @@ impl ParquetFormat {
     /// the reader will try and fetch the last `size_hint` bytes of the parquet file optimistically.
     /// With out a hint, two read are required. One read to fetch the 8-byte parquet footer and then
     /// another read to fetch the metadata length encoded in the footer.
-    pub fn with_metadata_size_hint(mut self, size_hint: usize) -> Self {
-        self.metadata_size_hint = Some(size_hint);
+    pub fn with_metadata_size_hint(self, size_hint: usize) -> Self {
+        self.config_options
+            .write()
+            .set_u64(OPT_PARQUET_METADATA_SIZE_HINT, size_hint as u64);
+        self
+    }
+
+    /// Tell the parquet reader to skip any metadata that may be in
+    /// the file Schema. This can help avoid schema conflicts due to
+    /// metadata. Defaults to true.
+    pub fn with_skip_metadata(self, skip_metadata: bool) -> Self {
+        self.config_options
+            .write()
+            .set_bool(OPT_PARQUET_SKIP_METADATA, skip_metadata);
         self
     }
 
+    /// Return true if pruning is enabled
     pub fn enable_pruning(&self) -> bool {
-        self.enable_pruning
+        self.config_options
+            .read()
+            .get_bool(OPT_PARQUET_ENABLE_PRUNING)
+            .unwrap_or(false)
     }
 
     /// Return the metadata size hint if set
     pub fn metadata_size_hint(&self) -> Option<usize> {
-        self.metadata_size_hint
-    }
-
-    /// Tell the parquet reader to skip any metadata that may be in
-    /// the file Schema. This can help avoid schema conflicts due to
-    /// metadata. Defaults to true.
-    pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
-        self.skip_metadata = skip_metadata;
-        self
+        self.config_options
+            .read()
+            .get_u64(OPT_PARQUET_METADATA_SIZE_HINT)
+            .map(|u| u as usize)
    }
 
     /// returns true if schema metadata will be cleared prior to
     /// schema merging.
     pub fn skip_metadata(&self) -> bool {
-        self.skip_metadata
+        self.config_options
+            .read()
+            .get_bool(OPT_PARQUET_SKIP_METADATA)
+            .unwrap_or(false)
     }
 }
 
@@ -139,11 +155,11 @@ impl FileFormat for ParquetFormat {
         let mut schemas = Vec::with_capacity(objects.len());
         for object in objects {
             let schema =
-                fetch_schema(store.as_ref(), object, self.metadata_size_hint).await?;
+                fetch_schema(store.as_ref(), object, self.metadata_size_hint()).await?;
             schemas.push(schema)
         }
 
-        let schema = if self.skip_metadata {
+        let schema = if self.skip_metadata() {
             Schema::try_merge(clear_metadata(schemas))
         } else {
             Schema::try_merge(schemas)
@@ -162,7 +178,7 @@ impl FileFormat for ParquetFormat {
             store.as_ref(),
             table_schema,
             object,
-            self.metadata_size_hint,
+            self.metadata_size_hint(),
         )
         .await?;
         Ok(stats)
@@ -176,7 +192,7 @@ impl FileFormat for ParquetFormat {
         // If enable pruning then combine the filters to build the predicate.
         // If disable pruning then set the predicate to None, thus readers
         // will not prune data based on the statistics.
-        let predicate = if self.enable_pruning {
+        let predicate = if self.enable_pruning() {
             conjunction(filters.to_vec())
         } else {
             None
@@ -601,7 +617,8 @@ mod tests {
         let store = Arc::new(LocalFileSystem::new()) as _;
         let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
 
-        let format = ParquetFormat::default();
+        let ctx = SessionContext::new();
+        let format = ParquetFormat::new(ctx.config_options());
         let schema = format.infer_schema(&store, &meta).await.unwrap();
 
         let stats =
@@ -748,7 +765,8 @@ mod tests {
 
         assert_eq!(store.request_count(), 2);
 
-        let format = ParquetFormat::default().with_metadata_size_hint(9);
+        let ctx = SessionContext::new();
+        let format = ParquetFormat::new(ctx.config_options()).with_metadata_size_hint(9);
         let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
 
         let stats =
@@ -775,7 +793,8 @@ mod tests {
         // ensure the requests were coalesced into a single request
         assert_eq!(store.request_count(), 1);
 
-        let format = ParquetFormat::default().with_metadata_size_hint(size_hint);
+        let format =
+            ParquetFormat::new(ctx.config_options()).with_metadata_size_hint(size_hint);
         let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
         let stats = fetch_statistics(
             store.upcast().as_ref(),
@@ -812,7 +831,7 @@ mod tests {
         let config = SessionConfig::new().with_batch_size(2);
         let ctx = SessionContext::with_config(config);
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
         let task_ctx = ctx.task_ctx();
 
         let stream = exec.execute(0, task_ctx)?;
@@ -841,11 +860,12 @@ mod tests {
 
         // Read the full file
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
 
         // Read only one column. This should scan less data.
         let projection = Some(vec![0]);
-        let exec_projected = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec_projected =
+            get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let task_ctx = ctx.task_ctx();
 
@@ -863,7 +883,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, Some(1)).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, Some(1)).await?;
 
         // note: even if the limit is set, the executor rounds up to the batch size
         assert_eq!(exec.statistics().num_rows, Some(8));
@@ -882,7 +903,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let x: Vec<String> = exec
             .schema()
             .fields()
@@ -920,7 +942,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![1]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -950,7 +973,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![0]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -977,7 +1001,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![10]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1004,7 +1029,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![6]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1034,7 +1060,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![7]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1064,7 +1091,8 @@ mod tests {
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![9]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1095,7 +1123,7 @@ mod tests {
         let task_ctx = session_ctx.task_ctx();
 
         // parquet use the int32 as the physical type to store decimal
-        let exec = get_exec("int32_decimal.parquet", None, None).await?;
"int32_decimal.parquet", None, None).await?; let batches = collect(exec, task_ctx.clone()).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); @@ -1103,7 +1131,7 @@ mod tests { assert_eq!(&DataType::Decimal128(4, 2), column.data_type()); // parquet use the int64 as the physical type to store decimal - let exec = get_exec("int64_decimal.parquet", None, None).await?; + let exec = get_exec(&session_ctx, "int64_decimal.parquet", None, None).await?; let batches = collect(exec, task_ctx.clone()).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); @@ -1111,14 +1139,21 @@ mod tests { assert_eq!(&DataType::Decimal128(10, 2), column.data_type()); // parquet use the fixed length binary as the physical type to store decimal - let exec = get_exec("fixed_length_decimal.parquet", None, None).await?; + let exec = + get_exec(&session_ctx, "fixed_length_decimal.parquet", None, None).await?; let batches = collect(exec, task_ctx.clone()).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); let column = batches[0].column(0); assert_eq!(&DataType::Decimal128(25, 2), column.data_type()); - let exec = get_exec("fixed_length_decimal_legacy.parquet", None, None).await?; + let exec = get_exec( + &session_ctx, + "fixed_length_decimal_legacy.parquet", + None, + None, + ) + .await?; let batches = collect(exec, task_ctx.clone()).await?; assert_eq!(1, batches.len()); assert_eq!(1, batches[0].num_columns()); @@ -1128,7 +1163,7 @@ mod tests { // parquet use the fixed length binary as the physical type to store decimal // TODO: arrow-rs don't support convert the physical type of binary to decimal // https://github.com/apache/arrow-rs/pull/2160 - // let exec = get_exec("byte_array_decimal.parquet", None, None).await?; + // let exec = get_exec(&session_ctx, "byte_array_decimal.parquet", None, None).await?; Ok(()) } @@ -1212,12 +1247,13 @@ mod tests { } async fn get_exec( + ctx: &SessionContext, file_name: &str, projection: Option>, limit: Option, ) -> Result> { let testdata = crate::test_util::parquet_test_data(); - let format = ParquetFormat::default(); + let format = ParquetFormat::new(ctx.config_options()); scan_format(&format, &testdata, file_name, projection, limit).await } } diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs index deaa09249e11..c3a4528377a4 100644 --- a/datafusion/core/src/datasource/listing/table.rs +++ b/datafusion/core/src/datasource/listing/table.rs @@ -28,6 +28,7 @@ use object_store::path::Path; use object_store::ObjectMeta; use parking_lot::RwLock; +use crate::config::ConfigOptions; use crate::datasource::file_format::file_type::{FileCompressionType, FileType}; use crate::datasource::{ file_format::{ @@ -104,7 +105,10 @@ impl ListingTableConfig { } } - fn infer_format(path: &str) -> Result<(Arc, String)> { + fn infer_format( + config_options: Arc>, + path: &str, + ) -> Result<(Arc, String)> { let err_msg = format!("Unable to infer file type from path: {}", path); let mut exts = path.rsplit('.'); @@ -133,7 +137,7 @@ impl ListingTableConfig { FileType::JSON => Arc::new( JsonFormat::default().with_file_compression_type(file_compression_type), ), - FileType::PARQUET => Arc::new(ParquetFormat::default()), + FileType::PARQUET => Arc::new(ParquetFormat::new(config_options)), }; Ok((file_format, ext)) @@ -154,8 +158,10 @@ impl ListingTableConfig { .await .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??; - let (format, file_extension) = - 
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index deaa09249e11..c3a4528377a4 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -28,6 +28,7 @@ use object_store::path::Path;
 use object_store::ObjectMeta;
 use parking_lot::RwLock;
 
+use crate::config::ConfigOptions;
 use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
 use crate::datasource::{
     file_format::{
@@ -104,7 +105,10 @@ impl ListingTableConfig {
         }
     }
 
-    fn infer_format(path: &str) -> Result<(Arc<dyn FileFormat>, String)> {
+    fn infer_format(
+        config_options: Arc<RwLock<ConfigOptions>>,
+        path: &str,
+    ) -> Result<(Arc<dyn FileFormat>, String)> {
         let err_msg = format!("Unable to infer file type from path: {}", path);
 
         let mut exts = path.rsplit('.');
@@ -133,7 +137,7 @@ impl ListingTableConfig {
             FileType::JSON => Arc::new(
                 JsonFormat::default().with_file_compression_type(file_compression_type),
             ),
-            FileType::PARQUET => Arc::new(ParquetFormat::default()),
+            FileType::PARQUET => Arc::new(ParquetFormat::new(config_options)),
         };
 
         Ok((file_format, ext))
@@ -154,8 +158,10 @@ impl ListingTableConfig {
             .await
             .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
 
-        let (format, file_extension) =
-            ListingTableConfig::infer_format(file.location.as_ref())?;
+        let (format, file_extension) = ListingTableConfig::infer_format(
+            ctx.config.config_options(),
+            file.location.as_ref(),
+        )?;
 
         let listing_options = ListingOptions {
             format,
@@ -538,7 +544,7 @@ mod tests {
         let ctx = SessionContext::new();
         let state = ctx.state();
 
-        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        let opt = ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options())));
         let schema = opt.infer_schema(&state, &table_path).await?;
         let config = ListingTableConfig::new(table_path)
             .with_listing_options(opt)
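
With `infer_format` now taking the config handle, building a `ListingTable` by hand follows the same pattern as the updated test above. A sketch of the end-to-end setup (not part of the patch; the table path is hypothetical and a tokio runtime is assumed):

    use std::sync::Arc;

    use datafusion::datasource::file_format::parquet::ParquetFormat;
    use datafusion::datasource::listing::{
        ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
    };
    use datafusion::error::Result;
    use datafusion::prelude::SessionContext;

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();
        let table_path = ListingTableUrl::parse("file:///data/parquet/")?;

        // The format is constructed from the session's shared options.
        let opt =
            ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options())));
        let schema = opt.infer_schema(&ctx.state(), &table_path).await?;

        let config = ListingTableConfig::new(table_path)
            .with_listing_options(opt)
            .with_schema(schema);
        let table = ListingTable::try_new(config)?;
        let _ = ctx.register_table("my_table", Arc::new(table))?;
        Ok(())
    }
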
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index f140ce1c3b98..8eda97d142ae 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -21,6 +21,7 @@ use crate::{
         catalog::{CatalogList, MemoryCatalogList},
         information_schema::CatalogWithInformationSchema,
     },
+    config::OPT_PARQUET_ENABLE_PRUNING,
     datasource::listing::{ListingOptions, ListingTable},
     datasource::{
         file_format::{
@@ -212,6 +213,11 @@ impl SessionContext {
         self.state.read().runtime_env.clone()
     }
 
+    /// Return a handle to the shared configuration options
+    pub fn config_options(&self) -> Arc<RwLock<ConfigOptions>> {
+        self.state.read().config.config_options()
+    }
+
     /// Return the session_id of this Session
     pub fn session_id(&self) -> String {
         self.session_id.clone()
@@ -465,7 +471,7 @@ impl SessionContext {
                     .with_delimiter(cmd.delimiter as u8)
                     .with_file_compression_type(file_compression_type),
             ),
-            FileType::PARQUET => Arc::new(ParquetFormat::default()),
+            FileType::PARQUET => Arc::new(ParquetFormat::new(self.config_options())),
             FileType::AVRO => Arc::new(AvroFormat::default()),
             FileType::JSON => Arc::new(
                 JsonFormat::default().with_file_compression_type(file_compression_type),
@@ -680,7 +686,8 @@ impl SessionContext {
         let table_path = ListingTableUrl::parse(table_path)?;
         let target_partitions = self.copied_config().target_partitions;
 
-        let listing_options = options.to_listing_options(target_partitions);
+        let listing_options =
+            options.to_listing_options(self.config_options(), target_partitions);
 
         // with parquet we resolve the schema in all cases
         let resolved_schema = listing_options
@@ -799,13 +806,10 @@ impl SessionContext {
         table_path: &str,
         options: ParquetReadOptions<'_>,
     ) -> Result<()> {
-        let (target_partitions, parquet_pruning) = {
-            let conf = self.copied_config();
-            (conf.target_partitions, conf.parquet_pruning)
-        };
-        let listing_options = options
-            .parquet_pruning(parquet_pruning)
-            .to_listing_options(target_partitions);
+        let listing_options = options.to_listing_options(
+            self.config_options(),
+            self.copied_config().target_partitions,
+        );
 
         self.register_listing_table(name, table_path, listing_options, None, None)
             .await?;
@@ -1142,8 +1146,6 @@ pub struct SessionConfig {
     /// Should DataFusion repartition data using the partition keys to execute window functions in
     /// parallel using the provided `target_partitions` level
     pub repartition_windows: bool,
-    /// Should DataFusion parquet reader using the predicate to prune data
-    pub parquet_pruning: bool,
     /// Should DataFusion collect statistics after listing files
     pub collect_statistics: bool,
     /// Configuration options
@@ -1163,7 +1165,6 @@ impl Default for SessionConfig {
             repartition_joins: true,
             repartition_aggregations: true,
             repartition_windows: true,
-            parquet_pruning: true,
             collect_statistics: false,
             config_options: Arc::new(RwLock::new(ConfigOptions::new())),
             // Assume no extensions by default.
@@ -1262,11 +1263,21 @@ impl SessionConfig {
     }
 
     /// Enables or disables the use of pruning predicate for parquet readers to skip row groups
-    pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
-        self.parquet_pruning = enabled;
+    pub fn with_parquet_pruning(self, enabled: bool) -> Self {
+        self.config_options
+            .write()
+            .set_bool(OPT_PARQUET_ENABLE_PRUNING, enabled);
         self
     }
 
+    /// Returns true if pruning predicate use is enabled for parquet reader
+    pub fn parquet_pruning(&self) -> bool {
+        self.config_options
+            .read()
+            .get_bool(OPT_PARQUET_ENABLE_PRUNING)
+            .unwrap_or(false)
+    }
+
     /// Enables or disables the collection of statistics after listing files
     pub fn with_collect_statistics(mut self, enabled: bool) -> Self {
         self.collect_statistics = enabled;
@@ -1314,7 +1325,7 @@ impl SessionConfig {
         );
         map.insert(
             PARQUET_PRUNING.to_owned(),
-            format!("{}", self.parquet_pruning),
+            format!("{}", self.parquet_pruning()),
         );
         map.insert(
             COLLECT_STATISTICS.to_owned(),
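
Note the compatibility story here: `SessionConfig::with_parquet_pruning` keeps its builder shape, so existing callers compile unchanged, while the flag itself now lives in the shared `ConfigOptions`. An illustrative sketch (not part of the patch):

    use datafusion::config::OPT_PARQUET_ENABLE_PRUNING;
    use datafusion::prelude::{SessionConfig, SessionContext};

    fn main() {
        // The builder API is unchanged for callers...
        let config = SessionConfig::new().with_parquet_pruning(false);
        assert!(!config.parquet_pruning());

        // ...but the value is stored in ConfigOptions, so the session (and any
        // ParquetFormat created from its handle) observes the same flag.
        let ctx = SessionContext::with_config(config);
        assert_eq!(
            ctx.config_options().read().get_bool(OPT_PARQUET_ENABLE_PRUNING),
            Some(false)
        );
    }
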
diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs
index 150a20670ce2..bc1837e5171f 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -20,7 +20,9 @@ use std::sync::Arc;
 
 use arrow::datatypes::{Schema, SchemaRef};
+use parking_lot::RwLock;
 
+use crate::config::ConfigOptions;
 use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
 use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use crate::datasource::file_format::file_type::FileCompressionType;
@@ -168,45 +170,18 @@ pub struct ParquetReadOptions<'a> {
     pub file_extension: &'a str,
     /// Partition Columns
     pub table_partition_cols: Vec<String>,
-    /// Should DataFusion parquet reader use the predicate to prune data,
-    /// overridden by value on execution::context::SessionConfig
-    // TODO move this into ConfigOptions
-    pub parquet_pruning: bool,
-    /// Tell the parquet reader to skip any metadata that may be in
-    /// the file Schema. This can help avoid schema conflicts due to
-    /// metadata. Defaults to true.
-    // TODO move this into ConfigOptions
-    pub skip_metadata: bool,
 }
 
 impl<'a> Default for ParquetReadOptions<'a> {
     fn default() -> Self {
-        let format_default = ParquetFormat::default();
-
         Self {
             file_extension: DEFAULT_PARQUET_EXTENSION,
             table_partition_cols: vec![],
-            parquet_pruning: format_default.enable_pruning(),
-            skip_metadata: format_default.skip_metadata(),
         }
     }
 }
 
 impl<'a> ParquetReadOptions<'a> {
-    /// Specify parquet_pruning
-    pub fn parquet_pruning(mut self, parquet_pruning: bool) -> Self {
-        self.parquet_pruning = parquet_pruning;
-        self
-    }
-
-    /// Tell the parquet reader to skip any metadata that may be in
-    /// the file Schema. This can help avoid schema conflicts due to
-    /// metadata. Defaults to true.
-    pub fn skip_metadata(mut self, skip_metadata: bool) -> Self {
-        self.skip_metadata = skip_metadata;
-        self
-    }
-
     /// Specify table_partition_cols for partition pruning
     pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
         self.table_partition_cols = table_partition_cols;
@@ -214,10 +189,12 @@ impl<'a> ParquetReadOptions<'a> {
     }
 
     /// Helper to convert these user facing options to `ListingTable` options
-    pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {
-        let file_format = ParquetFormat::default()
-            .with_enable_pruning(self.parquet_pruning)
-            .with_skip_metadata(self.skip_metadata);
+    pub fn to_listing_options(
+        &self,
+        config_options: Arc<RwLock<ConfigOptions>>,
+        target_partitions: usize,
+    ) -> ListingOptions {
+        let file_format = ParquetFormat::new(config_options);
 
         ListingOptions {
             format: Arc::new(file_format),
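
Callers that used the removed `parquet_pruning` / `skip_metadata` builders on `ParquetReadOptions` migrate by flipping the session-level options before reading. A sketch of a migrated call site (not part of the patch; the file path is hypothetical, tokio runtime assumed):

    use datafusion::config::{OPT_PARQUET_ENABLE_PRUNING, OPT_PARQUET_SKIP_METADATA};
    use datafusion::error::Result;
    use datafusion::prelude::{ParquetReadOptions, SessionContext};

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();

        // Previously: ParquetReadOptions::default().parquet_pruning(false).skip_metadata(false)
        {
            let options = ctx.config_options();
            let mut options = options.write();
            options.set_bool(OPT_PARQUET_ENABLE_PRUNING, false);
            options.set_bool(OPT_PARQUET_SKIP_METADATA, false);
        } // drop the write lock before any await

        let df = ctx
            .read_parquet("data/example.parquet", ParquetReadOptions::default())
            .await?;
        df.show().await?;
        Ok(())
    }
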
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index f5bd890591fd..ad3185003cba 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -1164,6 +1164,7 @@ mod tests {
     use crate::config::ConfigOptions;
     use crate::datasource::file_format::parquet::test_util::store_parquet;
     use crate::datasource::file_format::test_util::scan_format;
+    use crate::datasource::file_format::FileFormat;
     use crate::datasource::listing::{FileRange, PartitionedFile};
     use crate::datasource::object_store::ObjectStoreUrl;
     use crate::execution::options::CsvReadOptions;
@@ -1171,8 +1172,7 @@ mod tests {
     use crate::test::object_store::local_unpartitioned_file;
     use crate::{
         assert_batches_sorted_eq, assert_contains,
-        datasource::file_format::{parquet::ParquetFormat, FileFormat},
-        physical_plan::collect,
+        datasource::file_format::parquet::ParquetFormat, physical_plan::collect,
     };
     use arrow::array::Float32Array;
     use arrow::datatypes::DataType::Decimal128;
@@ -1661,7 +1661,7 @@ mod tests {
     async fn parquet_exec_with_projection() -> Result<()> {
         let testdata = crate::test_util::parquet_test_data();
         let filename = "alltypes_plain.parquet";
-        let format = ParquetFormat::default();
+        let format = ParquetFormat::new(ConfigOptions::new().into_shareable());
         let parquet_exec =
             scan_format(&format, &testdata, filename, Some(vec![0, 1, 2]), None)
                 .await
@@ -1743,7 +1743,7 @@ mod tests {
         let meta = local_unpartitioned_file(filename);
 
         let store = Arc::new(LocalFileSystem::new()) as _;
-        let file_schema = ParquetFormat::default()
+        let file_schema = ParquetFormat::new(session_ctx.config_options())
             .infer_schema(&store, &[meta.clone()])
             .await?;
 
@@ -1790,7 +1790,7 @@ mod tests {
 
         let meta = local_unpartitioned_file(filename);
 
-        let schema = ParquetFormat::default()
+        let schema = ParquetFormat::new(session_ctx.config_options())
             .infer_schema(&store, &[meta.clone()])
             .await
             .unwrap();
@@ -2478,7 +2478,7 @@ mod tests {
 
         let meta = local_unpartitioned_file(filename);
 
-        let schema = ParquetFormat::default()
+        let schema = ParquetFormat::new(session_ctx.config_options())
             .infer_schema(&store, &[meta.clone()])
             .await
             .unwrap();
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index fca9b9a43b1c..fc2d3deae49f 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -462,7 +462,8 @@ async fn register_partitioned_alltypes_parquet(
         MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths),
     );
 
-    let mut options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+    let format = Arc::new(ParquetFormat::new(ctx.config_options()));
+    let mut options = ListingOptions::new(format);
     options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect();
     options.collect_stat = true;
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 630c28a109c2..760d132dd579 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -34,7 +34,8 @@ async fn test_with_parquet() -> Result<()> {
     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
     let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
-    let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+    let exec =
+        get_exec(&session_ctx, "alltypes_plain.parquet", &projection, None).await?;
     let schema = exec.schema().clone();
 
     let batches = collect(exec, task_ctx).await?;
@@ -55,7 +56,8 @@ async fn test_with_parquet_word_aligned() -> Result<()> {
     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
     let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]);
-    let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+    let exec =
+        get_exec(&session_ctx, "alltypes_plain.parquet", &projection, None).await?;
     let schema = exec.schema().clone();
 
     let batches = collect(exec, task_ctx).await?;
@@ -72,6 +74,7 @@ async fn test_with_parquet_word_aligned() -> Result<()> {
 }
 
 async fn get_exec(
+    ctx: &SessionContext,
     file_name: &str,
     projection: &Option<Vec<usize>>,
     limit: Option<usize>,
@@ -81,7 +84,7 @@ async fn get_exec(
 
     let path = Path::from_filesystem_path(filename).unwrap();
 
-    let format = ParquetFormat::default();
+    let format = ParquetFormat::new(ctx.config_options());
     let object_store = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
     let object_store_url = ObjectStoreUrl::local_filesystem();
diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs
index ce318e6c2625..e68d077041f7 100644
--- a/datafusion/core/tests/sql/information_schema.rs
+++ b/datafusion/core/tests/sql/information_schema.rs
@@ -702,8 +702,11 @@ async fn show_all() {
         "| datafusion.execution.coalesce_batches           | true  |",
         "| datafusion.execution.coalesce_target_batch_size | 4096  |",
         "| datafusion.execution.parquet.enable_page_index  | false |",
+        "| datafusion.execution.parquet.metadata_size_hint | NULL  |",
+        "| datafusion.execution.parquet.pruning            | true  |",
         "| datafusion.execution.parquet.pushdown_filters   | false |",
         "| datafusion.execution.parquet.reorder_filters    | false |",
+        "| datafusion.execution.parquet.skip_metadata      | true  |",
         "| datafusion.execution.time_zone                  | UTC   |",
         "| datafusion.explain.logical_plan_only            | false |",
         "| datafusion.explain.physical_plan_only           | false |",
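
The three new options also surface in `SHOW ALL`, which is what the test above asserts. A quick way to inspect them locally (a sketch, not part of the patch; requires the information schema to be enabled):

    use datafusion::error::Result;
    use datafusion::prelude::{SessionConfig, SessionContext};

    #[tokio::main]
    async fn main() -> Result<()> {
        let config = SessionConfig::new().with_information_schema(true);
        let ctx = SessionContext::with_config(config);

        // Prints every config row, including parquet.pruning,
        // parquet.skip_metadata and parquet.metadata_size_hint.
        ctx.sql("SHOW ALL").await?.show().await?;
        Ok(())
    }
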
diff --git a/datafusion/core/tests/sql/parquet_schema.rs b/datafusion/core/tests/sql/parquet_schema.rs
index b5a891268d02..4b03158e698b 100644
--- a/datafusion/core/tests/sql/parquet_schema.rs
+++ b/datafusion/core/tests/sql/parquet_schema.rs
@@ -23,6 +23,7 @@ use std::{
 };
 
 use ::parquet::arrow::ArrowWriter;
+use datafusion::config::OPT_PARQUET_SKIP_METADATA;
 use tempfile::TempDir;
 
 use super::*;
@@ -106,9 +107,6 @@ async fn schema_merge_can_preserve_metadata() {
     let tmp_dir = TempDir::new().unwrap();
     let table_dir = tmp_dir.path().join("parquet_test");
 
-    // explicitly disable schema clearing
-    let options = ParquetReadOptions::default().skip_metadata(false);
-
     let f1 = Field::new("id", DataType::Int32, true);
     let f2 = Field::new("name", DataType::Utf8, true);
 
@@ -141,6 +139,13 @@ async fn schema_merge_can_preserve_metadata() {
     let table_path = table_dir.to_str().unwrap().to_string();
 
     let ctx = SessionContext::new();
+
+    // explicitly disable schema clearing
+    ctx.config_options()
+        .write()
+        .set_bool(OPT_PARQUET_SKIP_METADATA, false);
+
+    let options = ParquetReadOptions::default();
     let df = ctx
         .read_parquet(&table_path, options.clone())
         .await
diff --git a/datafusion/proto/src/logical_plan.rs b/datafusion/proto/src/logical_plan.rs
index d61bb2d65bae..74e04752a5b9 100644
--- a/datafusion/proto/src/logical_plan.rs
+++ b/datafusion/proto/src/logical_plan.rs
@@ -368,7 +368,8 @@ impl AsLogicalPlan for LogicalPlanNode {
                     &FileFormatType::Parquet(protobuf::ParquetFormat {
                         enable_pruning,
                     }) => Arc::new(
-                        ParquetFormat::default().with_enable_pruning(enable_pruning),
+                        ParquetFormat::new(ctx.config_options())
+                            .with_enable_pruning(enable_pruning),
                     ),
                     FileFormatType::Csv(protobuf::CsvFormat {
                         has_header,

From e5e3be07634325b2ba7d3607591e7fe41b978e54 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Mon, 24 Oct 2022 17:10:12 -0400
Subject: [PATCH 2/2] fix: type should be u64

---
 datafusion/core/src/config.rs | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index b7a328983ea2..8b7f3e02cc32 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -267,8 +267,8 @@ impl BuiltInConfigs {
                  bytes of the parquet file optimistically. If not specified, two reads are required: \
                  one read to fetch the 8-byte parquet footer and \
                  another to fetch the metadata length encoded in the footer.",
-                DataType::Boolean,
-                ScalarValue::Boolean(None),
+                DataType::UInt64,
+                ScalarValue::UInt64(None),
             ),
             ConfigDefinition::new_bool(
                 OPT_OPTIMIZER_SKIP_FAILED_RULES,
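
Taken together, the two commits make every remaining parquet reader knob a `ConfigOptions` entry. A final end-to-end sketch (not part of the patch; the table name and file path are hypothetical, tokio runtime assumed):

    use datafusion::config::OPT_PARQUET_METADATA_SIZE_HINT;
    use datafusion::error::Result;
    use datafusion::prelude::{ParquetReadOptions, SessionContext};

    #[tokio::main]
    async fn main() -> Result<()> {
        let ctx = SessionContext::new();

        // Speculatively fetch the last 512 KiB of each file so the footer and
        // metadata come back in one read during schema inference (relies on
        // the u64 type fix in PATCH 2/2).
        ctx.config_options()
            .write()
            .set_u64(OPT_PARQUET_METADATA_SIZE_HINT, 512 * 1024);

        ctx.register_parquet("example", "data/example.parquet", ParquetReadOptions::default())
            .await?;
        ctx.sql("SELECT COUNT(*) FROM example").await?.show().await?;
        Ok(())
    }
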