Support ShowVariable Statement #3455

Merged: 5 commits, Sep 14, 2022
82 changes: 79 additions & 3 deletions datafusion/core/src/catalog/information_schema.rs
@@ -24,6 +24,8 @@ use std::{
sync::{Arc, Weak},
};

use parking_lot::RwLock;

use arrow::{
array::{StringBuilder, UInt64Builder},
datatypes::{DataType, Field, Schema},
@@ -39,26 +41,32 @@ use super::{
schema::SchemaProvider,
};

use crate::config::ConfigOptions;

const INFORMATION_SCHEMA: &str = "information_schema";
const TABLES: &str = "tables";
const VIEWS: &str = "views";
const COLUMNS: &str = "columns";
const DF_SETTINGS: &str = "df_settings";
Contributor author comment: rename settings -> df_settings.
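For orientation, a minimal usage sketch of the new virtual table (not part of this diff, and hedged against the DataFusion API of this era): once information_schema is enabled, the settings exposed here can be read back with plain SQL, which is what the SHOW variable support in this PR builds on.

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // Enabling information_schema registers the df_settings table added in this file.
    let config = SessionConfig::new().with_information_schema(true);
    let ctx = SessionContext::with_config(config);

    // Each row is a (name, setting) pair taken from ConfigOptions.
    let df = ctx
        .sql("SELECT name, setting FROM information_schema.df_settings")
        .await?;
    df.show().await?;
    Ok(())
}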


/// Wraps another [`CatalogProvider`] and adds a "information_schema"
/// schema that can introspect on tables in the catalog_list
pub(crate) struct CatalogWithInformationSchema {
catalog_list: Weak<dyn CatalogList>,
config_options: Weak<RwLock<ConfigOptions>>,
/// wrapped provider
inner: Arc<dyn CatalogProvider>,
}

impl CatalogWithInformationSchema {
pub(crate) fn new(
catalog_list: Weak<dyn CatalogList>,
config_options: Weak<RwLock<ConfigOptions>>,
inner: Arc<dyn CatalogProvider>,
) -> Self {
Self {
catalog_list,
config_options,
inner,
}
}
@@ -79,9 +87,13 @@ impl CatalogProvider for CatalogWithInformationSchema {

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
Weak::upgrade(&self.catalog_list).map(|catalog_list| {
Arc::new(InformationSchemaProvider { catalog_list })
as Arc<dyn SchemaProvider>
Weak::upgrade(&self.catalog_list).and_then(|catalog_list| {
Weak::upgrade(&self.config_options).map(|config_options| {
Arc::new(InformationSchemaProvider {
catalog_list,
config_options,
}) as Arc<dyn SchemaProvider>
})
})
} else {
self.inner.schema(name)
@@ -106,6 +118,7 @@ impl CatalogProvider for CatalogWithInformationSchema {
/// table is queried.
struct InformationSchemaProvider {
catalog_list: Arc<dyn CatalogList>,
config_options: Arc<RwLock<ConfigOptions>>,
}

impl InformationSchemaProvider {
@@ -141,6 +154,12 @@ impl InformationSchemaProvider {
COLUMNS,
TableType::View,
);
builder.add_table(
&catalog_name,
INFORMATION_SCHEMA,
DF_SETTINGS,
TableType::View,
);
}

let mem_table: MemTable = builder.into();
@@ -206,6 +225,19 @@ impl InformationSchemaProvider {

Arc::new(mem_table)
}

/// Construct the `information_schema.df_settings` virtual table
fn make_df_settings(&self) -> Arc<dyn TableProvider> {
let mut builder = InformationSchemaDfSettingsBuilder::new();

for (name, setting) in self.config_options.read().options() {
builder.add_setting(name, setting.to_string());
}

let mem_table: MemTable = builder.into();

Arc::new(mem_table)
}
}

impl SchemaProvider for InformationSchemaProvider {
@@ -224,6 +256,8 @@ impl SchemaProvider for InformationSchemaProvider {
Some(self.make_columns())
} else if name.eq_ignore_ascii_case("views") {
Some(self.make_views())
} else if name.eq_ignore_ascii_case("df_settings") {
Some(self.make_df_settings())
} else {
None
}
@@ -579,3 +613,45 @@ impl From<InformationSchemaColumnsBuilder> for MemTable {
MemTable::try_new(schema, vec![vec![batch]]).unwrap()
}
}

struct InformationSchemaDfSettingsBuilder {
names: StringBuilder,
settings: StringBuilder,
}

impl InformationSchemaDfSettingsBuilder {
fn new() -> Self {
Self {
names: StringBuilder::new(),
settings: StringBuilder::new(),
}
}

fn add_setting(&mut self, name: impl AsRef<str>, setting: impl AsRef<str>) {
self.names.append_value(name.as_ref());
self.settings.append_value(setting.as_ref());
}
}

impl From<InformationSchemaDfSettingsBuilder> for MemTable {
fn from(value: InformationSchemaDfSettingsBuilder) -> MemTable {
let schema = Schema::new(vec![
Field::new("name", DataType::Utf8, false),
Field::new("setting", DataType::Utf8, false),
]);

let InformationSchemaDfSettingsBuilder {
mut names,
mut settings,
} = value;

let schema = Arc::new(schema);
let batch = RecordBatch::try_new(
schema.clone(),
vec![Arc::new(names.finish()), Arc::new(settings.finish())],
)
.unwrap();

MemTable::try_new(schema, vec![vec![batch]]).unwrap()
}
}
30 changes: 17 additions & 13 deletions datafusion/core/src/execution/context.rs
@@ -841,6 +841,7 @@ impl SessionContext {
let catalog = if information_schema {
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(&state.catalog_list),
Arc::downgrade(&state.config.config_options),
catalog,
))
} else {
@@ -1130,7 +1131,7 @@ pub struct SessionConfig {
/// Should DataFusion parquet reader using the predicate to prune data
pub parquet_pruning: bool,
/// Configuration options
pub config_options: ConfigOptions,
pub config_options: Arc<RwLock<ConfigOptions>>,
Comment on lines 1133 to +1134 (contributor author): Change SessionConfig.config_options from ConfigOptions to Arc<RwLock<ConfigOptions>> because:

  1. it needs to be shared with CatalogWithInformationSchema, and
  2. it must allow setting config values after construction.
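A standalone sketch of the sharing pattern this enables follows; the Options and Introspector types are simplified stand-ins invented for illustration, with parking_lot used for the lock as in the diff.

use std::sync::{Arc, Weak};
use parking_lot::RwLock;

// Simplified stand-in for DataFusion's ConfigOptions.
#[derive(Default)]
struct Options {
    batch_size: u64,
}

// The owner (like SessionConfig) holds the strong reference.
struct Config {
    options: Arc<RwLock<Options>>,
}

// A consumer (like CatalogWithInformationSchema) holds a Weak reference,
// so it observes changes without keeping the options alive on its own.
struct Introspector {
    options: Weak<RwLock<Options>>,
}

impl Introspector {
    fn current_batch_size(&self) -> Option<u64> {
        // Upgrade the Weak pointer, then take the read lock.
        Weak::upgrade(&self.options).map(|opts| opts.read().batch_size)
    }
}

fn main() {
    let config = Config {
        options: Arc::new(RwLock::new(Options::default())),
    };
    let introspector = Introspector {
        options: Arc::downgrade(&config.options),
    };

    // Writers take the write lock; readers then observe the shared state.
    config.options.write().batch_size = 4096;
    assert_eq!(introspector.current_batch_size(), Some(4096));
}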

/// Opaque extensions.
extensions: AnyMap,
}
@@ -1147,7 +1148,7 @@ impl Default for SessionConfig {
repartition_aggregations: true,
repartition_windows: true,
parquet_pruning: true,
config_options: ConfigOptions::new(),
config_options: Arc::new(RwLock::new(ConfigOptions::new())),
// Assume no extensions by default.
extensions: HashMap::with_capacity_and_hasher(
0,
@@ -1166,14 +1167,14 @@ impl SessionConfig {
/// Create an execution config with config options read from the environment
pub fn from_env() -> Self {
Self {
config_options: ConfigOptions::from_env(),
config_options: Arc::new(RwLock::new(ConfigOptions::from_env())),
..Default::default()
}
}

/// Set a configuration option
pub fn set(mut self, key: &str, value: ScalarValue) -> Self {
self.config_options.set(key, value);
pub fn set(self, key: &str, value: ScalarValue) -> Self {
self.config_options.write().set(key, value);
self
Comment on lines +1176 to 1178, @waitingkuo (contributor author), Sep 12, 2022: config_options.write() acquires the write lock so the value can be set.

}
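As a usage sketch of the builder-style setter (the option keys below are assumed from DataFusion's datafusion.execution.* config namespace, not taken from this diff):

use datafusion::prelude::SessionConfig;
use datafusion::scalar::ScalarValue;

fn main() {
    // Each set() call takes the write lock internally and returns self,
    // so calls can be chained when building the config.
    let config = SessionConfig::new()
        .set(
            "datafusion.execution.batch_size",
            ScalarValue::UInt64(Some(1024)),
        )
        .set(
            "datafusion.execution.coalesce_batches",
            ScalarValue::Boolean(Some(true)),
        );
    assert_eq!(config.batch_size(), 1024);
}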

@@ -1252,22 +1253,18 @@ impl SessionConfig {
/// Get the currently configured batch size
pub fn batch_size(&self) -> usize {
self.config_options
.read()
.get_u64(OPT_BATCH_SIZE)
.try_into()
.unwrap()
}
Comment on lines 1254 to 1260 (contributor author): config_options.read() acquires the read lock to get the value.
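And a sketch of the read side, where the getters shown in this hunk take the read lock internally:

use datafusion::prelude::SessionConfig;

fn main() {
    // from_env() seeds the shared options from environment variables.
    let config = SessionConfig::from_env();

    // batch_size() takes the read lock, fetches the u64, and converts it.
    println!("batch size: {}", config.batch_size());

    // to_props() also reads under the lock, returning every option as a
    // name/value string pair.
    for (name, value) in config.to_props() {
        println!("{} = {}", name, value);
    }
}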


/// Get the current configuration options
pub fn config_options(&self) -> &ConfigOptions {
&self.config_options
}

/// Convert configuration options to name-value pairs with values converted to strings. Note
/// that this method will eventually be deprecated and replaced by [config_options].
pub fn to_props(&self) -> HashMap<String, String> {
let mut map = HashMap::new();
// copy configs from config_options
for (k, v) in self.config_options.options() {
for (k, v) in self.config_options.read().options() {
map.insert(k.to_string(), format!("{}", v));
}
map.insert(
@@ -1420,6 +1417,7 @@ impl SessionState {
let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema {
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(&catalog_list),
Arc::downgrade(&config.config_options),
Arc::new(default_catalog),
))
} else {
@@ -1444,7 +1442,11 @@ impl SessionState {
Arc::new(ProjectionPushDown::new()),
Arc::new(RewriteDisjunctivePredicate::new()),
];
if config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS) {
if config
.config_options
.read()
.get_bool(OPT_FILTER_NULL_JOIN_KEYS)
{
rules.push(Arc::new(FilterNullJoinKeys::default()));
}
rules.push(Arc::new(ReduceOuterJoin::new()));
@@ -1459,10 +1461,11 @@ impl SessionState {
Arc::new(AggregateStatistics::new()),
Arc::new(HashBuildProbeOrder::new()),
];
if config.config_options.get_bool(OPT_COALESCE_BATCHES) {
if config.config_options.read().get_bool(OPT_COALESCE_BATCHES) {
physical_optimizers.push(Arc::new(CoalesceBatches::new(
config
.config_options
.read()
.get_u64(OPT_COALESCE_TARGET_BATCH_SIZE)
.try_into()
.unwrap(),
@@ -1566,6 +1569,7 @@ impl SessionState {
let mut optimizer_config = OptimizerConfig::new().with_skip_failing_rules(
self.config
.config_options
.read()
.get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES),
);
optimizer_config.query_execution_start_time =
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_plan/planner.rs
@@ -1573,6 +1573,7 @@ impl DefaultPhysicalPlanner {
if !session_state
.config
.config_options
.read()
.get_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY)
{
stringified_plans = e.stringified_plans.clone();
@@ -1583,6 +1584,7 @@ impl DefaultPhysicalPlanner {
if !session_state
.config
.config_options
.read()
.get_bool(OPT_EXPLAIN_LOGICAL_PLAN_ONLY)
{
let input = self