From 39300063d519510d0e9c83c1ca1f3645c763433f Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Thu, 6 Jul 2023 12:22:28 -0500 Subject: [PATCH 1/8] refactor: use native execs instead of custom execs --- crates/datasources/src/object_store/csv.rs | 106 ++++++++ .../src/object_store/csv/csv_helper.rs | 220 ----------------- .../datasources/src/object_store/csv/mod.rs | 226 ------------------ crates/datasources/src/object_store/gcs.rs | 3 + crates/datasources/src/object_store/http.rs | 25 +- crates/datasources/src/object_store/local.rs | 3 + crates/datasources/src/object_store/mod.rs | 3 +- .../datasources/src/object_store/registry.rs | 36 +++ crates/datasources/src/object_store/s3.rs | 3 + crates/glaredb/src/local.rs | 22 +- crates/sqlexec/src/context.rs | 3 + 11 files changed, 188 insertions(+), 462 deletions(-) create mode 100644 crates/datasources/src/object_store/csv.rs delete mode 100644 crates/datasources/src/object_store/csv/csv_helper.rs delete mode 100644 crates/datasources/src/object_store/csv/mod.rs create mode 100644 crates/datasources/src/object_store/registry.rs diff --git a/crates/datasources/src/object_store/csv.rs b/crates/datasources/src/object_store/csv.rs new file mode 100644 index 000000000..1f0c71765 --- /dev/null +++ b/crates/datasources/src/object_store/csv.rs @@ -0,0 +1,106 @@ +//! Helpers for handling csv files. + +use std::any::Any; +use std::sync::Arc; + +use async_trait::async_trait; +use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; +use datafusion::datasource::file_format::csv::CsvFormat; +use datafusion::datasource::file_format::file_type::FileCompressionType; +use datafusion::datasource::file_format::FileFormat; +use datafusion::datasource::object_store::ObjectStoreUrl; +use datafusion::datasource::TableProvider; +use datafusion::error::Result as DatafusionResult; +use datafusion::execution::context::SessionState; +use datafusion::logical_expr::TableType; +use datafusion::physical_plan::file_format::{CsvExec as DfCsvExec, FileScanConfig}; +use datafusion::physical_plan::{ExecutionPlan, Statistics}; +use datafusion::prelude::{Expr, SessionContext}; + +use crate::object_store::errors::Result; +use crate::object_store::TableAccessor; + +/// Table provider for csv table +pub struct CsvTableProvider +where + T: TableAccessor, +{ + pub(crate) accessor: T, + /// Schema for csv file + pub(crate) arrow_schema: ArrowSchemaRef, +} + +impl CsvTableProvider +where + T: TableAccessor, +{ + pub async fn from_table_accessor(accessor: T) -> Result> { + let store = accessor.store(); + let location = [accessor.object_meta().as_ref().clone()]; + // TODO infer schema without generating unused session context/state + let csv_format = CsvFormat::default(); + let session_ctx = SessionContext::new(); + let state = session_ctx.state(); + let arrow_schema = csv_format.infer_schema(&state, store, &location).await?; + Ok(CsvTableProvider { + accessor, + arrow_schema, + }) + } +} + +#[async_trait] +impl TableProvider for CsvTableProvider +where + T: TableAccessor + 'static, +{ + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> ArrowSchemaRef { + self.arrow_schema.clone() + } + + fn table_type(&self) -> TableType { + TableType::View + } + + async fn scan( + &self, + _ctx: &SessionState, + projection: Option<&Vec>, + _filters: &[Expr], + limit: Option, + ) -> DatafusionResult> { + let file = self.accessor.object_meta().as_ref().clone(); + let base_url = self.accessor.location(); + + // This config is setup to make use of `FileStream` to stream from csv 
files in + // datafusion + let base_config = FileScanConfig { + // `store` in `CsvExec` will be used instead of the datafusion object store registry. + object_store_url: ObjectStoreUrl::parse(&base_url).unwrap(), + file_schema: self.arrow_schema.clone(), + file_groups: vec![vec![file.into()]], + statistics: Statistics::default(), + projection: projection.cloned(), + limit, + table_partition_cols: Vec::new(), + output_ordering: Vec::new(), + infinite_source: false, + }; + // Assume csv has a header + let has_header = true; + let exec = DfCsvExec::new( + base_config, + has_header, + DEFAULT_DELIMITER, + DEFAULT_FILE_COMPRESSION_TYPE, + ); + Ok(Arc::new(exec)) + } +} +const DEFAULT_DELIMITER: u8 = b','; +const DEFAULT_BATCH_SIZE: usize = 8192; +const DEFAULT_FILE_COMPRESSION_TYPE: FileCompressionType = FileCompressionType::UNCOMPRESSED; diff --git a/crates/datasources/src/object_store/csv/csv_helper.rs b/crates/datasources/src/object_store/csv/csv_helper.rs deleted file mode 100644 index a7e7a74f4..000000000 --- a/crates/datasources/src/object_store/csv/csv_helper.rs +++ /dev/null @@ -1,220 +0,0 @@ -//! Helpers for handling csv files from datafusion. - -use std::collections::VecDeque; -use std::sync::Arc; - -use bytes::{Buf, Bytes}; -use datafusion::arrow::csv; -use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; -use datafusion::datasource::file_format::file_type::FileCompressionType; -use datafusion::error::{DataFusionError, Result as DatafusionResult}; -use datafusion::physical_plan::file_format::{FileMeta, FileOpenFuture, FileOpener}; -use futures::{Stream, StreamExt, TryStreamExt}; -use object_store::{GetResult, ObjectStore}; - -#[derive(Debug, Clone)] -pub struct CsvConfig { - pub batch_size: usize, - pub file_schema: ArrowSchemaRef, - pub file_projection: Option>, - pub has_header: bool, - pub delimiter: u8, - pub object_store: Arc, -} - -impl CsvConfig { - fn open(&self, reader: R, first_chunk: bool) -> csv::Reader { - let mut builder = csv::ReaderBuilder::new(self.file_schema.clone()) - .has_header(self.has_header && first_chunk) - .with_delimiter(self.delimiter) - .with_batch_size(self.batch_size); - - if let Some(projection) = &self.file_projection { - builder = builder.with_projection(projection.clone()); - } - - // NB: This function never errors. If it ever does (in future after DF - // changes), this is a programming error and panic-ing is the correct - // thing to do here. - builder.build(reader).expect("should be a valid csv reader") - } -} - -pub struct CsvOpener { - pub config: CsvConfig, - pub file_compression_type: FileCompressionType, -} - -impl FileOpener for CsvOpener { - fn open(&self, file_meta: FileMeta) -> DatafusionResult { - let config = self.config.clone(); - let file_compression_type = self.file_compression_type.to_owned(); - Ok(Box::pin(async move { - match config.object_store.get(file_meta.location()).await? 
{ - GetResult::File(file, _) => { - let decoder = file_compression_type.convert_read(file)?; - Ok(futures::stream::iter(config.open(decoder, true)).boxed()) - } - GetResult::Stream(s) => { - let mut first_chunk = true; - let s = s.map_err(Into::::into); - let decoder = file_compression_type.convert_stream(Box::pin(s))?; - Ok(newline_delimited_stream(decoder) - .map_ok(move |bytes| { - let reader = config.open(bytes.reader(), first_chunk); - first_chunk = false; - futures::stream::iter(reader) - }) - .try_flatten() - .boxed()) - } - } - })) - } -} - -/// Given a [`Stream`] of [`Bytes`] returns a [`Stream`] where each -/// yielded [`Bytes`] contains a whole number of new line delimited records -/// accounting for `\` style escapes and `"` quotes -pub fn newline_delimited_stream(s: S) -> impl Stream> -where - S: Stream> + Unpin, -{ - let delimiter = LineDelimiter::new(); - - futures::stream::unfold( - (s, delimiter, false), - |(mut s, mut delimiter, mut exhausted)| async move { - loop { - if let Some(next) = delimiter.next() { - return Some((Ok(next), (s, delimiter, exhausted))); - } else if exhausted { - return None; - } - - match s.next().await { - Some(Ok(bytes)) => delimiter.push(bytes), - Some(Err(e)) => return Some((Err(e), (s, delimiter, exhausted))), - None => { - exhausted = true; - match delimiter.finish() { - Ok(true) => return None, - Ok(false) => continue, - Err(e) => return Some((Err(e), (s, delimiter, exhausted))), - } - } - } - } - }, - ) -} - -/// The ASCII encoding of `"` -const QUOTE: u8 = b'"'; - -/// The ASCII encoding of `\n` -const NEWLINE: u8 = b'\n'; - -/// The ASCII encoding of `\` -const ESCAPE: u8 = b'\\'; - -/// [`LineDelimiter`] is provided with a stream of [`Bytes`] and returns an -/// iterator of [`Bytes`] containing a whole number of new line delimited -/// records -#[derive(Debug, Default)] -struct LineDelimiter { - /// Complete chunks of [`Bytes`] - complete: VecDeque, - /// Remainder bytes that form the next record - remainder: Vec, - /// True if the last character was the escape character - is_escape: bool, - /// True if currently processing a quoted string - is_quote: bool, -} - -impl LineDelimiter { - /// Creates a new [`LineDelimiter`] with the provided delimiter - fn new() -> Self { - Self::default() - } - - /// Adds the next set of [`Bytes`] - fn push(&mut self, val: impl Into) { - let val: Bytes = val.into(); - - let is_escape = &mut self.is_escape; - let is_quote = &mut self.is_quote; - let mut record_ends = val.iter().enumerate().filter_map(|(idx, v)| { - if *is_escape { - *is_escape = false; - None - } else if *v == ESCAPE { - *is_escape = true; - None - } else if *v == QUOTE { - *is_quote = !*is_quote; - None - } else if *is_quote { - None - } else { - (*v == NEWLINE).then_some(idx + 1) - } - }); - - let start_offset = match self.remainder.is_empty() { - true => 0, - false => match record_ends.next() { - Some(idx) => { - self.remainder.extend_from_slice(&val[0..idx]); - self.complete - .push_back(Bytes::from(std::mem::take(&mut self.remainder))); - idx - } - None => { - self.remainder.extend_from_slice(&val); - return; - } - }, - }; - let end_offset = record_ends.last().unwrap_or(start_offset); - if start_offset != end_offset { - self.complete.push_back(val.slice(start_offset..end_offset)); - } - - if end_offset != val.len() { - self.remainder.extend_from_slice(&val[end_offset..]) - } - } - - /// Marks the end of the stream, delimiting any remaining bytes - /// - /// Returns `true` if there is no remaining data to be read - fn finish(&mut self) 
-> DatafusionResult { - if !self.remainder.is_empty() { - if self.is_quote { - return Err(DataFusionError::Execution( - "encountered unterminated string".to_string(), - )); - } - - if self.is_escape { - return Err(DataFusionError::Execution( - "encountered trailing escape character".to_string(), - )); - } - - self.complete - .push_back(Bytes::from(std::mem::take(&mut self.remainder))) - } - Ok(self.complete.is_empty()) - } -} - -impl Iterator for LineDelimiter { - type Item = Bytes; - - fn next(&mut self) -> Option { - self.complete.pop_front() - } -} diff --git a/crates/datasources/src/object_store/csv/mod.rs b/crates/datasources/src/object_store/csv/mod.rs deleted file mode 100644 index 3236ea63d..000000000 --- a/crates/datasources/src/object_store/csv/mod.rs +++ /dev/null @@ -1,226 +0,0 @@ -//! Helpers for handling csv files. - -mod csv_helper; - -use std::any::Any; -use std::fmt; -use std::sync::Arc; - -use async_trait::async_trait; -use datafusion::arrow::datatypes::SchemaRef as ArrowSchemaRef; -use datafusion::datasource::file_format::csv::CsvFormat; -use datafusion::datasource::file_format::file_type::FileCompressionType; -use datafusion::datasource::file_format::FileFormat; -use datafusion::datasource::object_store::ObjectStoreUrl; -use datafusion::datasource::TableProvider; -use datafusion::error::{DataFusionError, Result as DatafusionResult}; -use datafusion::execution::context::{SessionState, TaskContext}; -use datafusion::logical_expr::TableType; -use datafusion::physical_expr::PhysicalSortExpr; -use datafusion::physical_plan::file_format::{FileScanConfig, FileStream}; -use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; -use datafusion::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, -}; -use datafusion::prelude::{Expr, SessionContext}; -use object_store::ObjectStore; - -use crate::object_store::csv::csv_helper::{CsvConfig, CsvOpener}; -use crate::object_store::errors::Result; -use crate::object_store::TableAccessor; - -/// Table provider for csv table -pub struct CsvTableProvider -where - T: TableAccessor, -{ - pub(crate) accessor: T, - /// Schema for csv file - pub(crate) arrow_schema: ArrowSchemaRef, -} - -impl CsvTableProvider -where - T: TableAccessor, -{ - pub async fn from_table_accessor(accessor: T) -> Result> { - let store = accessor.store(); - let location = [accessor.object_meta().as_ref().clone()]; - - // TODO infer schema without generating unused session context/state - let csv_format = CsvFormat::default(); - let session_ctx = SessionContext::new(); - let state = session_ctx.state(); - let arrow_schema = csv_format.infer_schema(&state, store, &location).await?; - - Ok(CsvTableProvider { - accessor, - arrow_schema, - }) - } -} - -#[async_trait] -impl TableProvider for CsvTableProvider -where - T: TableAccessor + 'static, -{ - fn as_any(&self) -> &dyn Any { - self - } - - fn schema(&self) -> ArrowSchemaRef { - self.arrow_schema.clone() - } - - fn table_type(&self) -> TableType { - TableType::View - } - - async fn scan( - &self, - _ctx: &SessionState, - projection: Option<&Vec>, - _filters: &[Expr], - limit: Option, - ) -> DatafusionResult> { - let file = self.accessor.object_meta().as_ref().clone().into(); - - // This config is setup to make use of `FileStream` to stream from csv files in - // datafusion - let base_config = FileScanConfig { - // `store` in `CsvExec` will be used instead of the datafusion object store registry. 
- object_store_url: ObjectStoreUrl::local_filesystem(), - file_schema: self.arrow_schema.clone(), - file_groups: vec![vec![file]], - statistics: Statistics::default(), - projection: projection.cloned(), - limit, - table_partition_cols: Vec::new(), - output_ordering: Vec::new(), - infinite_source: false, - }; - - // Assume csv has a header - let has_header = true; - - // Project the schema. - let projected_schema = match projection { - Some(projection) => Arc::new(self.arrow_schema.project(projection)?), - None => self.arrow_schema.clone(), - }; - - let exec = CsvExec { - base_config, - has_header, - delimiter: CsvExec::DEFAULT_DELIMITER, - file_compression_type: CsvExec::DEFAULT_FILE_COMPRESSION_TYPE, - arrow_schema: self.arrow_schema.clone(), - projection: projection.cloned(), - projected_schema, - store: self.accessor.store().clone(), - }; - - Ok(Arc::new(exec)) - } -} - -// /// Execution plan for scanning a CSV file -#[derive(Debug, Clone)] -struct CsvExec { - base_config: FileScanConfig, - delimiter: u8, - has_header: bool, - file_compression_type: FileCompressionType, - arrow_schema: ArrowSchemaRef, - projection: Option>, - projected_schema: ArrowSchemaRef, - store: Arc, -} - -impl CsvExec { - const DEFAULT_DELIMITER: u8 = b','; - const DEFAULT_BATCH_SIZE: usize = 8192; - const DEFAULT_FILE_COMPRESSION_TYPE: FileCompressionType = FileCompressionType::UNCOMPRESSED; -} - -impl ExecutionPlan for CsvExec { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - /// Get the schema for this execution plan - fn schema(&self) -> ArrowSchemaRef { - self.projected_schema.clone() - } - - /// Get the output partitioning of this plan - fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(1) - } - - /// See comments on `impl ExecutionPlan for ParquetExec`: output order can't - /// be - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - fn children(&self) -> Vec> { - Vec::new() - } - - fn with_new_children( - self: Arc, - _children: Vec>, - ) -> DatafusionResult> { - Err(DataFusionError::Execution( - "cannot replace children for CsvExec".to_string(), - )) - } - - fn execute( - &self, - partition: usize, - _context: Arc, - ) -> DatafusionResult { - let config = CsvConfig { - batch_size: Self::DEFAULT_BATCH_SIZE, - file_schema: self.arrow_schema.clone(), - file_projection: self.projection.clone(), - has_header: self.has_header, - delimiter: self.delimiter, - object_store: self.store.clone(), - }; - - let opener = CsvOpener { - config, - file_compression_type: self.file_compression_type.to_owned(), - }; - - let stream = FileStream::new( - &self.base_config, - partition, - opener, - &ExecutionPlanMetricsSet::new(), - )?; - Ok(Box::pin(stream) as SendableRecordBatchStream) - } - - fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - let files = self - .base_config - .file_groups - .iter() - .flatten() - .map(|f| f.object_meta.location.as_ref()) - .collect::>() - .join(", "); - - write!(f, "CsvExec: files={files}") - } - - fn statistics(&self) -> Statistics { - Statistics::default() - } -} diff --git a/crates/datasources/src/object_store/gcs.rs b/crates/datasources/src/object_store/gcs.rs index 8b47d6d4b..c0497bac2 100644 --- a/crates/datasources/src/object_store/gcs.rs +++ b/crates/datasources/src/object_store/gcs.rs @@ -57,6 +57,9 @@ pub struct GcsAccessor { #[async_trait::async_trait] impl TableAccessor for GcsAccessor { + fn location(&self) -> String { + 
self.meta.location.to_string() + } fn store(&self) -> &Arc { &self.store } diff --git a/crates/datasources/src/object_store/http.rs b/crates/datasources/src/object_store/http.rs index 9d299b963..f5e7405e6 100644 --- a/crates/datasources/src/object_store/http.rs +++ b/crates/datasources/src/object_store/http.rs @@ -10,34 +10,36 @@ use super::{ csv::CsvTableProvider, json::JsonTableProvider, parquet::ParquetTableProvider, FileType, }; -#[derive(Debug, Clone)] -pub struct HttpTableAccess { - /// Public url where the file is hosted. - pub url: String, -} - #[derive(Debug)] pub struct HttpAccessor { pub store: Arc, pub meta: Arc, pub file_type: FileType, + base_url: String, } impl HttpAccessor { pub async fn try_new(url: String, file_type: FileType) -> Result { + let url = url::Url::parse(&url).unwrap(); let meta = object_meta_from_head(&url).await?; let builder = object_store::http::HttpBuilder::new(); - let store = builder.with_url(url).build()?; + let url = format!("{}://{}", url.scheme(), url.authority()); + let store = builder.with_url(url.clone()).build()?; + Ok(Self { store: Arc::new(store), meta: Arc::new(meta), file_type, + base_url: url, }) } } #[async_trait::async_trait] impl TableAccessor for HttpAccessor { + fn location(&self) -> String { + self.base_url.clone() + } fn store(&self) -> &Arc { &self.store } @@ -63,19 +65,18 @@ impl TableAccessor for HttpAccessor { /// /// We avoid using object store's `head` method since it does a PROPFIND /// request. -async fn object_meta_from_head(url: &str) -> Result { +async fn object_meta_from_head(url: &url::Url) -> Result { use object_store::path::Path as ObjectStorePath; - let url = url::Url::parse(url).unwrap(); - let res = reqwest::Client::new().head(url).send().await?; + let res = reqwest::Client::new().head(url.clone()).send().await?; let len = res.content_length().ok_or(ObjectStoreSourceError::Static( "Missing content-length header", ))?; Ok(ObjectMeta { - location: ObjectStorePath::default(), - last_modified: DateTime::::MIN_UTC, + location: ObjectStorePath::from_url_path(url.path()).unwrap(), + last_modified: Utc::now(), size: len as usize, }) } diff --git a/crates/datasources/src/object_store/local.rs b/crates/datasources/src/object_store/local.rs index ec4288668..2436d2c60 100644 --- a/crates/datasources/src/object_store/local.rs +++ b/crates/datasources/src/object_store/local.rs @@ -39,6 +39,9 @@ pub struct LocalAccessor { #[async_trait::async_trait] impl TableAccessor for LocalAccessor { + fn location(&self) -> String { + self.meta.location.to_string() + } fn store(&self) -> &Arc { &self.store } diff --git a/crates/datasources/src/object_store/mod.rs b/crates/datasources/src/object_store/mod.rs index 2c791baf6..07c2e5500 100644 --- a/crates/datasources/src/object_store/mod.rs +++ b/crates/datasources/src/object_store/mod.rs @@ -16,7 +16,7 @@ pub mod http; pub mod local; pub mod parquet; pub mod s3; - +pub mod registry; mod csv; mod json; @@ -46,6 +46,7 @@ pub trait TableAccessor: Send + Sync { fn store(&self) -> &Arc; fn object_meta(&self) -> &Arc; + fn location(&self) -> String; async fn into_table_provider(self, predicate_pushdown: bool) -> Result>; } diff --git a/crates/datasources/src/object_store/registry.rs b/crates/datasources/src/object_store/registry.rs new file mode 100644 index 000000000..d62983dc7 --- /dev/null +++ b/crates/datasources/src/object_store/registry.rs @@ -0,0 +1,36 @@ +use std::sync::Arc; + +use datafusion::execution::object_store::ObjectStoreRegistry; +use url::Url; + +#[derive(Debug)] +pub struct 
GlareDBRegistry; +impl ObjectStoreRegistry for GlareDBRegistry { + fn register_store( + &self, + url: &Url, + store: Arc, + ) -> Option> { + println!("register_store: {:?}", url); + None + } + + fn get_store( + &self, + url: &Url, + ) -> datafusion::error::Result> { + match url.scheme() { + "s3" => todo!(), + "http" | "https" => { + let url = format!("{}://{}", url.scheme(), url.authority()); + + let os = object_store::http::HttpBuilder::new() + .with_url(&url) + .build()?; + + Ok(Arc::new(os)) + } + _ => todo!(), + } + } +} diff --git a/crates/datasources/src/object_store/s3.rs b/crates/datasources/src/object_store/s3.rs index 22a9df821..64e36ecf6 100644 --- a/crates/datasources/src/object_store/s3.rs +++ b/crates/datasources/src/object_store/s3.rs @@ -63,6 +63,9 @@ pub struct S3Accessor { #[async_trait::async_trait] impl TableAccessor for S3Accessor { + fn location(&self) -> String { + self.meta.location.to_string() + } fn store(&self) -> &Arc { &self.store } diff --git a/crates/glaredb/src/local.rs b/crates/glaredb/src/local.rs index 7b0e5b148..681adfd86 100644 --- a/crates/glaredb/src/local.rs +++ b/crates/glaredb/src/local.rs @@ -183,7 +183,13 @@ impl LocalSession { } }, Ok(Signal::CtrlD) => break, - Ok(Signal::CtrlC) => continue, + Ok(Signal::CtrlC) => { + if scratch.is_empty() { + break; + } else { + scratch.clear(); + } + } Err(e) => { return Err(anyhow!("Unable to read from prompt: {e}")); } @@ -204,8 +210,19 @@ impl LocalSession { } const UNNAMED: String = String::new(); - + // let lp = self.sess.sql_to_lp(text).await?; + // println!("converted to lp"); + // if let sqlexec::LogicalPlan::Query(query) = lp { + // let physical_plan = self.sess.create_physical_plan(query).await?; + + // println!("created physical plan: {:#?}", physical_plan); + // let bytes = physical_plan_to_bytes(physical_plan); + // println!("bytes: {:?}", bytes); + // } else { + // todo!() + // } let statements = parser::parse_sql(text)?; + // self.sess.inner.ctx. for stmt in statements { self.sess .prepare_statement(UNNAMED, Some(stmt), Vec::new()) @@ -228,7 +245,6 @@ impl LocalSession { other => println!("{:?}", other), } } - Ok(()) } diff --git a/crates/sqlexec/src/context.rs b/crates/sqlexec/src/context.rs index 0ff3104ea..5572c1f73 100644 --- a/crates/sqlexec/src/context.rs +++ b/crates/sqlexec/src/context.rs @@ -21,6 +21,7 @@ use datafusion::logical_expr::{Expr as DfExpr, LogicalPlanBuilder as DfLogicalPl use datafusion::scalar::ScalarValue; use datafusion::sql::TableReference; use datasources::native::access::NativeTableStorage; +use datasources::object_store::registry::GlareDBRegistry; use futures::future::BoxFuture; use metastore::builtins::POSTGRES_SCHEMA; use metastore::builtins::{CURRENT_SESSION_SCHEMA, DEFAULT_CATALOG}; @@ -112,6 +113,8 @@ impl SessionContext { // Create a new datafusion runtime env with disk manager and memory pool // if needed. 
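For context on the hunk that follows: DataFusion resolves every `object_store_url` in a `FileScanConfig` through the registry held by its `RuntimeEnv`, so handing `GlareDBRegistry` to the runtime once at session construction is what lets the native execs find the right store later. A minimal wiring sketch, assuming the crate paths from this patch and DataFusion's mid-2023 API (`SessionContext::with_config_rt` is not shown in the diff):

```rust
use std::sync::Arc;

use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::prelude::{SessionConfig, SessionContext};
use datasources::object_store::registry::GlareDBRegistry;

fn session_with_registry() -> datafusion::error::Result<SessionContext> {
    // Scans carrying e.g. `gs://bucket` as their object_store_url will be
    // resolved through GlareDBRegistry::get_store instead of DataFusion's
    // default registry.
    let conf = RuntimeConfig::default().with_object_store_registry(Arc::new(GlareDBRegistry));
    let runtime = Arc::new(RuntimeEnv::new(conf)?);
    Ok(SessionContext::with_config_rt(SessionConfig::new(), runtime))
}
```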
let mut conf = RuntimeConfig::default(); + conf = conf.with_object_store_registry(Arc::new(GlareDBRegistry)); + if let Some(spill_path) = spill_path { conf = conf.with_disk_manager(DiskManagerConfig::NewSpecified(vec![spill_path])); } From c1971a1ebdc44f9ba9cb5ab11cd4d3c743419dd7 Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Thu, 6 Jul 2023 14:27:03 -0500 Subject: [PATCH 2/8] wip --- Cargo.lock | 2 + crates/datasources/Cargo.toml | 1 + crates/datasources/src/object_store/csv.rs | 5 +- crates/datasources/src/object_store/gcs.rs | 31 ++++++-- .../datasources/src/object_store/registry.rs | 70 ++++++++++++++----- crates/glaredb/src/local.rs | 11 ++- crates/sqlbuiltins/src/functions.rs | 7 ++ crates/sqlexec/Cargo.toml | 1 + crates/sqlexec/src/context.rs | 3 +- crates/sqlexec/src/lib.rs | 1 + crates/sqlexec/src/planner/session_planner.rs | 12 ++++ crates/sqlexec/src/session.rs | 27 ++++++- 12 files changed, 137 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2450dc7dd..54a739ce0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2014,6 +2014,7 @@ dependencies = [ "bitvec", "bytes", "chrono", + "dashmap", "datafusion", "decimal", "deltalake", @@ -5920,6 +5921,7 @@ dependencies = [ "tonic", "tower", "tracing", + "url", "uuid", ] diff --git a/crates/datasources/Cargo.toml b/crates/datasources/Cargo.toml index 615eaf594..14950c188 100644 --- a/crates/datasources/Cargo.toml +++ b/crates/datasources/Cargo.toml @@ -47,6 +47,7 @@ tracing = "0.1" uuid = "1.4.0" url.workspace = true webpki-roots = "0.24.0" +dashmap = "5.4.0" # SSH tunnels [target.'cfg(any(target_os = "linux", target_os = "macos"))'.dependencies] diff --git a/crates/datasources/src/object_store/csv.rs b/crates/datasources/src/object_store/csv.rs index 1f0c71765..ec03a8673 100644 --- a/crates/datasources/src/object_store/csv.rs +++ b/crates/datasources/src/object_store/csv.rs @@ -75,12 +75,15 @@ where ) -> DatafusionResult> { let file = self.accessor.object_meta().as_ref().clone(); let base_url = self.accessor.location(); + println!("---base_url----: {}", base_url); + println!("meta = {:#?}", self.accessor.object_meta()); // This config is setup to make use of `FileStream` to stream from csv files in // datafusion let base_config = FileScanConfig { // `store` in `CsvExec` will be used instead of the datafusion object store registry. 
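The `unwrap_or_else` fallback introduced in the lines below exists because an `ObjectStoreUrl` only carries a scheme plus authority; the object's path belongs in the scan's `file_groups`, and a plain local path is not a URL at all. A small illustration of that behavior (my reading of DataFusion's `ObjectStoreUrl::parse`, not code from this repo):

```rust
use datafusion::datasource::object_store::ObjectStoreUrl;

fn main() {
    // Scheme + authority only: this is the key the registry will see.
    assert!(ObjectStoreUrl::parse("gs://my-bucket").is_ok());
    // A path component is rejected; the object's path lives in file_groups.
    assert!(ObjectStoreUrl::parse("gs://my-bucket/data/part-0.csv").is_err());
    // A bare local path is not a URL, hence the fallback to
    // ObjectStoreUrl::local_filesystem() in the hunk below.
    assert!(ObjectStoreUrl::parse("tmp/data.csv").is_err());
}
```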
- object_store_url: ObjectStoreUrl::parse(&base_url).unwrap(), + object_store_url: ObjectStoreUrl::parse(&base_url) + .unwrap_or_else(|_| ObjectStoreUrl::local_filesystem()), file_schema: self.arrow_schema.clone(), file_groups: vec![vec![file.into()]], statistics: Statistics::default(), diff --git a/crates/datasources/src/object_store/gcs.rs b/crates/datasources/src/object_store/gcs.rs index c0497bac2..63ab5a88c 100644 --- a/crates/datasources/src/object_store/gcs.rs +++ b/crates/datasources/src/object_store/gcs.rs @@ -41,9 +41,23 @@ impl GcsTableAccess { pub fn store_and_path(&self) -> Result<(Arc, ObjectStorePath)> { let store = self.builder().build()?; - let location = ObjectStorePath::from(self.location.as_str()); + + let location = ObjectStorePath::from_url_path(&self.bucket_name).unwrap(); Ok((Arc::new(store), location)) } + + pub fn store(&self) -> Result> { + let store = self.builder().build()?; + Ok(Arc::new(store)) + } + + pub fn location(&self) -> ObjectStorePath { + ObjectStorePath::from_url_path(&self.location).unwrap() + } + + pub fn base_location(&self) -> ObjectStorePath { + ObjectStorePath::from_url_path(&self.bucket_name).unwrap() + } } #[derive(Debug)] @@ -53,12 +67,13 @@ pub struct GcsAccessor { /// Meta information for location/object pub meta: Arc, pub file_type: FileType, + base_url: String, } #[async_trait::async_trait] impl TableAccessor for GcsAccessor { fn location(&self) -> String { - self.meta.location.to_string() + format!("gs://{}", self.base_url) } fn store(&self) -> &Arc { &self.store @@ -76,6 +91,7 @@ impl TableAccessor for GcsAccessor { FileType::Csv => Arc::new(CsvTableProvider::from_table_accessor(self).await?), FileType::Json => Arc::new(JsonTableProvider::from_table_accessor(self).await?), }; + Ok(table_provider) } } @@ -83,23 +99,28 @@ impl TableAccessor for GcsAccessor { impl GcsAccessor { /// Setup accessor for GCS pub async fn new(access: GcsTableAccess) -> Result { - let (store, location) = access.store_and_path()?; + let store = access.store()?; // Use provided file type or infer from location + let location = access.location(); + + println!("location: {}", location); let file_type = access.file_type.unwrap_or(file_type_from_path(&location)?); trace!(?location, ?file_type, "location and file type"); let meta = Arc::new(store.head(&location).await?); - + println!("meta: {:?}", meta); Ok(Self { store, meta, file_type, + base_url: access.bucket_name, }) } pub async fn validate_table_access(access: GcsTableAccess) -> Result<()> { let store = Arc::new(access.builder().build()?); - let location = ObjectStorePath::from(access.location); + let location = access.location(); + store.head(&location).await?; Ok(()) } diff --git a/crates/datasources/src/object_store/registry.rs b/crates/datasources/src/object_store/registry.rs index d62983dc7..8548803a2 100644 --- a/crates/datasources/src/object_store/registry.rs +++ b/crates/datasources/src/object_store/registry.rs @@ -1,36 +1,70 @@ use std::sync::Arc; +use dashmap::DashMap; use datafusion::execution::object_store::ObjectStoreRegistry; +use object_store::{local::LocalFileSystem, ObjectStore}; use url::Url; #[derive(Debug)] -pub struct GlareDBRegistry; +pub struct GlareDBRegistry { + object_stores: DashMap>, +} + +impl Default for GlareDBRegistry { + fn default() -> Self { + Self::new() + } +} +impl GlareDBRegistry { + pub fn new() -> Self { + let object_stores: DashMap> = DashMap::new(); + + Self { object_stores } + } +} + impl ObjectStoreRegistry for GlareDBRegistry { fn register_store( &self, url: &Url, - 
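The accessors in this series build their `ObjectMeta` locations with `ObjectStorePath::from_url_path`, which trims the leading slash and percent-decodes, so a URL path maps cleanly onto an object key. A small check of that behavior (my understanding of the `object_store` crate, not taken from this patch):

```rust
use object_store::path::Path as ObjectStorePath;

fn main() {
    // "/data/part-0.csv" (a URL path) becomes the object key "data/part-0.csv".
    let p = ObjectStorePath::from_url_path("/data/part-0.csv").unwrap();
    assert_eq!(p.to_string(), "data/part-0.csv");
}
```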
store: Arc, - ) -> Option> { - println!("register_store: {:?}", url); - None + store: Arc, + ) -> Option> { + println!("register_store for : {}", url); + self.object_stores.insert(url.clone(), store) } - fn get_store( - &self, - url: &Url, - ) -> datafusion::error::Result> { - match url.scheme() { - "s3" => todo!(), - "http" | "https" => { - let url = format!("{}://{}", url.scheme(), url.authority()); + fn get_store(&self, url: &Url) -> datafusion::error::Result> { + if let Some(store) = self.object_stores.get(url) { + println!("found store for : {}", url); + + return Ok(store.clone()); + } else { + match url.scheme() { + "s3" => { + let store = object_store::aws::AmazonS3Builder::new() + .with_url(url.as_str()) + .build()?; + Ok(Arc::new(store)) + } + "gs" => { + let store = object_store::gcp::GoogleCloudStorageBuilder::new() + .with_url(url.as_str()) + .build()?; + Ok(Arc::new(store)) + } - let os = object_store::http::HttpBuilder::new() - .with_url(&url) - .build()?; + "http" | "https" => { + let os = object_store::http::HttpBuilder::new() + .with_url(url.as_str()) + .build()?; - Ok(Arc::new(os)) + Ok(Arc::new(os)) + } + "file" => Ok(Arc::new(LocalFileSystem::new())), + _ => { + todo!() + } } - _ => todo!(), } } } diff --git a/crates/glaredb/src/local.rs b/crates/glaredb/src/local.rs index 681adfd86..3dbb02302 100644 --- a/crates/glaredb/src/local.rs +++ b/crates/glaredb/src/local.rs @@ -211,16 +211,13 @@ impl LocalSession { const UNNAMED: String = String::new(); // let lp = self.sess.sql_to_lp(text).await?; - // println!("converted to lp"); + // // println!("converted to lp: {:#?}", lp); // if let sqlexec::LogicalPlan::Query(query) = lp { // let physical_plan = self.sess.create_physical_plan(query).await?; - // println!("created physical plan: {:#?}", physical_plan); - // let bytes = physical_plan_to_bytes(physical_plan); - // println!("bytes: {:?}", bytes); - // } else { - // todo!() - // } + // // println!("created physical plan: {:#?}", physical_plan); + // }; + let statements = parser::parse_sql(text)?; // self.sess.inner.ctx. 
for stmt in statements { diff --git a/crates/sqlbuiltins/src/functions.rs b/crates/sqlbuiltins/src/functions.rs index fc7f13182..ee72f76de 100644 --- a/crates/sqlbuiltins/src/functions.rs +++ b/crates/sqlbuiltins/src/functions.rs @@ -15,6 +15,7 @@ use datasources::common::listing::VirtualLister; use datasources::debug::DebugVirtualLister; use datasources::mongodb::{MongoAccessor, MongoTableAccessInfo}; use datasources::mysql::{MysqlAccessor, MysqlTableAccess}; +use datasources::object_store::gcs::GcsTableAccess; use datasources::object_store::http::HttpAccessor; use datasources::object_store::local::{LocalAccessor, LocalTableAccess}; use datasources::object_store::{FileType, TableAccessor}; @@ -26,6 +27,7 @@ use metastoreproto::types::options::{ DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsMongo, DatabaseOptionsMysql, DatabaseOptionsPostgres, DatabaseOptionsSnowflake, }; +use object_store::gcp::GoogleCloudStorageBuilder; use once_cell::sync::Lazy; use std::collections::HashMap; use std::pin::Pin; @@ -579,6 +581,11 @@ async fn create_provider_for_filetype( .into_table_provider(false) .await .map_err(|e| BuiltinError::Access(Box::new(e)))?, + Ok("gs") => { + + + todo!() + }, // no scheme so we assume it's a local file _ => { let location = url_string diff --git a/crates/sqlexec/Cargo.toml b/crates/sqlexec/Cargo.toml index 8c7b53480..a44afd288 100644 --- a/crates/sqlexec/Cargo.toml +++ b/crates/sqlexec/Cargo.toml @@ -29,6 +29,7 @@ regex = "1.8" tonic = { version = "0.9", features = ["transport", "tls", "tls-roots"] } tokio-postgres = "0.7.8" once_cell = "1.18.0" +url.workspace = true [dev-dependencies] tempfile = "3" diff --git a/crates/sqlexec/src/context.rs b/crates/sqlexec/src/context.rs index 5572c1f73..954eb5892 100644 --- a/crates/sqlexec/src/context.rs +++ b/crates/sqlexec/src/context.rs @@ -113,8 +113,7 @@ impl SessionContext { // Create a new datafusion runtime env with disk manager and memory pool // if needed. 
let mut conf = RuntimeConfig::default(); - conf = conf.with_object_store_registry(Arc::new(GlareDBRegistry)); - + conf = conf.with_object_store_registry(Arc::new(GlareDBRegistry::new())); if let Some(spill_path) = spill_path { conf = conf.with_disk_manager(DiskManagerConfig::NewSpecified(vec![spill_path])); } diff --git a/crates/sqlexec/src/lib.rs b/crates/sqlexec/src/lib.rs index 0da2ef71b..80f5df288 100644 --- a/crates/sqlexec/src/lib.rs +++ b/crates/sqlexec/src/lib.rs @@ -11,6 +11,7 @@ mod functions; mod metrics; mod planner; mod vars; +pub use planner::logical_plan::LogicalPlan; pub mod export { pub use sqlparser; diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs index cf6bfd51e..c27c08545 100644 --- a/crates/sqlexec/src/planner/session_planner.rs +++ b/crates/sqlexec/src/planner/session_planner.rs @@ -444,6 +444,7 @@ impl<'a> SessionPlanner<'a> { let bucket = m.remove_required("bucket")?; let location = m.remove_required("location")?; + let url = format!("gs://{bucket}"); let access = GcsTableAccess { bucket_name: bucket, @@ -458,6 +459,17 @@ impl<'a> SessionPlanner<'a> { source: Box::new(e), })?; + let url = url::Url::parse(&url).unwrap(); + + let store = access.store().unwrap(); + let base_location = access.base_location(); + + println!("registering object store for {}", url); + self.ctx + .get_df_state() + .runtime_env() + .register_object_store(&url, store); + TableOptions::Gcs(TableOptionsGcs { service_account_key: access.service_acccount_key_json, bucket: access.bucket_name, diff --git a/crates/sqlexec/src/session.rs b/crates/sqlexec/src/session.rs index 1777ced21..17105842c 100644 --- a/crates/sqlexec/src/session.rs +++ b/crates/sqlexec/src/session.rs @@ -198,7 +198,7 @@ impl Session { } /// Create a physical plan for a given datafusion logical plan. 
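The planner change above registers the GCS store eagerly, at CREATE EXTERNAL TABLE time, against the bucket URL. A sketch of that registration pattern in isolation, using an in-memory store as a stand-in for the real GCS client (hypothetical example, not from the repo):

```rust
use std::sync::Arc;

use datafusion::execution::runtime_env::RuntimeEnv;
use object_store::memory::InMemory;
use url::Url;

fn register_bucket(runtime: &RuntimeEnv) {
    // Later scans that resolve `gs://my-bucket` through the registry will get
    // this store back.
    let url = Url::parse("gs://my-bucket").unwrap();
    let store = Arc::new(InMemory::new()); // stand-in for GcsTableAccess::store()
    runtime.register_object_store(&url, store);
}
```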
- pub(crate) async fn create_physical_plan( + pub async fn create_physical_plan( &self, plan: DfLogicalPlan, ) -> Result> { @@ -661,4 +661,29 @@ impl Session { } } } + + pub async fn sql_to_lp(&mut self, query: &str) -> Result { + const UNNAMED: String = String::new(); + + let mut statements = crate::parser::parse_sql(query)?; + match statements.len() { + 0 => todo!(), + 1 => { + let stmt = statements.pop_front().unwrap(); + self.prepare_statement(UNNAMED, Some(stmt), Vec::new()) + .await?; + let prepared = self.get_prepared_statement(&UNNAMED)?; + let num_fields = prepared.output_fields().map(|f| f.len()).unwrap_or(0); + self.bind_statement( + UNNAMED, + &UNNAMED, + Vec::new(), + vec![Format::Text; num_fields], + )?; + let portal = self.ctx.get_portal(&UNNAMED)?.clone(); + Ok(portal.stmt.plan.unwrap()) + } + _ => todo!(), + } + } } From 1e78cecf659c76a14fad3b140eabedaf85584e49 Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Thu, 6 Jul 2023 14:36:13 -0500 Subject: [PATCH 3/8] wip --- .../datasources/src/object_store/registry.rs | 59 +++++++------------ crates/glaredb/src/local.rs | 5 +- crates/sqlexec/src/planner/session_planner.rs | 2 +- 3 files changed, 25 insertions(+), 41 deletions(-) diff --git a/crates/datasources/src/object_store/registry.rs b/crates/datasources/src/object_store/registry.rs index 8548803a2..83d637507 100644 --- a/crates/datasources/src/object_store/registry.rs +++ b/crates/datasources/src/object_store/registry.rs @@ -1,13 +1,13 @@ use std::sync::Arc; use dashmap::DashMap; -use datafusion::execution::object_store::ObjectStoreRegistry; +use datafusion::{error::DataFusionError, execution::object_store::ObjectStoreRegistry}; use object_store::{local::LocalFileSystem, ObjectStore}; use url::Url; #[derive(Debug)] pub struct GlareDBRegistry { - object_stores: DashMap>, + object_stores: DashMap>, } impl Default for GlareDBRegistry { @@ -17,7 +17,7 @@ impl Default for GlareDBRegistry { } impl GlareDBRegistry { pub fn new() -> Self { - let object_stores: DashMap> = DashMap::new(); + let object_stores: DashMap> = DashMap::new(); Self { object_stores } } @@ -29,42 +29,27 @@ impl ObjectStoreRegistry for GlareDBRegistry { url: &Url, store: Arc, ) -> Option> { - println!("register_store for : {}", url); - self.object_stores.insert(url.clone(), store) + let s = get_url_key(url); + self.object_stores.insert(s, store) } fn get_store(&self, url: &Url) -> datafusion::error::Result> { - if let Some(store) = self.object_stores.get(url) { - println!("found store for : {}", url); - - return Ok(store.clone()); - } else { - match url.scheme() { - "s3" => { - let store = object_store::aws::AmazonS3Builder::new() - .with_url(url.as_str()) - .build()?; - Ok(Arc::new(store)) - } - "gs" => { - let store = object_store::gcp::GoogleCloudStorageBuilder::new() - .with_url(url.as_str()) - .build()?; - Ok(Arc::new(store)) - } - - "http" | "https" => { - let os = object_store::http::HttpBuilder::new() - .with_url(url.as_str()) - .build()?; - - Ok(Arc::new(os)) - } - "file" => Ok(Arc::new(LocalFileSystem::new())), - _ => { - todo!() - } - } - } + let s = get_url_key(url); + self.object_stores + .get(&s) + .map(|o| o.value().clone()) + .ok_or_else(|| { + DataFusionError::Internal(format!("No suitable object store found for {url}")) + }) } } + +/// Get the key of a url for object store registration. 
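The new `sql_to_lp` helper above mainly supports the commented-out debugging path in local.rs: turn one SQL statement into a logical plan, then lower it with the now-public `create_physical_plan`. A rough usage sketch (the `Session` import path is assumed, not confirmed by this diff):

```rust
use sqlexec::LogicalPlan;
use sqlexec::session::Session; // assumed path for the Session type

async fn debug_physical_plan(sess: &mut Session, text: &str) -> anyhow::Result<()> {
    let lp = sess.sql_to_lp(text).await?;
    if let LogicalPlan::Query(df_plan) = lp {
        let physical = sess.create_physical_plan(df_plan).await?;
        println!("{physical:#?}");
    }
    Ok(())
}
```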
+/// The credential info will be removed +fn get_url_key(url: &Url) -> String { + format!( + "{}://{}", + url.scheme(), + &url[url::Position::BeforeHost..url::Position::AfterPort], + ) +} diff --git a/crates/glaredb/src/local.rs b/crates/glaredb/src/local.rs index 3dbb02302..bfab35dec 100644 --- a/crates/glaredb/src/local.rs +++ b/crates/glaredb/src/local.rs @@ -211,15 +211,14 @@ impl LocalSession { const UNNAMED: String = String::new(); // let lp = self.sess.sql_to_lp(text).await?; - // // println!("converted to lp: {:#?}", lp); + // // // println!("converted to lp: {:#?}", lp); // if let sqlexec::LogicalPlan::Query(query) = lp { // let physical_plan = self.sess.create_physical_plan(query).await?; - // // println!("created physical plan: {:#?}", physical_plan); + // println!("created physical plan: {:#?}", physical_plan); // }; let statements = parser::parse_sql(text)?; - // self.sess.inner.ctx. for stmt in statements { self.sess .prepare_statement(UNNAMED, Some(stmt), Vec::new()) diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs index c27c08545..e53af8ddb 100644 --- a/crates/sqlexec/src/planner/session_planner.rs +++ b/crates/sqlexec/src/planner/session_planner.rs @@ -444,7 +444,7 @@ impl<'a> SessionPlanner<'a> { let bucket = m.remove_required("bucket")?; let location = m.remove_required("location")?; - let url = format!("gs://{bucket}"); + let url = format!("gs://{bucket}/"); let access = GcsTableAccess { bucket_name: bucket, From 5dea2d2bb3b6ba7f301121086ba02b2948485d60 Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Thu, 6 Jul 2023 15:20:09 -0500 Subject: [PATCH 4/8] run clippy & linting --- crates/datasources/src/object_store/csv.rs | 2 +- crates/datasources/src/object_store/http.rs | 2 +- crates/datasources/src/object_store/mod.rs | 6 +++--- crates/datasources/src/object_store/registry.rs | 2 +- crates/metastore/src/builtins.rs | 12 ++---------- crates/metastore/src/srv.rs | 1 - crates/metastore/src/storage/lease.rs | 4 ++-- crates/metastoreproto/src/types/catalog.rs | 2 +- crates/sqlbuiltins/src/functions.rs | 16 +++++++++++----- crates/sqlexec/src/planner/session_planner.rs | 2 -- crates/testing/tests/sqllogictests/tests.rs | 2 +- justfile | 10 ++++++++-- 12 files changed, 31 insertions(+), 30 deletions(-) diff --git a/crates/datasources/src/object_store/csv.rs b/crates/datasources/src/object_store/csv.rs index ec03a8673..f888d1134 100644 --- a/crates/datasources/src/object_store/csv.rs +++ b/crates/datasources/src/object_store/csv.rs @@ -104,6 +104,6 @@ where Ok(Arc::new(exec)) } } + const DEFAULT_DELIMITER: u8 = b','; -const DEFAULT_BATCH_SIZE: usize = 8192; const DEFAULT_FILE_COMPRESSION_TYPE: FileCompressionType = FileCompressionType::UNCOMPRESSED; diff --git a/crates/datasources/src/object_store/http.rs b/crates/datasources/src/object_store/http.rs index f5e7405e6..f5bdec735 100644 --- a/crates/datasources/src/object_store/http.rs +++ b/crates/datasources/src/object_store/http.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use crate::object_store::{errors::ObjectStoreSourceError, Result, TableAccessor}; -use chrono::{DateTime, Utc}; +use chrono::Utc; use datafusion::datasource::TableProvider; use object_store::{ObjectMeta, ObjectStore}; diff --git a/crates/datasources/src/object_store/mod.rs b/crates/datasources/src/object_store/mod.rs index 07c2e5500..d7b4d46fd 100644 --- a/crates/datasources/src/object_store/mod.rs +++ b/crates/datasources/src/object_store/mod.rs @@ -10,15 +10,15 @@ use serde::{Deserialize, Serialize}; 
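For reference, the `get_url_key` helper added in the previous patch reduces any object URL to its scheme and authority, dropping credentials and paths, so registration and lookup agree on one key per store. A worked example of the same expression (illustration only; hypothetical URLs):

```rust
use url::Url;

fn main() {
    // gs://my-bucket/data/part-0.csv   -> "gs://my-bucket"
    // https://user:pw@host:8080/x.json -> "https://host:8080"
    let url = Url::parse("https://user:pw@host:8080/x.json").unwrap();
    let key = format!(
        "{}://{}",
        url.scheme(),
        &url[url::Position::BeforeHost..url::Position::AfterPort]
    );
    assert_eq!(key, "https://host:8080");
}
```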
use errors::Result; +mod csv; pub mod errors; pub mod gcs; pub mod http; +mod json; pub mod local; pub mod parquet; -pub mod s3; pub mod registry; -mod csv; -mod json; +pub mod s3; #[derive(Debug, Clone, Serialize, Deserialize)] pub enum FileType { diff --git a/crates/datasources/src/object_store/registry.rs b/crates/datasources/src/object_store/registry.rs index 83d637507..916c18502 100644 --- a/crates/datasources/src/object_store/registry.rs +++ b/crates/datasources/src/object_store/registry.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use dashmap::DashMap; use datafusion::{error::DataFusionError, execution::object_store::ObjectStoreRegistry}; -use object_store::{local::LocalFileSystem, ObjectStore}; +use object_store::ObjectStore; use url::Url; #[derive(Debug)] diff --git a/crates/metastore/src/builtins.rs b/crates/metastore/src/builtins.rs index 2c3221ca4..4df5bb1f0 100644 --- a/crates/metastore/src/builtins.rs +++ b/crates/metastore/src/builtins.rs @@ -623,11 +623,7 @@ mod tests { fn builtin_unique_view_names() { let mut names = HashSet::new(); for builtin in BuiltinView::builtins() { - let name = format!( - "{}.{}", - builtin.schema.to_string(), - builtin.name.to_string() - ); + let name = format!("{}.{}", builtin.schema, builtin.name); assert!(names.insert(name.clone()), "duplicate name: {}", name); } } @@ -636,11 +632,7 @@ mod tests { fn builtin_unique_table_names() { let mut names = HashSet::new(); for builtin in BuiltinTable::builtins() { - let name = format!( - "{}.{}", - builtin.schema.to_string(), - builtin.name.to_string() - ); + let name = format!("{}.{}", builtin.schema, builtin.name); assert!(names.insert(name.clone()), "duplicate name: {}", name); } } diff --git a/crates/metastore/src/srv.rs b/crates/metastore/src/srv.rs index 1a7356e9c..efa8196d0 100644 --- a/crates/metastore/src/srv.rs +++ b/crates/metastore/src/srv.rs @@ -217,7 +217,6 @@ mod tests { let ent = state .entries .into_values() - .into_iter() .find(|ent| ent.get_meta().name == "test_schema") .unwrap(); assert!(matches!(ent, CatalogEntry::Schema(_))); diff --git a/crates/metastore/src/storage/lease.rs b/crates/metastore/src/storage/lease.rs index 2da04a68d..f267ec468 100644 --- a/crates/metastore/src/storage/lease.rs +++ b/crates/metastore/src/storage/lease.rs @@ -368,7 +368,7 @@ mod tests { let proto: storage::LeaseInformation = lease.into(); let mut bs = BytesMut::new(); proto.encode(&mut bs).unwrap(); - store.put(&path, bs.freeze()).await.unwrap(); + store.put(path, bs.freeze()).await.unwrap(); } async fn get_lease(store: &dyn ObjectStore, path: &ObjectPath) -> LeaseInformation { @@ -529,7 +529,7 @@ mod tests { assert!(lease.is_valid()); - let _ = lease.drop_lease().await.unwrap(); + lease.drop_lease().await.unwrap(); } #[tokio::test] diff --git a/crates/metastoreproto/src/types/catalog.rs b/crates/metastoreproto/src/types/catalog.rs index 73fe24327..d141aa36c 100644 --- a/crates/metastoreproto/src/types/catalog.rs +++ b/crates/metastoreproto/src/types/catalog.rs @@ -519,7 +519,7 @@ mod tests { proptest! 
{ #[test] fn roundtrip_entry_type(expected in any::()) { - let p: catalog::entry_meta::EntryType = expected.clone().into(); + let p: catalog::entry_meta::EntryType = expected.into(); let got: EntryType = p.try_into().unwrap(); assert_eq!(expected, got); } diff --git a/crates/sqlbuiltins/src/functions.rs b/crates/sqlbuiltins/src/functions.rs index ee72f76de..7a05119bd 100644 --- a/crates/sqlbuiltins/src/functions.rs +++ b/crates/sqlbuiltins/src/functions.rs @@ -15,7 +15,6 @@ use datasources::common::listing::VirtualLister; use datasources::debug::DebugVirtualLister; use datasources::mongodb::{MongoAccessor, MongoTableAccessInfo}; use datasources::mysql::{MysqlAccessor, MysqlTableAccess}; -use datasources::object_store::gcs::GcsTableAccess; use datasources::object_store::http::HttpAccessor; use datasources::object_store::local::{LocalAccessor, LocalTableAccess}; use datasources::object_store::{FileType, TableAccessor}; @@ -27,7 +26,6 @@ use metastoreproto::types::options::{ DatabaseOptions, DatabaseOptionsBigQuery, DatabaseOptionsMongo, DatabaseOptionsMysql, DatabaseOptionsPostgres, DatabaseOptionsSnowflake, }; -use object_store::gcp::GoogleCloudStorageBuilder; use once_cell::sync::Lazy; use std::collections::HashMap; use std::pin::Pin; @@ -573,6 +571,16 @@ async fn create_provider_for_filetype( 1 => { let mut args = args.into_iter(); let url_string = string_from_scalar(args.next().unwrap())?; + // todo: fetch this from the registry instead. + // let url = Url::parse(&url_string)?; + // let store = registry.get_store(url)?; + // match file_type { + // FileType::Parquet => { + // Arc::new(ParquetTableProvider::from_table_accessor(store, true).await?) + // } + // FileType::Csv => Arc::new(CsvTableProvider::from_table_accessor(store).await?), + // FileType::Json => Arc::new(JsonTableProvider::from_table_accessor(store).await?), + // }; Ok(match Url::parse(&url_string).as_ref().map(Url::scheme) { Ok("http" | "https") => HttpAccessor::try_new(url_string, file_type) @@ -582,10 +590,8 @@ async fn create_provider_for_filetype( .await .map_err(|e| BuiltinError::Access(Box::new(e)))?, Ok("gs") => { - - todo!() - }, + } // no scheme so we assume it's a local file _ => { let location = url_string diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs index e53af8ddb..6f84a904c 100644 --- a/crates/sqlexec/src/planner/session_planner.rs +++ b/crates/sqlexec/src/planner/session_planner.rs @@ -462,9 +462,7 @@ impl<'a> SessionPlanner<'a> { let url = url::Url::parse(&url).unwrap(); let store = access.store().unwrap(); - let base_location = access.base_location(); - println!("registering object store for {}", url); self.ctx .get_df_state() .runtime_env() diff --git a/crates/testing/tests/sqllogictests/tests.rs b/crates/testing/tests/sqllogictests/tests.rs index 897953b1e..c1d298754 100644 --- a/crates/testing/tests/sqllogictests/tests.rs +++ b/crates/testing/tests/sqllogictests/tests.rs @@ -81,7 +81,7 @@ SELECT public_key (key2, "test_user2"), ]; for (key, user) in test_cases { - let parts: Vec<_> = key.split(" ").collect(); + let parts: Vec<_> = key.split(' ').collect(); test_assert!( parts.len() == 3, anyhow!("each public key should be of format ` `") diff --git a/justfile b/justfile index 15d529cf9..6965d3e2f 100644 --- a/justfile +++ b/justfile @@ -54,11 +54,17 @@ fmt-check: protoc # Apply formatting. fmt *args: protoc - cargo fmt *args + cargo fmt {{args}} # Run clippy. 
clippy: protoc - cargo clippy --all-features -- --deny warnings + cargo clippy --all --all-features -- --deny warnings + +# apply linting & clippy fixes. +fix: protoc + just fmt --all + cargo fix --all --allow-dirty + cargo clippy --fix --all --all-features --allow-dirty # Displays help message. help: From 70bc6898f80f01a7e89138d9acb77bf1c38ac769 Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Fri, 7 Jul 2023 12:46:39 -0500 Subject: [PATCH 5/8] wip --- crates/datasources/src/object_store/csv.rs | 4 +- crates/datasources/src/object_store/gcs.rs | 12 +- crates/datasources/src/object_store/json.rs | 119 ++---------------- .../datasources/src/object_store/parquet.rs | 4 +- .../datasources/src/object_store/registry.rs | 51 ++++++-- crates/glaredb/src/local.rs | 4 +- crates/sqlbuiltins/src/functions.rs | 32 ++++- crates/sqlexec/src/context.rs | 6 +- crates/sqlexec/src/planner/dispatch.rs | 8 ++ crates/sqlexec/src/planner/session_planner.rs | 10 -- 10 files changed, 111 insertions(+), 139 deletions(-) diff --git a/crates/datasources/src/object_store/csv.rs b/crates/datasources/src/object_store/csv.rs index f888d1134..2ff8c6cb9 100644 --- a/crates/datasources/src/object_store/csv.rs +++ b/crates/datasources/src/object_store/csv.rs @@ -75,14 +75,12 @@ where ) -> DatafusionResult> { let file = self.accessor.object_meta().as_ref().clone(); let base_url = self.accessor.location(); - println!("---base_url----: {}", base_url); - println!("meta = {:#?}", self.accessor.object_meta()); // This config is setup to make use of `FileStream` to stream from csv files in // datafusion let base_config = FileScanConfig { // `store` in `CsvExec` will be used instead of the datafusion object store registry. - object_store_url: ObjectStoreUrl::parse(&base_url) + object_store_url: ObjectStoreUrl::parse(base_url) .unwrap_or_else(|_| ObjectStoreUrl::local_filesystem()), file_schema: self.arrow_schema.clone(), file_groups: vec![vec![file.into()]], diff --git a/crates/datasources/src/object_store/gcs.rs b/crates/datasources/src/object_store/gcs.rs index 63ab5a88c..4fa786115 100644 --- a/crates/datasources/src/object_store/gcs.rs +++ b/crates/datasources/src/object_store/gcs.rs @@ -6,6 +6,7 @@ use object_store::path::Path as ObjectStorePath; use object_store::{ObjectMeta, ObjectStore}; use serde::{Deserialize, Serialize}; use tracing::trace; +use url::Url; use super::csv::CsvTableProvider; use super::errors::Result; @@ -46,7 +47,7 @@ impl GcsTableAccess { Ok((Arc::new(store), location)) } - pub fn store(&self) -> Result> { + pub fn into_object_store(&self) -> Result> { let store = self.builder().build()?; Ok(Arc::new(store)) } @@ -75,6 +76,7 @@ impl TableAccessor for GcsAccessor { fn location(&self) -> String { format!("gs://{}", self.base_url) } + fn store(&self) -> &Arc { &self.store } @@ -99,16 +101,14 @@ impl TableAccessor for GcsAccessor { impl GcsAccessor { /// Setup accessor for GCS pub async fn new(access: GcsTableAccess) -> Result { - let store = access.store()?; + let store = access.into_object_store()?; // Use provided file type or infer from location let location = access.location(); - println!("location: {}", location); let file_type = access.file_type.unwrap_or(file_type_from_path(&location)?); trace!(?location, ?file_type, "location and file type"); let meta = Arc::new(store.head(&location).await?); - println!("meta: {:?}", meta); Ok(Self { store, meta, @@ -124,4 +124,8 @@ impl GcsAccessor { store.head(&location).await?; Ok(()) } + + pub fn base_url(&self) -> Url { + Url::parse(&format!("gs://{}", 
self.base_url)).unwrap() + } } diff --git a/crates/datasources/src/object_store/json.rs b/crates/datasources/src/object_store/json.rs index 92bdcf419..238d10886 100644 --- a/crates/datasources/src/object_store/json.rs +++ b/crates/datasources/src/object_store/json.rs @@ -1,5 +1,5 @@ use std::any::Any; -use std::fmt; + use std::sync::Arc; use async_trait::async_trait; @@ -9,17 +9,12 @@ use datafusion::datasource::file_format::json::JsonFormat; use datafusion::datasource::file_format::FileFormat; use datafusion::datasource::object_store::ObjectStoreUrl; use datafusion::datasource::TableProvider; -use datafusion::error::{DataFusionError, Result as DatafusionResult}; -use datafusion::execution::context::{SessionState, TaskContext}; +use datafusion::error::Result as DatafusionResult; +use datafusion::execution::context::SessionState; use datafusion::logical_expr::TableType; -use datafusion::physical_expr::PhysicalSortExpr; -use datafusion::physical_plan::file_format::{FileScanConfig, FileStream, JsonOpener}; -use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet; -use datafusion::physical_plan::{ - DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, -}; +use datafusion::physical_plan::file_format::{FileScanConfig, NdJsonExec}; +use datafusion::physical_plan::{ExecutionPlan, Statistics}; use datafusion::prelude::{Expr, SessionContext}; -use object_store::ObjectStore; use crate::object_store::errors::Result; use crate::object_store::TableAccessor; @@ -79,12 +74,11 @@ where limit: Option, ) -> DatafusionResult> { let file = self.accessor.object_meta().as_ref().clone().into(); + let base_url = self.accessor.location(); - // This config is setup to make use of `FileStream` to stream from csv files in - // datafusion let base_config = FileScanConfig { - // `store` in `JsonExec` will be used instead of the datafusion object store registry. - object_store_url: ObjectStoreUrl::local_filesystem(), + object_store_url: ObjectStoreUrl::parse(base_url) + .unwrap_or_else(|_| ObjectStoreUrl::local_filesystem()), file_schema: self.arrow_schema.clone(), file_groups: vec![vec![file]], statistics: Statistics::default(), @@ -96,106 +90,13 @@ where }; // Project the schema. 
- let projected_schema = match projection { + let _projected_schema = match projection { Some(projection) => Arc::new(self.arrow_schema.project(projection)?), None => self.arrow_schema.clone(), }; - let exec = JsonExec { - base_config, - file_compression_type: JsonExec::DEFAULT_FILE_COMPRESSION_TYPE, - projected_schema, - store: self.accessor.store().clone(), - }; + let exec = NdJsonExec::new(base_config, FileCompressionType::UNCOMPRESSED); Ok(Arc::new(exec)) } } - -// /// Execution plan for scanning a CSV file -#[derive(Debug, Clone)] -struct JsonExec { - base_config: FileScanConfig, - file_compression_type: FileCompressionType, - projected_schema: ArrowSchemaRef, - store: Arc, -} - -impl JsonExec { - const DEFAULT_BATCH_SIZE: usize = 8192; - const DEFAULT_FILE_COMPRESSION_TYPE: FileCompressionType = FileCompressionType::UNCOMPRESSED; -} - -impl ExecutionPlan for JsonExec { - /// Return a reference to Any that can be used for downcasting - fn as_any(&self) -> &dyn Any { - self - } - - /// Get the schema for this execution plan - fn schema(&self) -> ArrowSchemaRef { - self.projected_schema.clone() - } - - /// Get the output partitioning of this plan - fn output_partitioning(&self) -> Partitioning { - Partitioning::UnknownPartitioning(1) - } - - /// See comments on `impl ExecutionPlan for ParquetExec`: output order can't - /// be - fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> { - None - } - - fn children(&self) -> Vec> { - Vec::new() - } - - fn with_new_children( - self: Arc, - _children: Vec>, - ) -> DatafusionResult> { - Err(DataFusionError::Execution( - "cannot replace children for JsonExec".to_string(), - )) - } - - fn execute( - &self, - partition: usize, - _context: Arc, - ) -> DatafusionResult { - let opener = JsonOpener::new( - Self::DEFAULT_BATCH_SIZE, - self.projected_schema.clone(), - self.file_compression_type.to_owned(), - self.store.clone(), - ); - - let stream = FileStream::new( - &self.base_config, - partition, - opener, - &ExecutionPlanMetricsSet::new(), - )?; - Ok(Box::pin(stream) as SendableRecordBatchStream) - } - - fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { - let files = self - .base_config - .file_groups - .iter() - .flatten() - .map(|f| f.object_meta.location.as_ref()) - .collect::>() - .join(", "); - - write!(f, "JsonExec: files={files}") - } - - fn statistics(&self) -> Statistics { - Statistics::default() - } -} diff --git a/crates/datasources/src/object_store/parquet.rs b/crates/datasources/src/object_store/parquet.rs index 2ef93a197..5d2964336 100644 --- a/crates/datasources/src/object_store/parquet.rs +++ b/crates/datasources/src/object_store/parquet.rs @@ -183,12 +183,14 @@ where }; let file = self.accessor.object_meta().as_ref().clone().into(); + let base_url = self.accessor.location(); let base_config = FileScanConfig { // `object_store_url` will be ignored as we are providing a // `SimpleParquetFileReaderFactory` to `ParquetExec` to use instead of the // datafusion object store registry. 
- object_store_url: ObjectStoreUrl::local_filesystem(), + object_store_url: ObjectStoreUrl::parse(base_url) + .unwrap_or_else(|_| ObjectStoreUrl::local_filesystem()), file_schema: self.arrow_schema.clone(), file_groups: vec![vec![file]], statistics: Statistics::default(), diff --git a/crates/datasources/src/object_store/registry.rs b/crates/datasources/src/object_store/registry.rs index 916c18502..976146726 100644 --- a/crates/datasources/src/object_store/registry.rs +++ b/crates/datasources/src/object_store/registry.rs @@ -2,27 +2,53 @@ use std::sync::Arc; use dashmap::DashMap; use datafusion::{error::DataFusionError, execution::object_store::ObjectStoreRegistry}; -use object_store::ObjectStore; +use metastoreproto::{session::NamespacedCatalogEntry, types::options::TableOptionsGcs}; +use object_store::{local::LocalFileSystem, ObjectStore}; use url::Url; +use crate::object_store::gcs::GcsTableAccess; + #[derive(Debug)] pub struct GlareDBRegistry { + // catalog: Vec>, object_stores: DashMap>, } -impl Default for GlareDBRegistry { - fn default() -> Self { - Self::new() - } -} impl GlareDBRegistry { - pub fn new() -> Self { + pub fn new<'a>(entries: impl Iterator>) -> Self { + use metastoreproto::types::catalog::CatalogEntry; + use metastoreproto::types::options::TableOptions; let object_stores: DashMap> = DashMap::new(); + object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new())); + for entry in entries { + match entry.entry { + CatalogEntry::Table(tbl_entry) => match &tbl_entry.options { + TableOptions::Gcs(opts) => { + let store = gcs_to_store(opts); + let key = format!("gs://{}", opts.bucket); + object_stores.insert(key, store); + } + _ => todo!("add support for other object stores"), + }, + _ => todo!("add support for other object stores"), + }; + } Self { object_stores } } } +fn gcs_to_store(opts: &TableOptionsGcs) -> Arc { + GcsTableAccess { + bucket_name: opts.bucket.clone(), + service_acccount_key_json: opts.service_account_key.clone(), + location: opts.location.clone(), + file_type: None, + } + .into_object_store() + .unwrap() +} + impl ObjectStoreRegistry for GlareDBRegistry { fn register_store( &self, @@ -38,6 +64,17 @@ impl ObjectStoreRegistry for GlareDBRegistry { self.object_stores .get(&s) .map(|o| o.value().clone()) + .or_else(|| { + if matches!(url.scheme(), "http" | "https") { + let store = object_store::http::HttpBuilder::new() + .with_url(url.as_str()) + .build() + .unwrap(); + Some(Arc::new(store)) + } else { + None + } + }) .ok_or_else(|| { DataFusionError::Internal(format!("No suitable object store found for {url}")) }) diff --git a/crates/glaredb/src/local.rs b/crates/glaredb/src/local.rs index bfab35dec..5ed74def8 100644 --- a/crates/glaredb/src/local.rs +++ b/crates/glaredb/src/local.rs @@ -210,8 +210,10 @@ impl LocalSession { } const UNNAMED: String = String::new(); + + // using this to debug. 
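`GlareDBRegistry::new` above only seeds stores for GCS-backed tables; every other table source currently falls into the `todo!` arms. A hedged sketch of what one additional arm could look like — `TableOptions::S3` and its field names are assumptions for illustration, not part of this patch:

    // Hypothetical extra arm for S3 tables, mirroring the GCS case above.
    TableOptions::S3(opts) => {
        let store = s3_to_store(opts); // would be the S3 analogue of gcs_to_store
        object_stores.insert(format!("s3://{}", opts.bucket), store);
    }
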
// let lp = self.sess.sql_to_lp(text).await?; - // // // println!("converted to lp: {:#?}", lp); + // println!("converted to lp: {:#?}", lp); // if let sqlexec::LogicalPlan::Query(query) = lp { // let physical_plan = self.sess.create_physical_plan(query).await?; diff --git a/crates/sqlbuiltins/src/functions.rs b/crates/sqlbuiltins/src/functions.rs index 7a05119bd..1b9766630 100644 --- a/crates/sqlbuiltins/src/functions.rs +++ b/crates/sqlbuiltins/src/functions.rs @@ -15,6 +15,7 @@ use datasources::common::listing::VirtualLister; use datasources::debug::DebugVirtualLister; use datasources::mongodb::{MongoAccessor, MongoTableAccessInfo}; use datasources::mysql::{MysqlAccessor, MysqlTableAccess}; +use datasources::object_store::gcs::{GcsAccessor, GcsTableAccess}; use datasources::object_store::http::HttpAccessor; use datasources::object_store::local::{LocalAccessor, LocalTableAccess}; use datasources::object_store::{FileType, TableAccessor}; @@ -589,9 +590,7 @@ async fn create_provider_for_filetype( .into_table_provider(false) .await .map_err(|e| BuiltinError::Access(Box::new(e)))?, - Ok("gs") => { - todo!() - } + Ok("gs") => return Err(BuiltinError::InvalidNumArgs), // no scheme so we assume it's a local file _ => { let location = url_string @@ -613,6 +612,33 @@ async fn create_provider_for_filetype( } }) } + 2 => { + let mut args = args.into_iter(); + let url_string = string_from_scalar(args.next().unwrap())?; + Ok(match Url::parse(&url_string).as_ref().map(Url::scheme) { + Ok("gs") => { + let creds = string_from_scalar(args.next().unwrap())?; + let url = Url::parse(&url_string).unwrap(); + + let bucket = url.host_str().unwrap().to_string(); + let location = url.path().to_string(); + + let access = GcsTableAccess { + bucket_name: bucket, + location, + service_acccount_key_json: Some(creds), + file_type: Some(file_type), + }; + GcsAccessor::new(access) + .await + .map_err(|e| BuiltinError::Access(Box::new(e)))? + .into_table_provider(true) + .await + .map_err(|e| BuiltinError::Access(Box::new(e)))? + } + _ => return Err(BuiltinError::InvalidNumArgs), + }) + } _ => Err(BuiltinError::InvalidNumArgs), } } diff --git a/crates/sqlexec/src/context.rs b/crates/sqlexec/src/context.rs index 954eb5892..eea7cc6eb 100644 --- a/crates/sqlexec/src/context.rs +++ b/crates/sqlexec/src/context.rs @@ -112,8 +112,12 @@ impl SessionContext { // Create a new datafusion runtime env with disk manager and memory pool // if needed. 
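The two-argument branch above derives the bucket from the URL host and the object path from the URL path before building `GcsTableAccess`. A small example of that decomposition (the bucket and path are made up):

    // Example only: how a gs:// URL maps onto the GcsTableAccess fields.
    let url = Url::parse("gs://my_bucket/path/to/data.parquet").unwrap();
    assert_eq!(url.scheme(), "gs");
    assert_eq!(url.host_str(), Some("my_bucket"));   // -> bucket_name
    assert_eq!(url.path(), "/path/to/data.parquet"); // -> location
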
+ let entries = catalog + .iter_entries() + .filter(|e| e.entry_type() == EntryType::Table && !e.builtin); + let mut conf = RuntimeConfig::default(); - conf = conf.with_object_store_registry(Arc::new(GlareDBRegistry::new())); + conf = conf.with_object_store_registry(Arc::new(GlareDBRegistry::new(entries))); if let Some(spill_path) = spill_path { conf = conf.with_disk_manager(DiskManagerConfig::NewSpecified(vec![spill_path])); } diff --git a/crates/sqlexec/src/planner/dispatch.rs b/crates/sqlexec/src/planner/dispatch.rs index 5e78e56da..a8706fbe2 100644 --- a/crates/sqlexec/src/planner/dispatch.rs +++ b/crates/sqlexec/src/planner/dispatch.rs @@ -418,6 +418,14 @@ impl<'a> SessionDispatcher<'a> { file_type: None, }; let accessor = GcsAccessor::new(table_access).await?; + let store = accessor.store(); + let url = accessor.base_url(); + + self.ctx + .get_df_state() + .runtime_env() + .register_object_store(&url, store.clone()); + let provider = accessor.into_table_provider(true).await?; Ok(provider) } diff --git a/crates/sqlexec/src/planner/session_planner.rs b/crates/sqlexec/src/planner/session_planner.rs index 6f84a904c..cf6bfd51e 100644 --- a/crates/sqlexec/src/planner/session_planner.rs +++ b/crates/sqlexec/src/planner/session_planner.rs @@ -444,7 +444,6 @@ impl<'a> SessionPlanner<'a> { let bucket = m.remove_required("bucket")?; let location = m.remove_required("location")?; - let url = format!("gs://{bucket}/"); let access = GcsTableAccess { bucket_name: bucket, @@ -459,15 +458,6 @@ impl<'a> SessionPlanner<'a> { source: Box::new(e), })?; - let url = url::Url::parse(&url).unwrap(); - - let store = access.store().unwrap(); - - self.ctx - .get_df_state() - .runtime_env() - .register_object_store(&url, store); - TableOptions::Gcs(TableOptionsGcs { service_account_key: access.service_acccount_key_json, bucket: access.bucket_name, From bc988cb795127d7a2baecd78286668978bd7ff07 Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Fri, 7 Jul 2023 12:48:49 -0500 Subject: [PATCH 6/8] remove comment --- crates/sqlbuiltins/src/functions.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/crates/sqlbuiltins/src/functions.rs b/crates/sqlbuiltins/src/functions.rs index 1b9766630..88348061e 100644 --- a/crates/sqlbuiltins/src/functions.rs +++ b/crates/sqlbuiltins/src/functions.rs @@ -572,16 +572,6 @@ async fn create_provider_for_filetype( 1 => { let mut args = args.into_iter(); let url_string = string_from_scalar(args.next().unwrap())?; - // todo: fetch this from the registry instead. - // let url = Url::parse(&url_string)?; - // let store = registry.get_store(url)?; - // match file_type { - // FileType::Parquet => { - // Arc::new(ParquetTableProvider::from_table_accessor(store, true).await?) 
- // } - // FileType::Csv => Arc::new(CsvTableProvider::from_table_accessor(store).await?), - // FileType::Json => Arc::new(JsonTableProvider::from_table_accessor(store).await?), - // }; Ok(match Url::parse(&url_string).as_ref().map(Url::scheme) { Ok("http" | "https") => HttpAccessor::try_new(url_string, file_type) From ecb9ac29fcfcd21a8358a92cc0a61a6a9d7c98d5 Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Fri, 7 Jul 2023 13:44:23 -0500 Subject: [PATCH 7/8] chore: code cleanup --- crates/datasources/src/object_store/csv.rs | 5 +---- crates/datasources/src/object_store/gcs.rs | 17 ++++++----------- crates/datasources/src/object_store/http.rs | 7 ++++++- crates/datasources/src/object_store/json.rs | 2 +- crates/datasources/src/object_store/local.rs | 5 +++++ crates/datasources/src/object_store/mod.rs | 8 ++++++++ crates/datasources/src/object_store/parquet.rs | 5 +---- crates/datasources/src/object_store/s3.rs | 5 +++++ crates/glaredb/src/local.rs | 9 --------- crates/sqlexec/src/planner/dispatch.rs | 12 +++++++++++- 10 files changed, 44 insertions(+), 31 deletions(-) diff --git a/crates/datasources/src/object_store/csv.rs b/crates/datasources/src/object_store/csv.rs index 2ff8c6cb9..b72580d59 100644 --- a/crates/datasources/src/object_store/csv.rs +++ b/crates/datasources/src/object_store/csv.rs @@ -74,12 +74,9 @@ where limit: Option, ) -> DatafusionResult> { let file = self.accessor.object_meta().as_ref().clone(); - let base_url = self.accessor.location(); + let base_url = self.accessor.base_path(); - // This config is setup to make use of `FileStream` to stream from csv files in - // datafusion let base_config = FileScanConfig { - // `store` in `CsvExec` will be used instead of the datafusion object store registry. object_store_url: ObjectStoreUrl::parse(base_url) .unwrap_or_else(|_| ObjectStoreUrl::local_filesystem()), file_schema: self.arrow_schema.clone(), diff --git a/crates/datasources/src/object_store/gcs.rs b/crates/datasources/src/object_store/gcs.rs index 4fa786115..f7a84f7ba 100644 --- a/crates/datasources/src/object_store/gcs.rs +++ b/crates/datasources/src/object_store/gcs.rs @@ -6,7 +6,6 @@ use object_store::path::Path as ObjectStorePath; use object_store::{ObjectMeta, ObjectStore}; use serde::{Deserialize, Serialize}; use tracing::trace; -use url::Url; use super::csv::CsvTableProvider; use super::errors::Result; @@ -52,13 +51,9 @@ impl GcsTableAccess { Ok(Arc::new(store)) } - pub fn location(&self) -> ObjectStorePath { + fn location(&self) -> ObjectStorePath { ObjectStorePath::from_url_path(&self.location).unwrap() } - - pub fn base_location(&self) -> ObjectStorePath { - ObjectStorePath::from_url_path(&self.bucket_name).unwrap() - } } #[derive(Debug)] @@ -73,10 +68,14 @@ pub struct GcsAccessor { #[async_trait::async_trait] impl TableAccessor for GcsAccessor { - fn location(&self) -> String { + fn base_path(&self) -> String { format!("gs://{}", self.base_url) } + fn location(&self) -> String { + self.meta.location.to_string() + } + fn store(&self) -> &Arc { &self.store } @@ -124,8 +123,4 @@ impl GcsAccessor { store.head(&location).await?; Ok(()) } - - pub fn base_url(&self) -> Url { - Url::parse(&format!("gs://{}", self.base_url)).unwrap() - } } diff --git a/crates/datasources/src/object_store/http.rs b/crates/datasources/src/object_store/http.rs index f5bdec735..14995f2b6 100644 --- a/crates/datasources/src/object_store/http.rs +++ b/crates/datasources/src/object_store/http.rs @@ -37,9 +37,14 @@ impl HttpAccessor { #[async_trait::async_trait] impl TableAccessor for 
HttpAccessor { - fn location(&self) -> String { + fn base_path(&self) -> String { self.base_url.clone() } + + fn location(&self) -> String { + self.meta.location.to_string() + } + fn store(&self) -> &Arc { &self.store } diff --git a/crates/datasources/src/object_store/json.rs b/crates/datasources/src/object_store/json.rs index 238d10886..7eb3ba62c 100644 --- a/crates/datasources/src/object_store/json.rs +++ b/crates/datasources/src/object_store/json.rs @@ -74,7 +74,7 @@ where limit: Option, ) -> DatafusionResult> { let file = self.accessor.object_meta().as_ref().clone().into(); - let base_url = self.accessor.location(); + let base_url = self.accessor.base_path(); let base_config = FileScanConfig { object_store_url: ObjectStoreUrl::parse(base_url) diff --git a/crates/datasources/src/object_store/local.rs b/crates/datasources/src/object_store/local.rs index 2436d2c60..0a5b2d7b4 100644 --- a/crates/datasources/src/object_store/local.rs +++ b/crates/datasources/src/object_store/local.rs @@ -39,9 +39,14 @@ pub struct LocalAccessor { #[async_trait::async_trait] impl TableAccessor for LocalAccessor { + fn base_path(&self) -> String { + "file://".to_string() + } + fn location(&self) -> String { self.meta.location.to_string() } + fn store(&self) -> &Arc { &self.store } diff --git a/crates/datasources/src/object_store/mod.rs b/crates/datasources/src/object_store/mod.rs index d7b4d46fd..d6cd14c2f 100644 --- a/crates/datasources/src/object_store/mod.rs +++ b/crates/datasources/src/object_store/mod.rs @@ -46,6 +46,14 @@ pub trait TableAccessor: Send + Sync { fn store(&self) -> &Arc; fn object_meta(&self) -> &Arc; + /// returns the base path. + /// Example: + /// - s3://bucket_name/path/to/file.parquet -> s3://bucket_name + /// - http://domain.com/path/to/file.parquet -> http://domain.com + fn base_path(&self) -> String; + /// returns the location of the file. + /// Example: + /// - s3://bucket_name/path/to/file.parquet -> path/to/file.parquet fn location(&self) -> String; async fn into_table_provider(self, predicate_pushdown: bool) -> Result>; } diff --git a/crates/datasources/src/object_store/parquet.rs b/crates/datasources/src/object_store/parquet.rs index 5d2964336..d8fb95401 100644 --- a/crates/datasources/src/object_store/parquet.rs +++ b/crates/datasources/src/object_store/parquet.rs @@ -183,12 +183,9 @@ where }; let file = self.accessor.object_meta().as_ref().clone().into(); - let base_url = self.accessor.location(); + let base_url = self.accessor.base_path(); let base_config = FileScanConfig { - // `object_store_url` will be ignored as we are providing a - // `SimpleParquetFileReaderFactory` to `ParquetExec` to use instead of the - // datafusion object store registry. 
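The `base_path`/`location` split documented above is what keeps the `FileScanConfig` URL and the registry key aligned. Roughly, for an accessor pointing at a made-up object (exact trailing-slash handling varies slightly between the accessors in this patch):

    // accessor.base_path() -> "s3://my_bucket/"        scheme + bucket; used as the
    //                                                  ObjectStoreUrl / registry key
    // accessor.location()  -> "path/to/file.parquet"   object key within that store
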
object_store_url: ObjectStoreUrl::parse(base_url) .unwrap_or_else(|_| ObjectStoreUrl::local_filesystem()), file_schema: self.arrow_schema.clone(), diff --git a/crates/datasources/src/object_store/s3.rs b/crates/datasources/src/object_store/s3.rs index 64e36ecf6..b1638a9e1 100644 --- a/crates/datasources/src/object_store/s3.rs +++ b/crates/datasources/src/object_store/s3.rs @@ -59,10 +59,14 @@ pub struct S3Accessor { /// Meta information for location/object pub meta: Arc, pub file_type: FileType, + bucket: String, } #[async_trait::async_trait] impl TableAccessor for S3Accessor { + fn base_path(&self) -> String { + format!("s3://{}/", self.bucket) + } fn location(&self) -> String { self.meta.location.to_string() } @@ -102,6 +106,7 @@ impl S3Accessor { store, meta, file_type, + bucket: access.bucket_name, }) } diff --git a/crates/glaredb/src/local.rs b/crates/glaredb/src/local.rs index 5ed74def8..932b39692 100644 --- a/crates/glaredb/src/local.rs +++ b/crates/glaredb/src/local.rs @@ -210,15 +210,6 @@ impl LocalSession { } const UNNAMED: String = String::new(); - - // using this to debug. - // let lp = self.sess.sql_to_lp(text).await?; - // println!("converted to lp: {:#?}", lp); - // if let sqlexec::LogicalPlan::Query(query) = lp { - // let physical_plan = self.sess.create_physical_plan(query).await?; - - // println!("created physical plan: {:#?}", physical_plan); - // }; let statements = parser::parse_sql(text)?; for stmt in statements { diff --git a/crates/sqlexec/src/planner/dispatch.rs b/crates/sqlexec/src/planner/dispatch.rs index a8706fbe2..595a15929 100644 --- a/crates/sqlexec/src/planner/dispatch.rs +++ b/crates/sqlexec/src/planner/dispatch.rs @@ -419,7 +419,8 @@ impl<'a> SessionDispatcher<'a> { }; let accessor = GcsAccessor::new(table_access).await?; let store = accessor.store(); - let url = accessor.base_url(); + let url = accessor.base_path(); + let url = url::Url::parse(&url).unwrap(); self.ctx .get_df_state() @@ -445,6 +446,15 @@ impl<'a> SessionDispatcher<'a> { file_type: None, }; let accessor = S3Accessor::new(table_access).await?; + let store = accessor.store(); + let url = accessor.base_path(); + let url = url::Url::parse(&url).unwrap(); + + self.ctx + .get_df_state() + .runtime_env() + .register_object_store(&url, store.clone()); + let provider = accessor.into_table_provider(true).await?; Ok(provider) } From d5fa9945648e8555cd3f24f8b6afc1ad5d7c3f0e Mon Sep 17 00:00:00 2001 From: universalmind303 Date: Fri, 7 Jul 2023 13:48:01 -0500 Subject: [PATCH 8/8] chore: code cleanup --- crates/datasources/src/object_store/mod.rs | 4 ++-- justfile | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/datasources/src/object_store/mod.rs b/crates/datasources/src/object_store/mod.rs index d6cd14c2f..af7f4754f 100644 --- a/crates/datasources/src/object_store/mod.rs +++ b/crates/datasources/src/object_store/mod.rs @@ -46,8 +46,8 @@ pub trait TableAccessor: Send + Sync { fn store(&self) -> &Arc; fn object_meta(&self) -> &Arc; - /// returns the base path. - /// Example: + /// returns the base path. + /// Example: /// - s3://bucket_name/path/to/file.parquet -> s3://bucket_name /// - http://domain.com/path/to/file.parquet -> http://domain.com fn base_path(&self) -> String; diff --git a/justfile b/justfile index 6965d3e2f..3999aadc4 100644 --- a/justfile +++ b/justfile @@ -62,9 +62,9 @@ clippy: protoc # apply linting & clippy fixes. 
fix: protoc - just fmt --all - cargo fix --all --allow-dirty cargo clippy --fix --all --all-features --allow-dirty + cargo fix --all --allow-dirty + just fmt --all # Displays help message. help:
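Taken together, the series moves store resolution behind DataFusion's object store registry. A condensed sketch of the wiring after these patches, reusing the names from the diffs above (illustrative fragment, not a compilable unit):

    // 1. Session startup (context.rs): seed the registry from catalog entries.
    let entries = catalog
        .iter_entries()
        .filter(|e| e.entry_type() == EntryType::Table && !e.builtin);
    let conf = RuntimeConfig::default()
        .with_object_store_registry(Arc::new(GlareDBRegistry::new(entries)));

    // 2. Dispatching an external GCS/S3 table (dispatch.rs): register its store
    //    under its base path so later scans can resolve it.
    let url = url::Url::parse(&accessor.base_path()).unwrap();
    ctx.get_df_state()
        .runtime_env()
        .register_object_store(&url, accessor.store().clone());

    // 3. Scan time: each provider builds a FileScanConfig whose object_store_url
    //    is the accessor's base path, and the native CsvExec / NdJsonExec /
    //    ParquetExec resolve the store through the registry.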