From cc6c3db8d1fe0d09db49051f18be149156206425 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Tue, 18 Oct 2022 12:50:20 -0400
Subject: [PATCH] Consolidate remaining parquet config options into ConfigOptions

---
 datafusion-examples/examples/flight_server.rs |   5 +-
 .../examples/parquet_sql_multiple_files.rs    |   2 +-
 datafusion/core/src/config.rs                 |  32 +++++
 .../src/datasource/file_format/parquet.rs     | 136 +++++++++++-------
 .../core/src/datasource/listing/table.rs      |  16 ++-
 datafusion/core/src/execution/context.rs      |  21 +--
 datafusion/core/src/execution/options.rs      |  39 ++---
 .../src/physical_plan/file_format/parquet.rs  |   3 +-
 datafusion/core/tests/path_partition.rs       |   3 +-
 datafusion/core/tests/row.rs                  |   9 +-
 .../core/tests/sql/information_schema.rs      |   3 +
 datafusion/core/tests/sql/parquet_schema.rs   |  11 +-
 12 files changed, 175 insertions(+), 105 deletions(-)

diff --git a/datafusion-examples/examples/flight_server.rs b/datafusion-examples/examples/flight_server.rs
index 88cc6f1c23be4..16463185fab26 100644
--- a/datafusion-examples/examples/flight_server.rs
+++ b/datafusion-examples/examples/flight_server.rs
@@ -67,11 +67,12 @@ impl FlightService for FlightServiceImpl {
     ) -> Result<Response<FlightInfo>, Status> {
         let request = request.into_inner();
 
-        let listing_options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        let ctx = SessionContext::new();
+        let format = Arc::new(ParquetFormat::new(ctx.config_options()));
+        let listing_options = ListingOptions::new(format);
         let table_path =
             ListingTableUrl::parse(&request.path[0]).map_err(to_tonic_err)?;
 
-        let ctx = SessionContext::new();
         let schema = listing_options
             .infer_schema(&ctx.state(), &table_path)
             .await
diff --git a/datafusion-examples/examples/parquet_sql_multiple_files.rs b/datafusion-examples/examples/parquet_sql_multiple_files.rs
index 6004ce67df29a..8bf2ea70891f3 100644
--- a/datafusion-examples/examples/parquet_sql_multiple_files.rs
+++ b/datafusion-examples/examples/parquet_sql_multiple_files.rs
@@ -32,7 +32,7 @@ async fn main() -> Result<()> {
     let testdata = datafusion::test_util::parquet_test_data();
 
     // Configure listing options
-    let file_format = ParquetFormat::default().with_enable_pruning(true);
+    let file_format = ParquetFormat::new(ctx.config_options());
    let listing_options = ListingOptions {
         file_extension: FileType::PARQUET.get_ext(),
         format: Arc::new(file_format),
diff --git a/datafusion/core/src/config.rs b/datafusion/core/src/config.rs
index b95c12d3ba1a8..1389e93dfca91 100644
--- a/datafusion/core/src/config.rs
+++ b/datafusion/core/src/config.rs
@@ -60,6 +60,16 @@ pub const OPT_PARQUET_REORDER_FILTERS: &str =
 pub const OPT_PARQUET_ENABLE_PAGE_INDEX: &str =
     "datafusion.execution.parquet.enable_page_index";
 
+/// Configuration option "datafusion.execution.parquet.pruning"
+pub const OPT_PARQUET_ENABLE_PRUNING: &str = "datafusion.execution.parquet.pruning";
+
+/// Configuration option "datafusion.execution.parquet.skip_metadata"
+pub const OPT_PARQUET_SKIP_METADATA: &str = "datafusion.execution.parquet.skip_metadata";
+
+/// Configuration option "datafusion.execution.parquet.metadata_size_hint"
+pub const OPT_PARQUET_METADATA_SIZE_HINT: &str =
+    "datafusion.execution.parquet.metadata_size_hint";
+
 /// Configuration option "datafusion.optimizer.skip_failed_rules"
 pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
     "datafusion.optimizer.skip_failed_rules";
@@ -234,6 +244,28 @@ impl BuiltInConfigs {
                 to reduce the number of rows decoded.",
                 false,
             ),
+            ConfigDefinition::new_bool(
+                OPT_PARQUET_ENABLE_PRUNING,
+                "If true, the parquet reader attempts to skip entire row groups based \
+                on the predicate in the query.",
+                true,
+            ),
+            ConfigDefinition::new_bool(
+                OPT_PARQUET_SKIP_METADATA,
+                "If true, the parquet reader skips the optional embedded metadata that may be in \
+                the file Schema. This setting can help avoid schema conflicts when querying \
+                multiple parquet files with schemas containing compatible types but different metadata.",
+                true,
+            ),
+            ConfigDefinition::new(
+                OPT_PARQUET_METADATA_SIZE_HINT,
+                "If specified, the parquet reader will try to fetch the last `size_hint` \
+                bytes of the parquet file optimistically. If not specified, two reads are required: \
+                one read to fetch the 8-byte parquet footer and \
+                another to fetch the metadata length encoded in the footer.",
+                DataType::UInt64,
+                ScalarValue::UInt64(None),
+            ),
             ConfigDefinition::new_bool(
                 OPT_OPTIMIZER_SKIP_FAILED_RULES,
                 "When set to true, the logical plan optimizer will produce warning \
diff --git a/datafusion/core/src/datasource/file_format/parquet.rs b/datafusion/core/src/datasource/file_format/parquet.rs
index 07819bdf52cb2..5c97a34f74394 100644
--- a/datafusion/core/src/datasource/file_format/parquet.rs
+++ b/datafusion/core/src/datasource/file_format/parquet.rs
@@ -28,6 +28,7 @@ use datafusion_common::DataFusionError;
 use datafusion_optimizer::utils::conjunction;
 use hashbrown::HashMap;
 use object_store::{ObjectMeta, ObjectStore};
+use parking_lot::RwLock;
 use parquet::arrow::parquet_to_arrow_schema;
 use parquet::file::footer::{decode_footer, decode_metadata};
 use parquet::file::metadata::ParquetMetaData;
@@ -39,6 +40,10 @@ use crate::arrow::array::{
     BooleanArray, Float32Array, Float64Array, Int32Array, Int64Array,
 };
 use crate::arrow::datatypes::{DataType, Field};
+use crate::config::ConfigOptions;
+use crate::config::OPT_PARQUET_ENABLE_PRUNING;
+use crate::config::OPT_PARQUET_METADATA_SIZE_HINT;
+use crate::config::OPT_PARQUET_SKIP_METADATA;
 use crate::datasource::{create_max_min_accs, get_col_stats};
 use crate::error::Result;
 use crate::logical_expr::Expr;
@@ -52,26 +57,23 @@ pub const DEFAULT_PARQUET_EXTENSION: &str = ".parquet";
 /// The Apache Parquet `FileFormat` implementation
 #[derive(Debug)]
 pub struct ParquetFormat {
-    enable_pruning: bool,
-    metadata_size_hint: Option<usize>,
-    skip_metadata: bool,
+    config_options: Arc<RwLock<ConfigOptions>>,
 }
 
-impl Default for ParquetFormat {
-    fn default() -> Self {
-        Self {
-            enable_pruning: true,
-            metadata_size_hint: None,
-            skip_metadata: true,
-        }
+impl ParquetFormat {
+    /// Construct a new `ParquetFormat` with the specified `ConfigOptions`
+    pub fn new(config_options: Arc<RwLock<ConfigOptions>>) -> Self {
+        Self { config_options }
     }
 }
 
 impl ParquetFormat {
     /// Activate statistics based row group level pruning
     /// - defaults to true
-    pub fn with_enable_pruning(mut self, enable: bool) -> Self {
-        self.enable_pruning = enable;
+    pub fn with_enable_pruning(self, enable: bool) -> Self {
+        self.config_options
+            .write()
+            .set_bool(OPT_PARQUET_ENABLE_PRUNING, enable);
         self
     }
 
@@ -79,32 +81,46 @@ impl ParquetFormat {
     /// the reader will try to fetch the last `size_hint` bytes of the parquet file optimistically.
     /// Without a hint, two reads are required: one read to fetch the 8-byte parquet footer and then
     /// another read to fetch the metadata length encoded in the footer.
-    pub fn with_metadata_size_hint(mut self, size_hint: usize) -> Self {
-        self.metadata_size_hint = Some(size_hint);
+    pub fn with_metadata_size_hint(self, size_hint: usize) -> Self {
+        self.config_options
+            .write()
+            .set_u64(OPT_PARQUET_METADATA_SIZE_HINT, size_hint as u64);
+        self
+    }
+
+    /// Tell the parquet reader to skip any metadata that may be in
+    /// the file Schema. This can help avoid schema conflicts due to
+    /// metadata. Defaults to true.
+    pub fn with_skip_metadata(self, skip_metadata: bool) -> Self {
+        self.config_options
+            .write()
+            .set_bool(OPT_PARQUET_SKIP_METADATA, skip_metadata);
         self
     }
 
+    /// Return true if pruning is enabled
     pub fn enable_pruning(&self) -> bool {
-        self.enable_pruning
+        self.config_options
+            .read()
+            .get_bool(OPT_PARQUET_ENABLE_PRUNING)
+            .unwrap_or(false)
     }
 
     /// Return the metadata size hint if set
     pub fn metadata_size_hint(&self) -> Option<usize> {
-        self.metadata_size_hint
-    }
-
-    /// Tell the parquet reader to skip any metadata that may be in
-    /// the file Schema. This can help avoid schema conflicts due to
-    /// metadata. Defaults to true.
-    pub fn with_skip_metadata(mut self, skip_metadata: bool) -> Self {
-        self.skip_metadata = skip_metadata;
-        self
+        self.config_options
+            .read()
+            .get_u64(OPT_PARQUET_METADATA_SIZE_HINT)
+            .map(|u| u as usize)
     }
 
     /// Returns true if schema metadata will be cleared prior to
     /// schema merging.
     pub fn skip_metadata(&self) -> bool {
-        self.skip_metadata
+        self.config_options
+            .read()
+            .get_bool(OPT_PARQUET_SKIP_METADATA)
+            .unwrap_or(false)
     }
 }
 
@@ -139,11 +155,11 @@ impl FileFormat for ParquetFormat {
         let mut schemas = Vec::with_capacity(objects.len());
         for object in objects {
             let schema =
-                fetch_schema(store.as_ref(), object, self.metadata_size_hint).await?;
+                fetch_schema(store.as_ref(), object, self.metadata_size_hint()).await?;
             schemas.push(schema)
         }
 
-        let schema = if self.skip_metadata {
+        let schema = if self.skip_metadata() {
             Schema::try_merge(clear_metadata(schemas))
         } else {
             Schema::try_merge(schemas)
@@ -162,7 +178,7 @@ impl FileFormat for ParquetFormat {
             store.as_ref(),
             table_schema,
             object,
-            self.metadata_size_hint,
+            self.metadata_size_hint(),
         )
         .await?;
         Ok(stats)
@@ -176,7 +192,7 @@
         // If pruning is enabled, combine the filters to build the predicate.
         // If pruning is disabled, set the predicate to None, so readers
         // will not prune data based on the statistics.
-        let predicate = if self.enable_pruning {
+        let predicate = if self.enable_pruning() {
             conjunction(filters.to_vec())
         } else {
             None
         };
@@ -601,7 +617,9 @@ mod tests {
         let store = Arc::new(LocalFileSystem::new()) as _;
         let (meta, _files) = store_parquet(vec![batch1, batch2]).await?;
 
-        let format = ParquetFormat::default();
+        let ctx = SessionContext::new();
+        let config_options = ctx.config_options();
+        let format = ParquetFormat::new(config_options);
         let schema = format.infer_schema(&store, &meta).await.unwrap();
 
         let stats =
@@ -748,7 +766,12 @@
         assert_eq!(store.request_count(), 2);
 
-        let format = ParquetFormat::default().with_metadata_size_hint(9);
+        let ctx = SessionContext::new();
+        let config_options = ctx.config_options();
+        config_options
+            .write()
+            .set_u64(OPT_PARQUET_METADATA_SIZE_HINT, 9);
+        let format = ParquetFormat::new(config_options);
         let schema = format.infer_schema(&store.upcast(), &meta).await.unwrap();
 
         let stats =
@@ -812,7 +835,7 @@
         let config = SessionConfig::new().with_batch_size(2);
         let ctx = SessionContext::with_config(config);
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
         let task_ctx = ctx.task_ctx();
 
         let stream = exec.execute(0, task_ctx)?;
@@ -841,11 +864,12 @@
         // Read the full file
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec = get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
 
         // Read only one column. This should scan less data.
         let projection = Some(vec![0]);
-        let exec_projected = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec_projected =
+            get_exec(&ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let task_ctx = ctx.task_ctx();
@@ -863,7 +887,8 @@
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, Some(1)).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, Some(1)).await?;
 
         // note: even if the limit is set, the executor rounds up to the batch size
         assert_eq!(exec.statistics().num_rows, Some(8));
@@ -882,7 +907,8 @@
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = None;
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let x: Vec<String> = exec
             .schema()
@@ -920,7 +946,8 @@
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![1]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -950,7 +977,8 @@
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![0]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -977,7 +1005,8 @@
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![10]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1004,7 +1033,8 @@
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![6]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1034,7 +1064,8 @@
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![7]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1064,7 +1095,8 @@
         let session_ctx = SessionContext::new();
         let task_ctx = session_ctx.task_ctx();
         let projection = Some(vec![9]);
-        let exec = get_exec("alltypes_plain.parquet", projection, None).await?;
+        let exec =
+            get_exec(&session_ctx, "alltypes_plain.parquet", projection, None).await?;
 
         let batches = collect(exec, task_ctx).await?;
         assert_eq!(1, batches.len());
@@ -1095,7 +1127,7 @@
         let task_ctx = session_ctx.task_ctx();
 
         // parquet uses the int32 physical type to store decimals
-        let exec = get_exec("int32_decimal.parquet", None, None).await?;
+        let exec = get_exec(&session_ctx, "int32_decimal.parquet", None, None).await?;
         let batches = collect(exec, task_ctx.clone()).await?;
         assert_eq!(1, batches.len());
         assert_eq!(1, batches[0].num_columns());
         let column = batches[0].column(0);
         assert_eq!(&DataType::Decimal128(4, 2), column.data_type());
 
         // parquet uses the int64 physical type to store decimals
-        let exec = get_exec("int64_decimal.parquet", None, None).await?;
+        let exec = get_exec(&session_ctx, "int64_decimal.parquet", None, None).await?;
         let batches = collect(exec, task_ctx.clone()).await?;
         assert_eq!(1, batches.len());
         assert_eq!(1, batches[0].num_columns());
         let column = batches[0].column(0);
         assert_eq!(&DataType::Decimal128(10, 2), column.data_type());
 
         // parquet uses the fixed length binary physical type to store decimals
-        let exec = get_exec("fixed_length_decimal.parquet", None, None).await?;
+        let exec =
+            get_exec(&session_ctx, "fixed_length_decimal.parquet", None, None).await?;
         let batches = collect(exec, task_ctx.clone()).await?;
         assert_eq!(1, batches.len());
         assert_eq!(1, batches[0].num_columns());
         let column = batches[0].column(0);
         assert_eq!(&DataType::Decimal128(25, 2), column.data_type());
 
-        let exec = get_exec("fixed_length_decimal_legacy.parquet", None, None).await?;
+        let exec = get_exec(
+            &session_ctx,
+            "fixed_length_decimal_legacy.parquet",
+            None,
+            None,
+        )
+        .await?;
         let batches = collect(exec, task_ctx.clone()).await?;
         assert_eq!(1, batches.len());
         assert_eq!(1, batches[0].num_columns());
 
         // parquet uses the fixed length binary physical type to store decimals
         // TODO: arrow-rs doesn't support converting the binary physical type to decimal
         // https://github.com/apache/arrow-rs/pull/2160
-        // let exec = get_exec("byte_array_decimal.parquet", None, None).await?;
+        // let exec = get_exec(&session_ctx, "byte_array_decimal.parquet", None, None).await?;
 
         Ok(())
     }
@@ -1212,12 +1251,13 @@ mod tests {
     }
     async fn get_exec(
+        ctx: &SessionContext,
         file_name: &str,
         projection: Option<Vec<usize>>,
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
         let testdata = crate::test_util::parquet_test_data();
-        let format = ParquetFormat::default();
+        let format = ParquetFormat::new(ctx.config_options());
         scan_format(&format, &testdata, file_name, projection, limit).await
     }
 }
diff --git a/datafusion/core/src/datasource/listing/table.rs b/datafusion/core/src/datasource/listing/table.rs
index deaa09249e110..c3a4528377a44 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -28,6 +28,7 @@ use object_store::path::Path;
 use object_store::ObjectMeta;
 use parking_lot::RwLock;
 
+use crate::config::ConfigOptions;
 use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
 use crate::datasource::{
     file_format::{
@@ -104,7 +105,10 @@ impl ListingTableConfig {
         }
     }
 
-    fn infer_format(path: &str) -> Result<(Arc<dyn FileFormat>, String)> {
+    fn infer_format(
+        config_options: Arc<RwLock<ConfigOptions>>,
+        path: &str,
+    ) -> Result<(Arc<dyn FileFormat>, String)> {
         let err_msg = format!("Unable to infer file type from path: {}", path);
 
         let mut exts = path.rsplit('.');
@@ -133,7 +137,7 @@ impl ListingTableConfig {
             FileType::JSON => Arc::new(
                 JsonFormat::default().with_file_compression_type(file_compression_type),
             ),
-            FileType::PARQUET => Arc::new(ParquetFormat::default()),
+            FileType::PARQUET => Arc::new(ParquetFormat::new(config_options)),
         };
 
         Ok((file_format, ext))
@@ -154,8 +158,10 @@ impl ListingTableConfig {
             .await
             .ok_or_else(|| DataFusionError::Internal("No files for table".into()))??;
 
-        let (format, file_extension) =
-            ListingTableConfig::infer_format(file.location.as_ref())?;
+        let (format, file_extension) = ListingTableConfig::infer_format(
+            ctx.config.config_options(),
+            file.location.as_ref(),
+        )?;
 
         let listing_options = ListingOptions {
             format,
@@ -538,7 +544,7 @@ mod tests {
         let ctx = SessionContext::new();
         let state = ctx.state();
 
-        let opt = ListingOptions::new(Arc::new(ParquetFormat::default()));
+        let opt = ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options())));
         let schema = opt.infer_schema(&state, &table_path).await?;
         let config = ListingTableConfig::new(table_path)
             .with_listing_options(opt)
diff --git a/datafusion/core/src/execution/context.rs b/datafusion/core/src/execution/context.rs
index c50f79426c8f4..d900906e1a9d9 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -212,6 +212,11 @@ impl SessionContext {
         self.state.read().runtime_env.clone()
     }
 
+    /// Return a handle to the shared configuration options
+    pub fn config_options(&self) -> Arc<RwLock<ConfigOptions>> {
+        self.state.read().config.config_options()
+    }
+
     /// Return the session_id of this Session
     pub fn session_id(&self) -> String {
         self.session_id.clone()
@@ -465,7 +470,7 @@ impl SessionContext {
                 .with_delimiter(cmd.delimiter as u8)
                 .with_file_compression_type(file_compression_type),
             ),
-            FileType::PARQUET => Arc::new(ParquetFormat::default()),
+            FileType::PARQUET => Arc::new(ParquetFormat::new(self.config_options())),
             FileType::AVRO => Arc::new(AvroFormat::default()),
             FileType::JSON => Arc::new(
                 JsonFormat::default().with_file_compression_type(file_compression_type),
@@ -680,7 +685,8 @@ impl SessionContext {
         let table_path = ListingTableUrl::parse(table_path)?;
         let target_partitions = self.copied_config().target_partitions;
 
-        let listing_options = options.to_listing_options(target_partitions);
+        let listing_options =
+            options.to_listing_options(self.config_options(), target_partitions);
 
         // with parquet we resolve the schema in all cases
         let resolved_schema = listing_options
@@ -799,13 +805,10 @@ impl SessionContext {
         table_path: &str,
         options: ParquetReadOptions<'_>,
     ) -> Result<()> {
-        let (target_partitions, parquet_pruning) = {
-            let conf = self.copied_config();
-            (conf.target_partitions, conf.parquet_pruning)
-        };
-        let listing_options = options
-            .parquet_pruning(parquet_pruning)
-            .to_listing_options(target_partitions);
+        let listing_options = options.to_listing_options(
+            self.config_options(),
+            self.copied_config().target_partitions,
+        );
 
         self.register_listing_table(name, table_path, listing_options, None, None)
             .await?;
diff --git a/datafusion/core/src/execution/options.rs b/datafusion/core/src/execution/options.rs
index 150a20670ce2d..bc1837e5171ff 100644
--- a/datafusion/core/src/execution/options.rs
+++ b/datafusion/core/src/execution/options.rs
@@ -20,7 +20,9 @@
 use std::sync::Arc;
 
 use arrow::datatypes::{Schema, SchemaRef};
+use parking_lot::RwLock;
 
+use crate::config::ConfigOptions;
 use crate::datasource::file_format::avro::DEFAULT_AVRO_EXTENSION;
 use crate::datasource::file_format::csv::DEFAULT_CSV_EXTENSION;
 use crate::datasource::file_format::file_type::FileCompressionType;
@@ -168,45 +170,18 @@ pub struct ParquetReadOptions<'a> {
     pub file_extension: &'a str,
     /// Partition Columns
     pub table_partition_cols: Vec<String>,
-    /// Should DataFusion parquet reader use the predicate to prune data,
-    /// overridden by value on execution::context::SessionConfig
-    // TODO move this into ConfigOptions
-    pub parquet_pruning: bool,
-    /// Tell the parquet reader to skip any metadata that may be in
-    /// the file Schema. This can help avoid schema conflicts due to
-    /// metadata. Defaults to true.
-    // TODO move this into ConfigOptions
-    pub skip_metadata: bool,
 }
 
 impl<'a> Default for ParquetReadOptions<'a> {
     fn default() -> Self {
-        let format_default = ParquetFormat::default();
-
         Self {
             file_extension: DEFAULT_PARQUET_EXTENSION,
             table_partition_cols: vec![],
-            parquet_pruning: format_default.enable_pruning(),
-            skip_metadata: format_default.skip_metadata(),
         }
     }
 }
 
 impl<'a> ParquetReadOptions<'a> {
-    /// Specify parquet_pruning
-    pub fn parquet_pruning(mut self, parquet_pruning: bool) -> Self {
-        self.parquet_pruning = parquet_pruning;
-        self
-    }
-
-    /// Tell the parquet reader to skip any metadata that may be in
-    /// the file Schema. This can help avoid schema conflicts due to
-    /// metadata. Defaults to true.
-    pub fn skip_metadata(mut self, skip_metadata: bool) -> Self {
-        self.skip_metadata = skip_metadata;
-        self
-    }
-
     /// Specify table_partition_cols for partition pruning
     pub fn table_partition_cols(mut self, table_partition_cols: Vec<String>) -> Self {
         self.table_partition_cols = table_partition_cols;
         self
@@ -214,10 +189,12 @@ impl<'a> ParquetReadOptions<'a> {
     }
 
     /// Helper to convert these user facing options to `ListingTable` options
-    pub fn to_listing_options(&self, target_partitions: usize) -> ListingOptions {
-        let file_format = ParquetFormat::default()
-            .with_enable_pruning(self.parquet_pruning)
-            .with_skip_metadata(self.skip_metadata);
+    pub fn to_listing_options(
+        &self,
+        config_options: Arc<RwLock<ConfigOptions>>,
+        target_partitions: usize,
+    ) -> ListingOptions {
+        let file_format = ParquetFormat::new(config_options);
 
         ListingOptions {
             format: Arc::new(file_format),
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index f5bd890591fd5..68b00a3ef08ce 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -1171,8 +1171,7 @@ mod tests {
     use crate::test::object_store::local_unpartitioned_file;
     use crate::{
         assert_batches_sorted_eq, assert_contains,
-        datasource::file_format::{parquet::ParquetFormat, FileFormat},
-        physical_plan::collect,
+        datasource::file_format::parquet::ParquetFormat, physical_plan::collect,
     };
     use arrow::array::Float32Array;
     use arrow::datatypes::DataType::Decimal128;
diff --git a/datafusion/core/tests/path_partition.rs b/datafusion/core/tests/path_partition.rs
index fca9b9a43b1c4..fc2d3deae49f1 100644
--- a/datafusion/core/tests/path_partition.rs
+++ b/datafusion/core/tests/path_partition.rs
@@ -462,7 +462,8 @@ async fn register_partitioned_alltypes_parquet(
         MirroringObjectStore::new_arc(parquet_file_path.clone(), store_paths),
     );
 
-    let mut options = ListingOptions::new(Arc::new(ParquetFormat::default()));
+    let format = Arc::new(ParquetFormat::new(ctx.config_options()));
+    let mut options = ListingOptions::new(format);
     options.table_partition_cols = partition_cols.iter().map(|&s| s.to_owned()).collect();
     options.collect_stat = true;
diff --git a/datafusion/core/tests/row.rs b/datafusion/core/tests/row.rs
index 630c28a109c22..760d132dd5791 100644
--- a/datafusion/core/tests/row.rs
+++ b/datafusion/core/tests/row.rs
@@ -34,7 +34,8 @@ async fn test_with_parquet() -> Result<()> {
     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
     let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
-    let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+    let exec =
+        get_exec(&session_ctx, "alltypes_plain.parquet", &projection, None).await?;
     let schema = exec.schema().clone();
 
     let batches = collect(exec, task_ctx).await?;
@@ -55,7 +56,8 @@ async fn test_with_parquet_word_aligned() -> Result<()> {
     let session_ctx = SessionContext::new();
     let task_ctx = session_ctx.task_ctx();
     let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7]);
-    let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
+    let exec =
+        get_exec(&session_ctx, "alltypes_plain.parquet", &projection, None).await?;
     let schema = exec.schema().clone();
 
     let batches = collect(exec, task_ctx).await?;
@@ -72,6 +74,7 @@ async fn test_with_parquet_word_aligned() -> Result<()> {
 }
 
 async fn get_exec(
+    ctx: &SessionContext,
     file_name: &str,
     projection: &Option<Vec<usize>>,
     limit: Option<usize>,
 ) -> Result<Arc<dyn ExecutionPlan>> {
     let testdata = datafusion::test_util::parquet_test_data();
     let filename = format!("{}/{}", testdata, file_name);
     let path = Path::from_filesystem_path(filename).unwrap();
-    let format = ParquetFormat::default();
+    let format = ParquetFormat::new(ctx.config_options());
     let object_store = Arc::new(LocalFileSystem::new()) as Arc<dyn ObjectStore>;
     let object_store_url = ObjectStoreUrl::local_filesystem();
diff --git a/datafusion/core/tests/sql/information_schema.rs b/datafusion/core/tests/sql/information_schema.rs
index 873ead46258b5..11a4be55aae47 100644
--- a/datafusion/core/tests/sql/information_schema.rs
+++ b/datafusion/core/tests/sql/information_schema.rs
@@ -702,8 +702,11 @@ async fn show_all() {
         "| datafusion.execution.coalesce_batches                  | true  |",
         "| datafusion.execution.coalesce_target_batch_size        | 4096  |",
         "| datafusion.execution.parquet.enable_page_index         | false |",
+        "| datafusion.execution.parquet.metadata_size_hint        | NULL  |",
+        "| datafusion.execution.parquet.pruning                   | true  |",
         "| datafusion.execution.parquet.pushdown_filters          | false |",
         "| datafusion.execution.parquet.reorder_filters           | false |",
+        "| datafusion.execution.parquet.skip_metadata             | true  |",
         "| datafusion.execution.time_zone                         | UTC   |",
         "| datafusion.explain.logical_plan_only                   | false |",
         "| datafusion.explain.physical_plan_only                  | false |",
diff --git a/datafusion/core/tests/sql/parquet_schema.rs b/datafusion/core/tests/sql/parquet_schema.rs
index b5a891268d023..4b03158e698b4 100644
--- a/datafusion/core/tests/sql/parquet_schema.rs
+++ b/datafusion/core/tests/sql/parquet_schema.rs
@@ -23,6 +23,7 @@ use std::{
 };
 
 use ::parquet::arrow::ArrowWriter;
+use datafusion::config::OPT_PARQUET_SKIP_METADATA;
 use tempfile::TempDir;
 
 use super::*;
@@ -106,9 +107,6 @@ async fn schema_merge_can_preserve_metadata() {
     let tmp_dir = TempDir::new().unwrap();
     let table_dir = tmp_dir.path().join("parquet_test");
 
-    // explicitly disable schema clearing
-    let options = ParquetReadOptions::default().skip_metadata(false);
-
     let f1 = Field::new("id", DataType::Int32, true);
     let f2 = Field::new("name", DataType::Utf8, true);
 
@@ -141,6 +139,13 @@ async fn schema_merge_can_preserve_metadata() {
     let table_path = table_dir.to_str().unwrap().to_string();
 
     let ctx = SessionContext::new();
+
+    // explicitly disable schema clearing
+    ctx.config_options()
+        .write()
+        .set_bool(OPT_PARQUET_SKIP_METADATA, false);
+
+    let options = ParquetReadOptions::default();
     let df = ctx
         .read_parquet(&table_path, options.clone())
         .await
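
Usage sketch: a minimal example of the consolidated configuration flow introduced above, assuming the `SessionContext::config_options` and `ParquetFormat::new` APIs added by this patch; the helper name `build_parquet_listing_options` and the 64 KiB size hint are illustrative, not part of the change.

    use std::sync::Arc;

    use datafusion::config::{OPT_PARQUET_ENABLE_PRUNING, OPT_PARQUET_METADATA_SIZE_HINT};
    use datafusion::datasource::file_format::parquet::ParquetFormat;
    use datafusion::datasource::listing::ListingOptions;
    use datafusion::prelude::SessionContext;

    // Hypothetical helper: configures the parquet reader once on the session
    // context, then builds listing options that observe those settings.
    fn build_parquet_listing_options(ctx: &SessionContext) -> ListingOptions {
        // Parquet reader settings now live in the shared ConfigOptions, so
        // they are set on the context rather than on each ParquetFormat.
        ctx.config_options()
            .write()
            .set_bool(OPT_PARQUET_ENABLE_PRUNING, true);
        ctx.config_options()
            .write()
            .set_u64(OPT_PARQUET_METADATA_SIZE_HINT, 64 * 1024); // illustrative value

        // Every ParquetFormat constructed from the same context shares the
        // options handle, so later changes are visible to all of them.
        ListingOptions::new(Arc::new(ParquetFormat::new(ctx.config_options())))
    }

Because the options sit behind `Arc<RwLock<ConfigOptions>>`, mutations made through any handle (including `SET` statements and the builder methods retained on `ParquetFormat`) are observed by every format instance sharing the context, which is the consolidation this patch is after.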