fix: Update VirtualLister to specify schema if an exact one is present (#849)

This is so we can specialize for the case of BigQuery.

Signed-off-by: Vaibhav <[email protected]>
vrongmeal authored Apr 12, 2023
1 parent 2e1bdf0 commit fff11d6
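
For context, `list_tables` on the `VirtualLister` trait now takes an optional schema, so a source that resolves to exactly one schema (for example, a single BigQuery dataset) can list only that schema's tables. A minimal caller sketch under that assumption — the `show_tables` helper and the "my_dataset" name are illustrative, not part of this change:

use datasource_common::errors::DatasourceCommonError;
use datasource_common::listing::VirtualLister;

async fn show_tables(lister: &dyn VirtualLister) -> Result<(), DatasourceCommonError> {
    // Some(schema) restricts listing to one schema; None lists tables across all schemas.
    let tables = lister.list_tables(Some("my_dataset")).await?;
    for t in tables {
        println!("{}.{}", t.schema, t.table);
    }
    Ok(())
}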
Showing 10 changed files with 361 additions and 129 deletions.
1 change: 1 addition & 0 deletions Cargo.lock


7 changes: 2 additions & 5 deletions crates/datasource_common/src/errors.rs
@@ -15,11 +15,8 @@ pub enum DatasourceCommonError {
#[error("Failed to find an open port to open the SSH tunnel")]
NoOpenPorts,

#[error("Listing schemas for this data source is unsupported.")]
ListingSchemasUnsupported,

#[error("Listing tables for this data source is unsupported.")]
ListingTablesUnsupported,
#[error("Unknown virtual catalog table: {0}")]
UnknownVirtualCatalogTable(String),

#[error(transparent)]
ListingErrBoxed(#[from] Box<dyn std::error::Error + Sync + Send>),
227 changes: 205 additions & 22 deletions crates/datasource_common/src/listing.rs
@@ -2,47 +2,230 @@
//!
//! Virtual listers can list schema and table information about the underlying
//! data source. These essentially provide a trimmed down information schema.
use crate::errors::Result;
use std::{fmt::Display, str::FromStr, sync::Arc};

use async_trait::async_trait;
use datafusion::{
    arrow::{
        array::{StringArray, StringBuilder},
        datatypes::{DataType, Field, Schema, SchemaRef},
        record_batch::RecordBatch,
    },
    datasource::TableProvider,
    error::DataFusionError,
    execution::context::SessionState,
    logical_expr::{Expr, Operator, TableProviderFilterPushDown, TableType},
    physical_plan::{memory::MemoryExec, ExecutionPlan},
    scalar::ScalarValue,
};
use metastore::builtins::{VIRTUAL_CATALOG_SCHEMATA_TABLE, VIRTUAL_CATALOG_TABLES_TABLE};

#[derive(Debug, Clone)]
pub struct VirtualSchemas {
    pub schema_names: Vec<String>,
}
use crate::errors::{DatasourceCommonError, Result};

#[derive(Debug, Clone)]
pub struct VirtualTables {
    // These two vectors should always be the same length. Keeping the data as
    // vectors instead of create a vector of structs makes conversion into arrow
    // arrays easier.
    pub table_schemas: Vec<String>,
    pub table_names: Vec<String>,
pub struct VirtualTable {
    pub schema: String,
    pub table: String,
}

#[async_trait]
pub trait VirtualLister: Sync + Send {
    /// List schemas for a data source.
    async fn list_schemas(&self) -> Result<VirtualSchemas>;
    async fn list_schemas(&self) -> Result<Vec<String>>;

    /// List tables for a data source.
    async fn list_tables(&self) -> Result<VirtualTables>;
    async fn list_tables(&self, schema: Option<&str>) -> Result<Vec<VirtualTable>>;
}

#[derive(Debug, Clone, Copy)]
pub struct EmptyLister;

#[async_trait]
impl VirtualLister for EmptyLister {
    async fn list_schemas(&self) -> Result<VirtualSchemas> {
        Ok(VirtualSchemas {
            schema_names: Vec::new(),
        })
    async fn list_schemas(&self) -> Result<Vec<String>> {
        Ok(Vec::new())
    }

    async fn list_tables(&self) -> Result<VirtualTables> {
        Ok(VirtualTables {
            table_schemas: Vec::new(),
            table_names: Vec::new(),
        })
    async fn list_tables(&self, _schema: Option<&str>) -> Result<Vec<VirtualTable>> {
        Ok(Vec::new())
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VirtualCatalogTable {
    Schemata,
    Tables,
}

impl Display for VirtualCatalogTable {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let s = match self {
            Self::Schemata => VIRTUAL_CATALOG_SCHEMATA_TABLE,
            Self::Tables => VIRTUAL_CATALOG_TABLES_TABLE,
        };
        f.write_str(s)
    }
}

impl FromStr for VirtualCatalogTable {
    type Err = DatasourceCommonError;

    fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
        let catalog = match s {
            VIRTUAL_CATALOG_SCHEMATA_TABLE => Self::Schemata,
            VIRTUAL_CATALOG_TABLES_TABLE => Self::Tables,
            other => {
                return Err(DatasourceCommonError::UnknownVirtualCatalogTable(
                    other.to_owned(),
                ))
            }
        };
        Ok(catalog)
    }
}

pub struct VirtualCatalogTableProvider {
    lister: Box<dyn VirtualLister>,
    catalog: VirtualCatalogTable,
    schema: SchemaRef,
}

impl VirtualCatalogTableProvider {
    pub fn new(lister: Box<dyn VirtualLister>, catalog: VirtualCatalogTable) -> Self {
        Self {
            lister,
            catalog,
            schema: Self::make_schema(catalog),
        }
    }

    fn make_schema(catalog: VirtualCatalogTable) -> SchemaRef {
        let fields = match catalog {
            VirtualCatalogTable::Schemata => vec![Field::new("schema_name", DataType::Utf8, false)],
            VirtualCatalogTable::Tables => vec![
                Field::new("table_schema", DataType::Utf8, false),
                Field::new("table_name", DataType::Utf8, false),
            ],
        };
        Arc::new(Schema::new(fields))
    }
}

#[async_trait]
impl TableProvider for VirtualCatalogTableProvider {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Base
    }

    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> Result<Vec<TableProviderFilterPushDown>, DataFusionError> {
        let supports = match self.catalog {
            VirtualCatalogTable::Schemata => {
                vec![TableProviderFilterPushDown::Unsupported; filters.len()]
            }
            VirtualCatalogTable::Tables => filters
                .iter()
                .map(|f| {
                    if get_schema_from_filter(f).is_some() {
                        TableProviderFilterPushDown::Inexact
                    } else {
                        TableProviderFilterPushDown::Unsupported
                    }
                })
                .collect(),
        };
        Ok(supports)
    }

    async fn scan(
        &self,
        _state: &SessionState,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        _limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
        let batch = match self.catalog {
            VirtualCatalogTable::Schemata => {
                let schema_list = self
                    .lister
                    .list_schemas()
                    .await
                    .map_err(|e| DataFusionError::External(Box::new(e)))?;

                let schema_list: StringArray = schema_list.into_iter().map(Some).collect();

                RecordBatch::try_new(self.schema.clone(), vec![Arc::new(schema_list)])?
            }
            VirtualCatalogTable::Tables => {
                // Since we don't support any other filter for the table provider,
                // the first filter should be the one we can use to directly match
                // the schema to the lister.
                let schema = filters.first().map(get_schema_from_filter).unwrap_or(None);
                let schema = schema.as_deref();

                let list = self
                    .lister
                    .list_tables(schema)
                    .await
                    .map_err(|e| DataFusionError::External(Box::new(e)))?;

                let (mut schema_list, mut table_list): (StringBuilder, StringBuilder) = list
                    .into_iter()
                    .map(|v| (Some(v.schema), Some(v.table)))
                    .unzip();

                RecordBatch::try_new(
                    self.schema.clone(),
                    vec![
                        Arc::new(schema_list.finish()),
                        Arc::new(table_list.finish()),
                    ],
                )?
            }
        };

        let schema = batch.schema();
        let exec = MemoryExec::try_new(&[vec![batch]], schema, projection.cloned())?;
        Ok(Arc::new(exec))
    }
}

fn get_schema_from_filter(filter: &Expr) -> Option<String> {
    fn extract_schema(left: &Expr, right: &Expr) -> Option<String> {
        // Check if left is a column with "table_schema" column.
        match left {
            Expr::Column(col) if col.name == "table_schema" => {}
            _ => return None,
        };
        // Check if right is a literal with schema value and return it.
        match right {
            Expr::Literal(ScalarValue::Utf8(schema)) => schema.clone(),
            _ => None,
        }
    }

    match filter {
        Expr::BinaryExpr(expr) if expr.op == Operator::Eq => {
            // Here we need one of left or right to be column "table_schema"
            // and the other to be literal.
            if let Some(schema) = extract_schema(&expr.left, &expr.right) {
                return Some(schema);
            }
            if let Some(schema) = extract_schema(&expr.right, &expr.left) {
                return Some(schema);
            }
        }
        _ => {}
    };
    None
}
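
A hedged test sketch of the filter handling above, assuming it sits in this module so it can reach the private get_schema_from_filter; it uses DataFusion's col/lit helpers to build the table_schema = 'public' expression:

#[cfg(test)]
mod tests {
    use super::*;
    use datafusion::prelude::{col, lit};

    #[test]
    fn eq_filter_on_table_schema_is_extracted() {
        // An equality filter against the table_schema column yields the schema name.
        let filter = col("table_schema").eq(lit("public"));
        assert_eq!(get_schema_from_filter(&filter), Some("public".to_string()));

        // Filters on other columns are ignored, so pushdown stays Unsupported.
        let other = col("table_name").eq(lit("users"));
        assert_eq!(get_schema_from_filter(&other), None);
    }
}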
1 change: 1 addition & 0 deletions crates/datasource_debug/Cargo.toml
@@ -11,3 +11,4 @@ futures = "0.3.28"
async-trait = "0.1.68"
serde = { version = "1.0", features = ["derive"] }
datafusion = "20.0"
datasource_common = { path = "../datasource_common" }
42 changes: 42 additions & 0 deletions crates/datasource_debug/src/lib.rs
@@ -19,6 +19,8 @@ use datafusion::physical_plan::display::DisplayFormatType;
use datafusion::physical_plan::{
    ExecutionPlan, Partitioning, RecordBatchStream, SendableRecordBatchStream, Statistics,
};
use datasource_common::errors::DatasourceCommonError;
use datasource_common::listing::{VirtualLister, VirtualTable};
use errors::DebugError;
use futures::Stream;
use serde::{Deserialize, Serialize};
@@ -124,6 +126,46 @@ impl DebugTableType {
    }
}

pub struct DebugVirtualLister;

#[async_trait]
impl VirtualLister for DebugVirtualLister {
    async fn list_schemas(&self) -> Result<Vec<String>, DatasourceCommonError> {
        Ok((0..2).map(|i| format!("schema_{i}")).collect())
    }

    async fn list_tables(
        &self,
        schema: Option<&str>,
    ) -> Result<Vec<VirtualTable>, DatasourceCommonError> {
        if let Some(schema) = schema {
            if schema == "debug_schema" {
                // This schema doesn't actually exist but we want to check if
                // we get the schema from the filters correctly.
                let tables: Vec<_> = (0..2)
                    .map(|i| VirtualTable {
                        schema: schema.to_owned(),
                        table: format!("table_{i}"),
                    })
                    .collect();
                return Ok(tables);
            }
        }

        let schema_list = self.list_schemas().await.unwrap();
        let mut tables = Vec::new();
        for schema in schema_list.iter() {
            for i in 0..2 {
                tables.push(VirtualTable {
                    schema: schema.clone(),
                    table: format!("table_{i}"),
                });
            }
        }
        Ok(tables)
    }
}

pub struct DebugTableProvider {
    typ: DebugTableType,
}
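
To illustrate how the debug lister behaves, a hypothetical async test (assumes a tokio dev-dependency; not part of this commit):

#[tokio::test]
async fn debug_lister_lists_tables() -> Result<(), DatasourceCommonError> {
    let lister = DebugVirtualLister;

    // The synthetic "debug_schema" exercises the schema-filter path.
    let filtered = lister.list_tables(Some("debug_schema")).await?;
    assert!(filtered.iter().all(|t| t.schema == "debug_schema"));

    // With no schema, tables are listed for schema_0 and schema_1 (2 x 2 = 4).
    let all = lister.list_tables(None).await?;
    assert_eq!(all.len(), 4);
    Ok(())
}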
