Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support ShowVariable Statement #3455

Merged
merged 5 commits into from
Sep 14, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 79 additions & 3 deletions datafusion/core/src/catalog/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use std::{
sync::{Arc, Weak},
};

use parking_lot::RwLock;

use arrow::{
array::{StringBuilder, UInt64Builder},
datatypes::{DataType, Field, Schema},
Expand All @@ -39,26 +41,32 @@ use super::{
schema::SchemaProvider,
};

use crate::config::ConfigOptions;

const INFORMATION_SCHEMA: &str = "information_schema";
const TABLES: &str = "tables";
const VIEWS: &str = "views";
const COLUMNS: &str = "columns";
const SETTINGS: &str = "settings";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think settings is an "official" information_schema (if there is such a thing) table. Perhaps we could call this table something like df_settings to make it clear that this table is a DataFusion extension?

https://www.postgresql.org/docs/current/information-schema.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, it's not. PostgreSQL has this:

willy=#  select name, setting from pg_settings limit 1;
          name           | setting 
-------------------------+---------
 allow_system_table_mods | off
(1 row)

df_settings sounds good. should we leave it in information_schema, or move it out?

Copy link
Contributor

@alamb alamb Sep 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

df_settings sounds good. should we leave it in information_schema, or move it out?

I don't have any preference -- whatever is easier at first (information_schema seems reasonable to me)

Copy link
Contributor Author

@waitingkuo waitingkuo Sep 13, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alamb updated

❯ show tables;
+---------------+--------------------+-------------+------------+
| table_catalog | table_schema       | table_name  | table_type |
+---------------+--------------------+-------------+------------+
| datafusion    | information_schema | tables      | VIEW       |
| datafusion    | information_schema | views       | VIEW       |
| datafusion    | information_schema | columns     | VIEW       |
| datafusion    | information_schema | df_settings | VIEW       |
+---------------+--------------------+-------------+------------+
4 rows in set. Query took 0.003 seconds.


❯ select * from information_schema.df_settings;
+-------------------------------------------------+---------+
| name                                            | setting |
+-------------------------------------------------+---------+
| datafusion.execution.coalesce_batches           | true    |
| datafusion.explain.physical_plan_only           | false   |
| datafusion.execution.coalesce_target_batch_size | 4096    |
| datafusion.execution.batch_size                 | 8192    |
| datafusion.optimizer.skip_failed_rules          | true    |
| datafusion.optimizer.filter_null_join_keys      | false   |
| datafusion.explain.logical_plan_only            | false   |
+-------------------------------------------------+---------+
8 rows in set. Query took 0.002 seconds.


/// Wraps another [`CatalogProvider`] and adds an "information_schema"
/// schema that can introspect on tables in the catalog_list
pub(crate) struct CatalogWithInformationSchema {
/// Weak handle to the catalog list; the "information_schema" schema is
/// only served while this can still be upgraded
catalog_list: Weak<dyn CatalogList>,
/// Weak handle to the shared, lock-protected configuration options; the
/// "information_schema" schema is only served while this can still be upgraded
config_options: Weak<RwLock<ConfigOptions>>,
/// wrapped provider
inner: Arc<dyn CatalogProvider>,
}

impl CatalogWithInformationSchema {
pub(crate) fn new(
catalog_list: Weak<dyn CatalogList>,
config_options: Weak<RwLock<ConfigOptions>>,
inner: Arc<dyn CatalogProvider>,
) -> Self {
Self {
catalog_list,
config_options,
inner,
}
}
Expand All @@ -79,9 +87,13 @@ impl CatalogProvider for CatalogWithInformationSchema {

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) {
Weak::upgrade(&self.catalog_list).map(|catalog_list| {
Arc::new(InformationSchemaProvider { catalog_list })
as Arc<dyn SchemaProvider>
Weak::upgrade(&self.catalog_list).and_then(|catalog_list| {
Weak::upgrade(&self.config_options).map(|config_options| {
Arc::new(InformationSchemaProvider {
catalog_list: catalog_list,
config_options: config_options,
}) as Arc<dyn SchemaProvider>
})
})
} else {
self.inner.schema(name)
Expand All @@ -106,6 +118,7 @@ impl CatalogProvider for CatalogWithInformationSchema {
/// table is queried.
struct InformationSchemaProvider {
/// Strong handle to the catalog list this provider reports on
catalog_list: Arc<dyn CatalogList>,
/// Shared, lock-protected configuration options; read when the
/// settings table is constructed
config_options: Arc<RwLock<ConfigOptions>>,
}

impl InformationSchemaProvider {
Expand Down Expand Up @@ -141,6 +154,12 @@ impl InformationSchemaProvider {
COLUMNS,
TableType::View,
);
builder.add_table(
&catalog_name,
INFORMATION_SCHEMA,
SETTINGS,
TableType::View,
);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add SETTINGS into information_schema

so that

❯ show tables;
+---------------+--------------------+------------+------------+
| table_catalog | table_schema       | table_name | table_type |
+---------------+--------------------+------------+------------+
| datafusion    | information_schema | tables     | VIEW       |
| datafusion    | information_schema | views      | VIEW       |
| datafusion    | information_schema | columns    | VIEW       |
| datafusion    | information_schema | settings   | VIEW       |
+---------------+--------------------+------------+------------+
4 rows in set. Query took 0.004 seconds.

}

let mem_table: MemTable = builder.into();
Expand Down Expand Up @@ -206,6 +225,19 @@ impl InformationSchemaProvider {

Arc::new(mem_table)
}

/// Construct the `information_schema.settings` virtual table
fn make_settings(&self) -> Arc<dyn TableProvider> {
let mut builder = InformationSchemaSettingsBuilder::new();

for (name, setting) in self.config_options.read().options() {
builder.add_setting(name, setting.to_string());
}

let mem_table: MemTable = builder.into();

Arc::new(mem_table)
}
}

impl SchemaProvider for InformationSchemaProvider {
Expand All @@ -224,6 +256,8 @@ impl SchemaProvider for InformationSchemaProvider {
Some(self.make_columns())
} else if name.eq_ignore_ascii_case("views") {
Some(self.make_views())
} else if name.eq_ignore_ascii_case("settings") {
Some(self.make_settings())
} else {
None
}
Expand Down Expand Up @@ -579,3 +613,45 @@ impl From<InformationSchemaColumnsBuilder> for MemTable {
MemTable::try_new(schema, vec![vec![batch]]).unwrap()
}
}

/// Accumulates rows for the settings table, one string builder per column.
struct InformationSchemaSettingsBuilder {
/// Values for the `name` column
names: StringBuilder,
/// Values for the `setting` column
settings: StringBuilder,
}

impl InformationSchemaSettingsBuilder {
    /// Creates a builder whose `names` and `settings` columns are both empty.
    fn new() -> Self {
        let names = StringBuilder::new();
        let settings = StringBuilder::new();
        Self { names, settings }
    }

    /// Appends one row: `name` goes to the `names` column and `setting`
    /// to the `settings` column.
    fn add_setting(&mut self, name: impl AsRef<str>, setting: impl AsRef<str>) {
        let (name, setting) = (name.as_ref(), setting.as_ref());
        self.names.append_value(name);
        self.settings.append_value(setting);
    }
}

impl From<InformationSchemaSettingsBuilder> for MemTable {
    /// Finishes both column builders and wraps the result in a [`MemTable`]
    /// holding a single record batch with two non-nullable Utf8 columns,
    /// `name` and `setting`.
    ///
    /// # Panics
    ///
    /// Panics if the record batch or the table cannot be constructed.
    fn from(mut builder: InformationSchemaSettingsBuilder) -> MemTable {
        let schema = Arc::new(Schema::new(vec![
            Field::new("name", DataType::Utf8, false),
            Field::new("setting", DataType::Utf8, false),
        ]));

        let batch = RecordBatch::try_new(
            Arc::clone(&schema),
            vec![
                Arc::new(builder.names.finish()),
                Arc::new(builder.settings.finish()),
            ],
        )
        .unwrap();

        MemTable::try_new(schema, vec![vec![batch]]).unwrap()
    }
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

implement InformationSchemaSettingsBuilder
only two columns, name and setting supported now

this is what postgresql has

show all;

                  name                  |                                 setting                                  |                                                               description                                                               
----------------------------------------+--------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------
 allow_system_table_mods                | off                                                                      | Allows modifications of the structure of system tables.
 application_name                       | psql                                                                     | Sets the application name to be reported in statistics and logs.
 archive_cleanup_command                |                                                                          | Sets the shell command that will be executed at every restart point.
 archive_command                        | (disabled)                                                               | Sets the shell command that will be called to archive a WAL file.
 archive_mode                           | off                                                                      | Allows archiving of WAL files using archive_command.
 archive_timeout                        | 0                                                                        | Forces a switch 

30 changes: 17 additions & 13 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,7 @@ impl SessionContext {
let catalog = if information_schema {
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(&state.catalog_list),
Arc::downgrade(&state.config.config_options),
catalog,
))
} else {
Expand Down Expand Up @@ -1130,7 +1131,7 @@ pub struct SessionConfig {
/// Should DataFusion parquet reader using the predicate to prune data
pub parquet_pruning: bool,
/// Configuration options
pub config_options: ConfigOptions,
pub config_options: Arc<RwLock<ConfigOptions>>,
Comment on lines 1133 to +1134
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change SessionConfig.config_options from ConfigOptions to Arc<RwLock<ConfigOptions>>
since

  1. we need to share this with CatalogWithInformationSchema
  2. allow to set config

/// Opaque extensions.
extensions: AnyMap,
}
Expand All @@ -1147,7 +1148,7 @@ impl Default for SessionConfig {
repartition_aggregations: true,
repartition_windows: true,
parquet_pruning: true,
config_options: ConfigOptions::new(),
config_options: Arc::new(RwLock::new(ConfigOptions::new())),
// Assume no extensions by default.
extensions: HashMap::with_capacity_and_hasher(
0,
Expand All @@ -1166,14 +1167,14 @@ impl SessionConfig {
/// Create an execution config with config options read from the environment
pub fn from_env() -> Self {
Self {
config_options: ConfigOptions::from_env(),
config_options: Arc::new(RwLock::new(ConfigOptions::from_env())),
..Default::default()
}
}

/// Set a configuration option
pub fn set(mut self, key: &str, value: ScalarValue) -> Self {
self.config_options.set(key, value);
pub fn set(self, key: &str, value: ScalarValue) -> Self {
self.config_options.write().set(key, value);
self
Comment on lines +1176 to 1178
Copy link
Contributor Author

@waitingkuo waitingkuo Sep 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config_options.write() to acquire the write lock and set the value

}

Expand Down Expand Up @@ -1252,22 +1253,18 @@ impl SessionConfig {
/// Returns the batch size currently stored under `OPT_BATCH_SIZE`.
///
/// # Panics
///
/// Panics if the stored value does not fit in a `usize`.
pub fn batch_size(&self) -> usize {
    // Copy the value out under the read lock, then convert it.
    let configured = self.config_options.read().get_u64(OPT_BATCH_SIZE);
    configured.try_into().unwrap()
}
Comment on lines 1254 to 1260
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

config_options.read() to acquire the read lock and get the value


/// Get the current configuration options
pub fn config_options(&self) -> &ConfigOptions {
&self.config_options
}

/// Convert configuration options to name-value pairs with values converted to strings. Note
/// that this method will eventually be deprecated and replaced by [config_options].
pub fn to_props(&self) -> HashMap<String, String> {
let mut map = HashMap::new();
// copy configs from config_options
for (k, v) in self.config_options.options() {
for (k, v) in self.config_options.read().options() {
map.insert(k.to_string(), format!("{}", v));
}
map.insert(
Expand Down Expand Up @@ -1420,6 +1417,7 @@ impl SessionState {
let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema {
Arc::new(CatalogWithInformationSchema::new(
Arc::downgrade(&catalog_list),
Arc::downgrade(&config.config_options),
Arc::new(default_catalog),
))
} else {
Expand All @@ -1444,7 +1442,11 @@ impl SessionState {
Arc::new(ProjectionPushDown::new()),
Arc::new(RewriteDisjunctivePredicate::new()),
];
if config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS) {
if config
.config_options
.read()
.get_bool(OPT_FILTER_NULL_JOIN_KEYS)
{
rules.push(Arc::new(FilterNullJoinKeys::default()));
}
rules.push(Arc::new(ReduceOuterJoin::new()));
Expand All @@ -1459,10 +1461,11 @@ impl SessionState {
Arc::new(AggregateStatistics::new()),
Arc::new(HashBuildProbeOrder::new()),
];
if config.config_options.get_bool(OPT_COALESCE_BATCHES) {
if config.config_options.read().get_bool(OPT_COALESCE_BATCHES) {
physical_optimizers.push(Arc::new(CoalesceBatches::new(
config
.config_options
.read()
.get_u64(OPT_COALESCE_TARGET_BATCH_SIZE)
.try_into()
.unwrap(),
Expand Down Expand Up @@ -1566,6 +1569,7 @@ impl SessionState {
let mut optimizer_config = OptimizerConfig::new().with_skip_failing_rules(
self.config
.config_options
.read()
.get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES),
);
optimizer_config.query_execution_start_time =
Expand Down
2 changes: 2 additions & 0 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1573,6 +1573,7 @@ impl DefaultPhysicalPlanner {
if !session_state
.config
.config_options
.read()
.get_bool(OPT_EXPLAIN_PHYSICAL_PLAN_ONLY)
{
stringified_plans = e.stringified_plans.clone();
Expand All @@ -1583,6 +1584,7 @@ impl DefaultPhysicalPlanner {
if !session_state
.config
.config_options
.read()
.get_bool(OPT_EXPLAIN_LOGICAL_PLAN_ONLY)
{
let input = self
Expand Down
Loading