-
Notifications
You must be signed in to change notification settings - Fork 1.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Support ShowVariable Statement #3455
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,6 +24,8 @@ use std::{ | |
sync::{Arc, Weak}, | ||
}; | ||
|
||
use parking_lot::RwLock; | ||
|
||
use arrow::{ | ||
array::{StringBuilder, UInt64Builder}, | ||
datatypes::{DataType, Field, Schema}, | ||
|
@@ -39,26 +41,32 @@ use super::{ | |
schema::SchemaProvider, | ||
}; | ||
|
||
use crate::config::ConfigOptions; | ||
|
||
const INFORMATION_SCHEMA: &str = "information_schema"; | ||
const TABLES: &str = "tables"; | ||
const VIEWS: &str = "views"; | ||
const COLUMNS: &str = "columns"; | ||
const SETTINGS: &str = "settings"; | ||
|
||
/// Wraps another [`CatalogProvider`] and adds a "information_schema" | ||
/// schema that can introspect on tables in the catalog_list | ||
pub(crate) struct CatalogWithInformationSchema { | ||
catalog_list: Weak<dyn CatalogList>, | ||
config_options: Weak<RwLock<ConfigOptions>>, | ||
/// wrapped provider | ||
inner: Arc<dyn CatalogProvider>, | ||
} | ||
|
||
impl CatalogWithInformationSchema { | ||
pub(crate) fn new( | ||
catalog_list: Weak<dyn CatalogList>, | ||
config_options: Weak<RwLock<ConfigOptions>>, | ||
inner: Arc<dyn CatalogProvider>, | ||
) -> Self { | ||
Self { | ||
catalog_list, | ||
config_options, | ||
inner, | ||
} | ||
} | ||
|
@@ -79,9 +87,13 @@ impl CatalogProvider for CatalogWithInformationSchema { | |
|
||
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> { | ||
if name.eq_ignore_ascii_case(INFORMATION_SCHEMA) { | ||
Weak::upgrade(&self.catalog_list).map(|catalog_list| { | ||
Arc::new(InformationSchemaProvider { catalog_list }) | ||
as Arc<dyn SchemaProvider> | ||
Weak::upgrade(&self.catalog_list).and_then(|catalog_list| { | ||
Weak::upgrade(&self.config_options).map(|config_options| { | ||
Arc::new(InformationSchemaProvider { | ||
catalog_list: catalog_list, | ||
config_options: config_options, | ||
}) as Arc<dyn SchemaProvider> | ||
}) | ||
}) | ||
} else { | ||
self.inner.schema(name) | ||
|
@@ -106,6 +118,7 @@ impl CatalogProvider for CatalogWithInformationSchema { | |
/// table is queried. | ||
struct InformationSchemaProvider { | ||
catalog_list: Arc<dyn CatalogList>, | ||
config_options: Arc<RwLock<ConfigOptions>>, | ||
} | ||
|
||
impl InformationSchemaProvider { | ||
|
@@ -141,6 +154,12 @@ impl InformationSchemaProvider { | |
COLUMNS, | ||
TableType::View, | ||
); | ||
builder.add_table( | ||
&catalog_name, | ||
INFORMATION_SCHEMA, | ||
SETTINGS, | ||
TableType::View, | ||
); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add so that ❯ show tables;
+---------------+--------------------+------------+------------+
| table_catalog | table_schema | table_name | table_type |
+---------------+--------------------+------------+------------+
| datafusion | information_schema | tables | VIEW |
| datafusion | information_schema | views | VIEW |
| datafusion | information_schema | columns | VIEW |
| datafusion | information_schema | settings | VIEW |
+---------------+--------------------+------------+------------+
4 rows in set. Query took 0.004 seconds. |
||
} | ||
|
||
let mem_table: MemTable = builder.into(); | ||
|
@@ -206,6 +225,19 @@ impl InformationSchemaProvider { | |
|
||
Arc::new(mem_table) | ||
} | ||
|
||
/// Construct the `information_schema.settings` virtual table | ||
fn make_settings(&self) -> Arc<dyn TableProvider> { | ||
let mut builder = InformationSchemaSettingsBuilder::new(); | ||
|
||
for (name, setting) in self.config_options.read().options() { | ||
builder.add_setting(name, setting.to_string()); | ||
} | ||
|
||
let mem_table: MemTable = builder.into(); | ||
|
||
Arc::new(mem_table) | ||
} | ||
} | ||
|
||
impl SchemaProvider for InformationSchemaProvider { | ||
|
@@ -224,6 +256,8 @@ impl SchemaProvider for InformationSchemaProvider { | |
Some(self.make_columns()) | ||
} else if name.eq_ignore_ascii_case("views") { | ||
Some(self.make_views()) | ||
} else if name.eq_ignore_ascii_case("settings") { | ||
Some(self.make_settings()) | ||
} else { | ||
None | ||
} | ||
|
@@ -579,3 +613,45 @@ impl From<InformationSchemaColumnsBuilder> for MemTable { | |
MemTable::try_new(schema, vec![vec![batch]]).unwrap() | ||
} | ||
} | ||
|
||
struct InformationSchemaSettingsBuilder { | ||
names: StringBuilder, | ||
settings: StringBuilder, | ||
} | ||
|
||
impl InformationSchemaSettingsBuilder { | ||
fn new() -> Self { | ||
Self { | ||
names: StringBuilder::new(), | ||
settings: StringBuilder::new(), | ||
} | ||
} | ||
|
||
fn add_setting(&mut self, name: impl AsRef<str>, setting: impl AsRef<str>) { | ||
self.names.append_value(name.as_ref()); | ||
self.settings.append_value(setting.as_ref()); | ||
} | ||
} | ||
|
||
impl From<InformationSchemaSettingsBuilder> for MemTable { | ||
fn from(value: InformationSchemaSettingsBuilder) -> MemTable { | ||
let schema = Schema::new(vec![ | ||
Field::new("name", DataType::Utf8, false), | ||
Field::new("setting", DataType::Utf8, false), | ||
]); | ||
|
||
let InformationSchemaSettingsBuilder { | ||
mut names, | ||
mut settings, | ||
} = value; | ||
|
||
let schema = Arc::new(schema); | ||
let batch = RecordBatch::try_new( | ||
schema.clone(), | ||
vec![Arc::new(names.finish()), Arc::new(settings.finish())], | ||
) | ||
.unwrap(); | ||
|
||
MemTable::try_new(schema, vec![vec![batch]]).unwrap() | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. implement this is what postgresql has show all;
name | setting | description
----------------------------------------+--------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------
allow_system_table_mods | off | Allows modifications of the structure of system tables.
application_name | psql | Sets the application name to be reported in statistics and logs.
archive_cleanup_command | | Sets the shell command that will be executed at every restart point.
archive_command | (disabled) | Sets the shell command that will be called to archive a WAL file.
archive_mode | off | Allows archiving of WAL files using archive_command.
archive_timeout | 0 | Forces a switch |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -841,6 +841,7 @@ impl SessionContext { | |
let catalog = if information_schema { | ||
Arc::new(CatalogWithInformationSchema::new( | ||
Arc::downgrade(&state.catalog_list), | ||
Arc::downgrade(&state.config.config_options), | ||
catalog, | ||
)) | ||
} else { | ||
|
@@ -1130,7 +1131,7 @@ pub struct SessionConfig { | |
/// Should DataFusion parquet reader using the predicate to prune data | ||
pub parquet_pruning: bool, | ||
/// Configuration options | ||
pub config_options: ConfigOptions, | ||
pub config_options: Arc<RwLock<ConfigOptions>>, | ||
Comment on lines
1133
to
+1134
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change
|
||
/// Opaque extensions. | ||
extensions: AnyMap, | ||
} | ||
|
@@ -1147,7 +1148,7 @@ impl Default for SessionConfig { | |
repartition_aggregations: true, | ||
repartition_windows: true, | ||
parquet_pruning: true, | ||
config_options: ConfigOptions::new(), | ||
config_options: Arc::new(RwLock::new(ConfigOptions::new())), | ||
// Assume no extensions by default. | ||
extensions: HashMap::with_capacity_and_hasher( | ||
0, | ||
|
@@ -1166,14 +1167,14 @@ impl SessionConfig { | |
/// Create an execution config with config options read from the environment | ||
pub fn from_env() -> Self { | ||
Self { | ||
config_options: ConfigOptions::from_env(), | ||
config_options: Arc::new(RwLock::new(ConfigOptions::from_env())), | ||
..Default::default() | ||
} | ||
} | ||
|
||
/// Set a configuration option | ||
pub fn set(mut self, key: &str, value: ScalarValue) -> Self { | ||
self.config_options.set(key, value); | ||
pub fn set(self, key: &str, value: ScalarValue) -> Self { | ||
self.config_options.write().set(key, value); | ||
self | ||
Comment on lines
+1176
to
1178
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
|
||
|
@@ -1252,22 +1253,18 @@ impl SessionConfig { | |
/// Get the currently configured batch size | ||
pub fn batch_size(&self) -> usize { | ||
self.config_options | ||
.read() | ||
.get_u64(OPT_BATCH_SIZE) | ||
.try_into() | ||
.unwrap() | ||
} | ||
Comment on lines
1254
to
1260
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
|
||
/// Get the current configuration options | ||
pub fn config_options(&self) -> &ConfigOptions { | ||
&self.config_options | ||
} | ||
|
||
/// Convert configuration options to name-value pairs with values converted to strings. Note | ||
/// that this method will eventually be deprecated and replaced by [config_options]. | ||
pub fn to_props(&self) -> HashMap<String, String> { | ||
let mut map = HashMap::new(); | ||
// copy configs from config_options | ||
for (k, v) in self.config_options.options() { | ||
for (k, v) in self.config_options.read().options() { | ||
map.insert(k.to_string(), format!("{}", v)); | ||
} | ||
map.insert( | ||
|
@@ -1420,6 +1417,7 @@ impl SessionState { | |
let default_catalog: Arc<dyn CatalogProvider> = if config.information_schema { | ||
Arc::new(CatalogWithInformationSchema::new( | ||
Arc::downgrade(&catalog_list), | ||
Arc::downgrade(&config.config_options), | ||
Arc::new(default_catalog), | ||
)) | ||
} else { | ||
|
@@ -1444,7 +1442,11 @@ impl SessionState { | |
Arc::new(ProjectionPushDown::new()), | ||
Arc::new(RewriteDisjunctivePredicate::new()), | ||
]; | ||
if config.config_options.get_bool(OPT_FILTER_NULL_JOIN_KEYS) { | ||
if config | ||
.config_options | ||
.read() | ||
.get_bool(OPT_FILTER_NULL_JOIN_KEYS) | ||
{ | ||
rules.push(Arc::new(FilterNullJoinKeys::default())); | ||
} | ||
rules.push(Arc::new(ReduceOuterJoin::new())); | ||
|
@@ -1459,10 +1461,11 @@ impl SessionState { | |
Arc::new(AggregateStatistics::new()), | ||
Arc::new(HashBuildProbeOrder::new()), | ||
]; | ||
if config.config_options.get_bool(OPT_COALESCE_BATCHES) { | ||
if config.config_options.read().get_bool(OPT_COALESCE_BATCHES) { | ||
physical_optimizers.push(Arc::new(CoalesceBatches::new( | ||
config | ||
.config_options | ||
.read() | ||
.get_u64(OPT_COALESCE_TARGET_BATCH_SIZE) | ||
.try_into() | ||
.unwrap(), | ||
|
@@ -1566,6 +1569,7 @@ impl SessionState { | |
let mut optimizer_config = OptimizerConfig::new().with_skip_failing_rules( | ||
self.config | ||
.config_options | ||
.read() | ||
.get_bool(OPT_OPTIMIZER_SKIP_FAILED_RULES), | ||
); | ||
optimizer_config.query_execution_start_time = | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think
settings
is an "offical" information_schema (if there is such a thing) table. Perhaps we could call this table something likedf_settings
to make it clear that this table is a datafusion extension?https://www.postgresql.org/docs/current/information-schema.html
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no it's not. postgresl has this
df_settings sounds good. should we leave it in information_schema, or move it out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't have any preference -- whatever is easier at first (
information_schema
seems reasonable to me)There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@alamb updated