Expose parquet reader settings using normal DataFusion ConfigOptions (#3822)

* Expose parquet reader settings as DataFusion config settings

* fix logical conflict

* Update tests
alamb authored Oct 19, 2022
1 parent e8ea218 commit 6e0097d
Showing 16 changed files with 250 additions and 102 deletions.
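The upshot: the ad-hoc `ParquetScanOptions` previously passed to `ParquetExec` is gone, and the three parquet reader settings travel through DataFusion's normal `ConfigOptions` instead. A minimal sketch of the new pattern, using only the API added in this diff (the `fn main` wrapper is just for illustration):

```rust
use datafusion::config::{
    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
    OPT_PARQUET_REORDER_FILTERS,
};

fn main() {
    // The parquet reader settings are now ordinary config entries.
    let mut config_options = ConfigOptions::new();
    config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, true);
    config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, true);
    config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, true);

    // Arc<RwLock<ConfigOptions>>: the shape the new
    // FileScanConfig::config_options field expects.
    let _shared = config_options.into_shareable();
}
```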
56 changes: 41 additions & 15 deletions benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -23,16 +23,18 @@ use arrow::datatypes::{DataType, Field, Int32Type, Schema, SchemaRef, TimeUnit};
use arrow::record_batch::RecordBatch;
use arrow::util::pretty;
use datafusion::common::{Result, ToDFSchema};
use datafusion::config::{
ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
OPT_PARQUET_REORDER_FILTERS,
};
use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
use datafusion::logical_expr::{lit, or, Expr};
use datafusion::optimizer::utils::disjunction;
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::collect;
use datafusion::physical_plan::file_format::{
FileScanConfig, ParquetExec, ParquetScanOptions,
};
use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
use datafusion::physical_plan::filter::FilterExec;
use datafusion::prelude::{col, SessionConfig, SessionContext};
use object_store::path::Path;
@@ -109,6 +111,13 @@ async fn main() -> Result<()> {
Ok(())
}

#[derive(Debug, Clone)]
struct ParquetScanOptions {
pushdown_filters: bool,
reorder_filters: bool,
enable_page_index: bool,
}

async fn run_benchmarks(
ctx: &mut SessionContext,
object_store_url: ObjectStoreUrl,
@@ -117,15 +126,21 @@ async fn run_benchmarks(
debug: bool,
) -> Result<()> {
let scan_options_matrix = vec![
ParquetScanOptions::default(),
ParquetScanOptions::default()
.with_page_index(true)
.with_pushdown_filters(true)
.with_reorder_predicates(true),
ParquetScanOptions::default()
.with_page_index(true)
.with_pushdown_filters(true)
.with_reorder_predicates(false),
ParquetScanOptions {
pushdown_filters: false,
reorder_filters: false,
enable_page_index: false,
},
ParquetScanOptions {
pushdown_filters: true,
reorder_filters: true,
enable_page_index: true,
},
ParquetScanOptions {
pushdown_filters: true,
reorder_filters: true,
enable_page_index: false,
},
];

let filter_matrix = vec![
@@ -193,6 +208,18 @@ async fn exec_scan(
debug: bool,
) -> Result<usize> {
let schema = BatchBuilder::schema();

let ParquetScanOptions {
pushdown_filters,
reorder_filters,
enable_page_index,
} = scan_options;

let mut config_options = ConfigOptions::new();
config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);

let scan_config = FileScanConfig {
object_store_url,
file_schema: schema.clone(),
@@ -206,6 +233,7 @@
projection: None,
limit: None,
table_partition_cols: vec![],
config_options: config_options.into_shareable(),
};

let df_schema = schema.clone().to_dfschema()?;
@@ -217,9 +245,7 @@
&ExecutionProps::default(),
)?;

let parquet_exec = Arc::new(
ParquetExec::new(scan_config, Some(filter), None).with_scan_options(scan_options),
);
let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));

let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);

78 changes: 60 additions & 18 deletions datafusion/core/src/config.rs
@@ -21,8 +21,10 @@ use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use itertools::Itertools;
use log::warn;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::env;
use std::sync::Arc;

/// Configuration option "datafusion.optimizer.filter_null_join_keys"
pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filter_null_join_keys";
@@ -43,13 +45,25 @@ pub const OPT_COALESCE_BATCHES: &str = "datafusion.execution.coalesce_batches";
pub const OPT_COALESCE_TARGET_BATCH_SIZE: &str =
"datafusion.execution.coalesce_target_batch_size";

/// Configuration option "datafusion.execution.time_zone"
pub const OPT_TIME_ZONE: &str = "datafusion.execution.time_zone";

/// Configuration option "datafusion.execution.parquet.pushdown_filters"
pub const OPT_PARQUET_PUSHDOWN_FILTERS: &str =
"datafusion.execution.parquet.pushdown_filters";

/// Configuration option "datafusion.execution.parquet.reorder_filters"
pub const OPT_PARQUET_REORDER_FILTERS: &str =
"datafusion.execution.parquet.reorder_filters";

/// Configuration option "datafusion.execution.parquet.enable_page_index"
pub const OPT_PARQUET_ENABLE_PAGE_INDEX: &str =
"datafusion.execution.parquet.enable_page_index";

/// Configuration option "datafusion.optimizer.skip_failed_rules"
pub const OPT_OPTIMIZER_SKIP_FAILED_RULES: &str =
"datafusion.optimizer.skip_failed_rules";

/// Configuration option "datafusion.execution.time_zone"
pub const OPT_TIME_ZONE: &str = "datafusion.execution.time_zone";

/// Definition of a configuration option
pub struct ConfigDefinition {
/// key used to identify this configuration option
@@ -173,11 +187,11 @@ impl BuiltInConfigs {
false,
),
ConfigDefinition::new_u64(
OPT_BATCH_SIZE,
"Default batch size while creating new batches, it's especially useful for \
buffer-in-memory batches since creating tiny batches would result in too much metadata \
memory consumption.",
8192,
),
ConfigDefinition::new_bool(
OPT_COALESCE_BATCHES,
@@ -191,23 +205,43 @@ impl BuiltInConfigs {
ConfigDefinition::new_u64(
OPT_COALESCE_TARGET_BATCH_SIZE,
format!("Target batch size when coalescing batches. Uses in conjunction with the \
configuration setting '{}'.", OPT_COALESCE_BATCHES),
configuration setting '{}'.", OPT_COALESCE_BATCHES),
4096,
),
ConfigDefinition::new_string(
OPT_TIME_ZONE,
"The session time zone which some function require \
e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone,
then extract the hour.",
"UTC".into()
),
ConfigDefinition::new_bool(
OPT_PARQUET_PUSHDOWN_FILTERS,
"If true, filter expressions are be applied during the parquet decoding operation to \
reduce the number of rows decoded.",
false,
),
ConfigDefinition::new_bool(
OPT_PARQUET_REORDER_FILTERS,
"If true, filter expressions evaluated during the parquet decoding opearation \
will be reordered heuristically to minimize the cost of evaluation. If false, \
the filters are applied in the same order as written in the query.",
false,
),
ConfigDefinition::new_bool(
OPT_PARQUET_ENABLE_PAGE_INDEX,
"If true, uses parquet data page level metadata (Page Index) statistics \
to reduce the number of rows decoded.",
false,
),
ConfigDefinition::new_bool(
OPT_OPTIMIZER_SKIP_FAILED_RULES,
"When set to true, the logical plan optimizer will produce warning \
messages if any optimization rules produce errors and then proceed to the next \
rule. When set to false, any rules that produce errors will cause the query to fail.",
true
),
ConfigDefinition::new_string(
OPT_TIME_ZONE,
"The session time zone which some function require \
e.g. EXTRACT(HOUR from SOME_TIME) shift the underline datetime according to the time zone,
then extract the hour",
"UTC".into()
)]
]
}
}

@@ -255,8 +289,16 @@ impl ConfigOptions {
Self { options }
}

/// Create new ConfigOptions struct, taking values from environment variables where possible.
/// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` to control `datafusion.execution.batch_size`.
/// Create a new [`ConfigOptions`] wrapped in an RwLock and Arc
pub fn into_shareable(self) -> Arc<RwLock<Self>> {
Arc::new(RwLock::new(self))
}

/// Create new ConfigOptions struct, taking values from
/// environment variables where possible.
///
/// For example, setting `DATAFUSION_EXECUTION_BATCH_SIZE` will
/// control `datafusion.execution.batch_size`.
pub fn from_env() -> Self {
let built_in = BuiltInConfigs::new();
let mut options = HashMap::with_capacity(built_in.config_definitions.len());
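Since the parquet settings are now built-in config options, they should also be reachable through `from_env` using the documented naming convention (upper-case, dots to underscores). A sketch; the exact variable name below follows that convention but is an assumption, as this diff does not show it:

```rust
use datafusion::config::ConfigOptions;
use std::env;

fn main() {
    // By the documented convention, DATAFUSION_EXECUTION_BATCH_SIZE maps to
    // datafusion.execution.batch_size, so this name should map to
    // datafusion.execution.parquet.pushdown_filters (assumed, not shown above).
    env::set_var("DATAFUSION_EXECUTION_PARQUET_PUSHDOWN_FILTERS", "true");

    // Builds the full set of built-in options, preferring env values.
    let _options = ConfigOptions::from_env();
}
```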
2 changes: 2 additions & 0 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -84,6 +84,7 @@ pub trait FileFormat: Send + Sync + fmt::Debug {
#[cfg(test)]
pub(crate) mod test_util {
use super::*;
use crate::config::ConfigOptions;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::test::object_store::local_unpartitioned_file;
@@ -122,6 +123,7 @@ pub(crate) mod test_util {
projection,
limit,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
},
&[],
)
1 change: 1 addition & 0 deletions datafusion/core/src/datasource/listing/table.rs
@@ -404,6 +404,7 @@ impl TableProvider for ListingTable {
projection: projection.clone(),
limit,
table_partition_cols: self.options.table_partition_cols.clone(),
config_options: ctx.config.config_options(),
},
filters,
)
9 changes: 8 additions & 1 deletion datafusion/core/src/execution/context.rs
@@ -1184,7 +1184,7 @@ impl SessionConfig {
/// Create an execution config with config options read from the environment
pub fn from_env() -> Self {
Self {
config_options: Arc::new(RwLock::new(ConfigOptions::from_env())),
config_options: ConfigOptions::from_env().into_shareable(),
..Default::default()
}
}
@@ -1324,6 +1324,13 @@ impl SessionConfig {
map
}

/// Return a handle to the shared configuration options.
///
/// [`config_options`]: SessionContext::config_options
pub fn config_options(&self) -> Arc<RwLock<ConfigOptions>> {
self.config_options.clone()
}

/// Add extensions.
///
/// Extensions can be used to attach extra data to the session config -- e.g. tracing information or caches.
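Because `SessionConfig::config_options` hands back the shared `Arc<RwLock<ConfigOptions>>`, settings can be flipped after the context exists and be seen by later scans. A sketch, assuming `SessionContext::with_config` (already in the API at this point) keeps the same shared handle:

```rust
use datafusion::config::OPT_PARQUET_PUSHDOWN_FILTERS;
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    let config = SessionConfig::new();
    // Clone the Arc handle before the config moves into the context.
    let options = config.config_options();
    let _ctx = SessionContext::with_config(config);

    // Later, between statements: enable filter pushdown for new scans.
    options.write().set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, true);
}
```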
30 changes: 26 additions & 4 deletions datafusion/core/src/execution/options.rs
@@ -34,7 +34,12 @@ use crate::datasource::{
listing::ListingOptions,
};

/// CSV file read option
/// Options that control the reading of CSV files.
///
/// Note this structure is supplied when a datasource is created and
/// cannot vary from statement to statement. For settings that
/// can vary from statement to statement see
/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct CsvReadOptions<'a> {
/// Does the CSV file have a header?
@@ -150,7 +155,12 @@ impl<'a> CsvReadOptions<'a> {
}
}

/// Parquet read options
/// Options that control the reading of Parquet files.
///
/// Note this structure is supplied when a datasource is created and
/// cannot vary from statement to statement. For settings that
/// can vary from statement to statement see
/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct ParquetReadOptions<'a> {
/// File extension; only files with this extension are selected for data input.
@@ -160,10 +170,12 @@ pub struct ParquetReadOptions<'a> {
pub table_partition_cols: Vec<String>,
/// Should DataFusion parquet reader use the predicate to prune data,
/// overridden by the value in execution::context::SessionConfig
// TODO move this into ConfigOptions
pub parquet_pruning: bool,
/// Tell the parquet reader to skip any metadata that may be in
/// the file Schema. This can help avoid schema conflicts due to
/// metadata. Defaults to true.
// TODO move this into ConfigOptions
pub skip_metadata: bool,
}

@@ -217,7 +229,12 @@ impl<'a> ParquetReadOptions<'a> {
}
}

/// Avro read options
/// Options that control the reading of Avro files.
///
/// Note this structure is supplied when a datasource is created and
/// cannot vary from statement to statement. For settings that
/// can vary from statement to statement see
/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct AvroReadOptions<'a> {
/// The data source schema.
@@ -261,7 +278,12 @@ impl<'a> AvroReadOptions<'a> {
}
}

/// Line-delimited JSON read options
/// Options that control the reading of line-delimited JSON (NDJson) files.
///
/// Note this structure is supplied when a datasource is created and
/// cannot vary from statement to statement. For settings that
/// can vary from statement to statement see
/// [`ConfigOptions`](crate::config::ConfigOptions).
#[derive(Clone)]
pub struct NdJsonReadOptions<'a> {
/// The data source schema.
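The doc-comment changes above draw the line this commit cares about: per-datasource read options are fixed when a table is registered, while the session's `ConfigOptions` can change from statement to statement. A sketch of that split, assuming a tokio runtime and a placeholder `data.parquet` file:

```rust
use datafusion::config::OPT_PARQUET_PUSHDOWN_FILTERS;
use datafusion::error::Result;
use datafusion::prelude::{ParquetReadOptions, SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    let config = SessionConfig::new();
    let options = config.config_options();
    let ctx = SessionContext::with_config(config);

    // Fixed at registration time: pruning, skip_metadata, file extension.
    ctx.register_parquet("t", "data.parquet", ParquetReadOptions::default())
        .await?;

    // Changeable between statements via the shared ConfigOptions.
    options.write().set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, true);
    ctx.sql("SELECT count(*) FROM t").await?.show().await?;
    Ok(())
}
```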
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_optimizer/repartition.rs
@@ -240,6 +240,7 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};

use super::*;
use crate::config::ConfigOptions;
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
use crate::physical_plan::aggregates::{
@@ -269,6 +270,7 @@ mod tests {
projection: None,
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
},
None,
None,
4 changes: 4 additions & 0 deletions datafusion/core/src/physical_plan/file_format/avro.rs
@@ -208,6 +208,7 @@ mod private {
#[cfg(test)]
#[cfg(feature = "avro")]
mod tests {
use crate::config::ConfigOptions;
use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
use crate::datasource::listing::PartitionedFile;
use crate::datasource::object_store::ObjectStoreUrl;
@@ -237,6 +238,7 @@ mod tests {
projection: Some(vec![0, 1, 2]),
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);

@@ -306,6 +308,7 @@ mod tests {
projection,
limit: None,
table_partition_cols: vec![],
config_options: ConfigOptions::new().into_shareable(),
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);

@@ -374,6 +377,7 @@ mod tests {
statistics: Statistics::default(),
limit: None,
table_partition_cols: vec!["date".to_owned()],
config_options: ConfigOptions::new().into_shareable(),
});
assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
