Implement extensible configuration mechanism (#2754)
andygrove authored Jun 22, 2022
1 parent e9423b8 commit cbb0517
Showing 5 changed files with 249 additions and 14 deletions.
187 changes: 187 additions & 0 deletions datafusion/core/src/config.rs
@@ -0,0 +1,187 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! DataFusion Configuration Options
use arrow::datatypes::DataType;
use datafusion_common::ScalarValue;
use std::collections::HashMap;

/// Configuration option "datafusion.optimizer.filterNullJoinKeys"
pub const OPT_FILTER_NULL_JOIN_KEYS: &str = "datafusion.optimizer.filterNullJoinKeys";

/// Definition of a configuration option
pub struct ConfigDefinition {
    /// Key used to identify this configuration option
    key: String,
    /// Description to be used in generated documentation
    description: String,
    /// Data type of this option
    data_type: DataType,
    /// Default value
    default_value: ScalarValue,
}

impl ConfigDefinition {
    /// Create a configuration option definition
    pub fn new(
        name: impl Into<String>,
        description: impl Into<String>,
        data_type: DataType,
        default_value: ScalarValue,
    ) -> Self {
        Self {
            key: name.into(),
            description: description.into(),
            data_type,
            default_value,
        }
    }

    /// Create a configuration option definition with a boolean value
    pub fn new_bool(name: &str, description: &str, default_value: bool) -> Self {
        Self {
            key: name.to_string(),
            description: description.to_string(),
            data_type: DataType::Boolean,
            default_value: ScalarValue::Boolean(Some(default_value)),
        }
    }
}

/// Contains definitions for all built-in configuration options
pub struct BuiltInConfigs {
    /// Configuration option definitions
    config_definitions: Vec<ConfigDefinition>,
}

impl Default for BuiltInConfigs {
    fn default() -> Self {
        Self::new()
    }
}

impl BuiltInConfigs {
    /// Create a new BuiltInConfigs struct containing definitions for all built-in
    /// configuration options
    pub fn new() -> Self {
        Self {
            config_definitions: vec![ConfigDefinition::new_bool(
                OPT_FILTER_NULL_JOIN_KEYS,
                "When set to true, the optimizer will insert filters before a join between \
                a nullable and non-nullable column to filter out nulls on the nullable side. This \
                filter can add additional overhead when the file format does not fully support \
                predicate push down.",
                false,
            )],
        }
    }

    /// Generate documentation that can be included in the user guide
    pub fn generate_config_markdown() -> String {
        let configs = Self::new();
        let mut docs = "| key | type | default | description |\n".to_string();
        docs += "|-----|------|---------|-------------|\n";
        for config in configs.config_definitions {
            docs += &format!(
                "| {} | {} | {} | {} |\n",
                config.key, config.data_type, config.default_value, config.description
            );
        }
        docs
    }
}

/// Configuration options struct. This can contain values for built-in and custom options
#[derive(Debug, Clone)]
pub struct ConfigOptions {
    options: HashMap<String, ScalarValue>,
}

impl Default for ConfigOptions {
    fn default() -> Self {
        Self::new()
    }
}

impl ConfigOptions {
    /// Create new ConfigOptions struct
    pub fn new() -> Self {
        let mut options = HashMap::new();
        let built_in = BuiltInConfigs::new();
        for config_def in &built_in.config_definitions {
            options.insert(config_def.key.clone(), config_def.default_value.clone());
        }
        Self { options }
    }

    /// Set a configuration option
    pub fn set(&mut self, key: &str, value: ScalarValue) {
        self.options.insert(key.to_string(), value);
    }

    /// Set a boolean configuration option
    pub fn set_bool(&mut self, key: &str, value: bool) {
        self.set(key, ScalarValue::Boolean(Some(value)))
    }

    /// Get a configuration option
    pub fn get(&self, key: &str) -> Option<ScalarValue> {
        self.options.get(key).cloned()
    }

    /// Get a boolean configuration option
    pub fn get_bool(&self, key: &str) -> bool {
        match self.get(key) {
            Some(ScalarValue::Boolean(Some(b))) => b,
            _ => false,
        }
    }
}

#[cfg(test)]
mod test {
    use crate::config::{BuiltInConfigs, ConfigOptions};

    #[test]
    fn docs() {
        let docs = BuiltInConfigs::generate_config_markdown();
        assert_eq!("| key | type | default | description |\
            \n|-----|------|---------|-------------|\
            \n| datafusion.optimizer.filterNullJoinKeys | Boolean | false | When set to true, the optimizer \
            will insert filters before a join between a nullable and non-nullable column to filter out \
            nulls on the nullable side. This filter can add additional overhead when the file format does \
            not fully support predicate push down. |\n", docs);
    }

    #[test]
    fn get_then_set() {
        let mut config = ConfigOptions::new();
        let config_key = "datafusion.optimizer.filterNullJoinKeys";
        assert!(!config.get_bool(config_key));
        config.set_bool(config_key, true);
        assert!(config.get_bool(config_key));
    }

    #[test]
    fn get_invalid_config() {
        let config = ConfigOptions::new();
        let invalid_key = "not.valid";
        assert!(config.get(invalid_key).is_none());
        assert!(!config.get_bool(invalid_key));
    }
}
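Taken together, the new config.rs module exposes a small string-keyed key/value API. The following sketch is not part of the commit; it uses only the functions defined above plus `ScalarValue` from `datafusion_common`, and the custom key `my.extension.some_flag` is a made-up example of a non-built-in option.

```rust
use datafusion::config::{BuiltInConfigs, ConfigOptions, OPT_FILTER_NULL_JOIN_KEYS};
use datafusion_common::ScalarValue;

fn main() {
    // Built-in options start out at their declared defaults.
    let mut options = ConfigOptions::new();
    assert!(!options.get_bool(OPT_FILTER_NULL_JOIN_KEYS));

    // Override a built-in option.
    options.set_bool(OPT_FILTER_NULL_JOIN_KEYS, true);
    assert!(options.get_bool(OPT_FILTER_NULL_JOIN_KEYS));

    // Custom keys (this one is hypothetical) can hold any ScalarValue, which is
    // what makes the mechanism extensible.
    options.set("my.extension.some_flag", ScalarValue::UInt64(Some(4096)));

    // Regenerate the markdown table used in the user guide.
    println!("{}", BuiltInConfigs::generate_config_markdown());
}
```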
48 changes: 34 additions & 14 deletions datafusion/core/src/execution/context.rs
@@ -80,6 +80,7 @@ use crate::physical_optimizer::coalesce_batches::CoalesceBatches;
use crate::physical_optimizer::merge_exec::AddCoalescePartitionsExec;
use crate::physical_optimizer::repartition::Repartition;

use crate::config::{ConfigOptions, OPT_FILTER_NULL_JOIN_KEYS};
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
@@ -91,6 +92,7 @@ use crate::physical_plan::PhysicalPlanner;
use crate::variable::{VarProvider, VarType};
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion_common::ScalarValue;
use datafusion_expr::TableSource;
use datafusion_optimizer::filter_null_join_keys::FilterNullJoinKeys;
use datafusion_sql::{
@@ -1014,6 +1016,8 @@ pub struct SessionConfig {
    pub repartition_windows: bool,
    /// Should the DataFusion parquet reader use the predicate to prune data
    pub parquet_pruning: bool,
    /// Configuration options
    pub config_options: ConfigOptions,
}

impl Default for SessionConfig {
@@ -1029,6 +1033,7 @@ impl Default for SessionConfig {
            repartition_aggregations: true,
            repartition_windows: true,
            parquet_pruning: true,
            config_options: ConfigOptions::new(),
        }
    }
}
@@ -1039,6 +1044,17 @@ impl SessionConfig {
        Default::default()
    }

    /// Set a configuration option
    pub fn set(mut self, key: &str, value: ScalarValue) -> Self {
        self.config_options.set(key, value);
        self
    }

    /// Set a boolean configuration option
    pub fn set_bool(self, key: &str, value: bool) -> Self {
        self.set(key, ScalarValue::Boolean(Some(value)))
    }

    /// Customize batch size
    pub fn with_batch_size(mut self, n: usize) -> Self {
        // batch size must be greater than zero
@@ -1200,22 +1216,26 @@ impl SessionState {
.register_catalog(config.default_catalog.clone(), default_catalog);
}

        let mut rules: Vec<Arc<dyn OptimizerRule + Sync + Send>> = vec![
            // Simplify expressions first to maximize the chance
            // of applying other optimizations
            Arc::new(SimplifyExpressions::new()),
            Arc::new(SubqueryFilterToJoin::new()),
            Arc::new(EliminateFilter::new()),
            Arc::new(CommonSubexprEliminate::new()),
            Arc::new(EliminateLimit::new()),
            Arc::new(ProjectionPushDown::new()),
        ];
        if config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS) {
            rules.push(Arc::new(FilterNullJoinKeys::default()));
        }
        rules.push(Arc::new(FilterPushDown::new()));
        rules.push(Arc::new(LimitPushDown::new()));
        rules.push(Arc::new(SingleDistinctToGroupBy::new()));

        SessionState {
            session_id,
            optimizer: Optimizer::new(vec![
                // Simplify expressions first to maximize the chance
                // of applying other optimizations
                Arc::new(SimplifyExpressions::new()),
                Arc::new(SubqueryFilterToJoin::new()),
                Arc::new(EliminateFilter::new()),
                Arc::new(CommonSubexprEliminate::new()),
                Arc::new(EliminateLimit::new()),
                Arc::new(ProjectionPushDown::new()),
                Arc::new(FilterNullJoinKeys::default()),
                Arc::new(FilterPushDown::new()),
                Arc::new(LimitPushDown::new()),
                Arc::new(SingleDistinctToGroupBy::new()),
            ]),
            optimizer: Optimizer::new(rules),
            physical_optimizers: vec![
                Arc::new(AggregateStatistics::new()),
                Arc::new(HashBuildProbeOrder::new()),
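With this hunk, the optimizer rule list is assembled at runtime from `config.config_options` rather than hard-coded, and FilterNullJoinKeys is only added when the option is enabled. Below is a minimal sketch of the builder-style API added to SessionConfig; it is not part of the commit, uses only methods visible in this diff, and the 4096 batch size is an arbitrary example.

```rust
use datafusion::config::OPT_FILTER_NULL_JOIN_KEYS;
use datafusion::execution::context::SessionConfig;

fn main() {
    // The new setters consume and return the config, so they chain with the
    // existing with_* builders such as with_batch_size.
    let config = SessionConfig::new()
        .set_bool(OPT_FILTER_NULL_JOIN_KEYS, true)
        .with_batch_size(4096);

    // config_options is a public field, so the value can be read back directly;
    // a SessionState built from this config adds FilterNullJoinKeys to its
    // optimizer rules, as shown in the hunk above.
    assert!(config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS));
}
```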
1 change: 1 addition & 0 deletions datafusion/core/src/lib.rs
@@ -208,6 +208,7 @@ extern crate sqlparser;

pub mod avro_to_arrow;
pub mod catalog;
pub mod config;
pub mod dataframe;
pub mod datasource;
pub mod error;
1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -43,6 +43,7 @@ Table of content
user-guide/library
user-guide/cli
user-guide/sql/index
user-guide/configs
user-guide/faq

.. _toc.specs:
26 changes: 26 additions & 0 deletions docs/source/user-guide/configs.md
@@ -0,0 +1,26 @@
<!---
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Configuration Settings

The following configuration options can be passed to `SessionConfig` to control various aspects of query execution.

| key | type | default | description |
| --------------------------------------- | ------- | ------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| datafusion.optimizer.filterNullJoinKeys | Boolean | false | When set to true, the optimizer will insert filters before a join between a nullable and non-nullable column to filter out nulls on the nullable side. This filter can add additional overhead when the file format does not fully support predicate push down. |
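For example, a session can opt in to the option above at construction time. The brief sketch below is an illustration rather than part of the committed documentation page, and it assumes the `SessionContext::with_config` constructor available in this release of DataFusion.

```rust
use datafusion::execution::context::{SessionConfig, SessionContext};

fn create_context() -> SessionContext {
    // Enable null filtering on join keys for this session.
    let config = SessionConfig::new()
        .set_bool("datafusion.optimizer.filterNullJoinKeys", true);
    // Assumed constructor for building a context from a SessionConfig.
    SessionContext::with_config(config)
}
```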
