add support for reading from "file://" for parquet_scan #1164

Merged · 1 commit · Jun 21, 2023
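In effect, this PR makes parquet_scan dispatch on the URL scheme: file:// URLs are read through the local object store, http/https through the HTTP store, and any other scheme is rejected with an "Unsupported scheme" error. An illustrative call (path hypothetical): SELECT * FROM parquet_scan('file:///tmp/data.parquet');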
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -11,3 +11,4 @@ lto = "thin"
 [workspace.dependencies]
 datafusion = { version = "26.0" }
 deltalake = { git = "https://github.com/delta-io/delta-rs.git", branch = "main",features = ["s3", "gcs", "azure", "datafusion","arrow","parquet"] }
+url = "2.4.0"
2 changes: 1 addition & 1 deletion crates/datasources/Cargo.toml
@@ -46,5 +46,5 @@ tokio-postgres = { version = "0.7.8", features = ["with-uuid-1", "with-serde_jso
 tokio-rustls = "0.24.1"
 tracing = "0.1"
 uuid = "1.3.4"
-url = "2.4.0"
+url.workspace = true
 webpki-roots = "0.23.1"
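For context, url.workspace = true inherits the version pinned once under [workspace.dependencies] in the root Cargo.toml (url = "2.4.0" above), so the crate no longer repeats its own version pin.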
24 changes: 11 additions & 13 deletions crates/datasources/src/object_store/gcs.rs
@@ -44,6 +44,7 @@ pub struct GcsAccessor {
     pub file_type: FileType,
 }
 
+#[async_trait::async_trait]
 impl TableAccessor for GcsAccessor {
     fn store(&self) -> &Arc<dyn ObjectStore> {
         &self.store
@@ -52,6 +53,16 @@ impl TableAccessor for GcsAccessor {
     fn object_meta(&self) -> &Arc<ObjectMeta> {
         &self.meta
     }
+
+    async fn into_table_provider(self, predicate_pushdown: bool) -> Result<Arc<dyn TableProvider>> {
+        let table_provider: Arc<dyn TableProvider> = match self.file_type {
+            FileType::Parquet => {
+                Arc::new(ParquetTableProvider::from_table_accessor(self, predicate_pushdown).await?)
+            }
+            FileType::Csv => Arc::new(CsvTableProvider::from_table_accessor(self).await?),
+        };
+        Ok(table_provider)
+    }
 }
 
 impl GcsAccessor {
@@ -79,17 +90,4 @@ impl GcsAccessor {
         store.head(&location).await?;
         Ok(())
     }
-
-    pub async fn into_table_provider(
-        self,
-        predicate_pushdown: bool,
-    ) -> Result<Arc<dyn TableProvider>> {
-        let table_provider: Arc<dyn TableProvider> = match self.file_type {
-            FileType::Parquet => {
-                Arc::new(ParquetTableProvider::from_table_accessor(self, predicate_pushdown).await?)
-            }
-            FileType::Csv => Arc::new(CsvTableProvider::from_table_accessor(self).await?),
-        };
-        Ok(table_provider)
-    }
 }
16 changes: 16 additions & 0 deletions crates/datasources/src/object_store/http.rs
@@ -3,8 +3,11 @@ use std::sync::Arc;
 use crate::object_store::{errors::ObjectStoreSourceError, Result, TableAccessor};
 
 use chrono::{DateTime, Utc};
+use datafusion::datasource::TableProvider;
 use object_store::{ObjectMeta, ObjectStore};
 
+use super::{csv::CsvTableProvider, parquet::ParquetTableProvider, FileType};
+
 #[derive(Debug, Clone)]
 pub struct HttpTableAccess {
     /// Public url where the file is hosted.
@@ -15,6 +18,7 @@ pub struct HttpTableAccess {
 pub struct HttpAccessor {
     pub store: Arc<dyn ObjectStore>,
     pub meta: Arc<ObjectMeta>,
+    pub file_type: FileType,
 }
 
 impl HttpAccessor {
@@ -25,10 +29,12 @@ impl HttpAccessor {
         Ok(Self {
             store: Arc::new(store),
             meta: Arc::new(meta),
+            file_type: FileType::Parquet,
         })
     }
 }
 
+#[async_trait::async_trait]
 impl TableAccessor for HttpAccessor {
     fn store(&self) -> &Arc<dyn ObjectStore> {
         &self.store
@@ -37,6 +43,16 @@ impl TableAccessor for HttpAccessor {
     fn object_meta(&self) -> &Arc<ObjectMeta> {
         &self.meta
     }
+
+    async fn into_table_provider(self, _: bool) -> Result<Arc<dyn TableProvider>> {
+        let table_provider: Arc<dyn TableProvider> = match self.file_type {
+            FileType::Parquet => {
+                Arc::new(ParquetTableProvider::from_table_accessor(self, false).await?)
+            }
+            FileType::Csv => Arc::new(CsvTableProvider::from_table_accessor(self).await?),
+        };
+        Ok(table_provider)
+    }
 }
 
 /// Get the object meta from a HEAD request to the url.
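Note that, as constructed via try_new, HttpAccessor hardcodes file_type: FileType::Parquet, and its into_table_provider ignores the pushdown flag (always passing false), so the FileType::Csv arm appears unreachable for HTTP tables in this diff.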
24 changes: 11 additions & 13 deletions crates/datasources/src/object_store/local.rs
@@ -28,6 +28,7 @@ pub struct LocalAccessor {
     pub file_type: FileType,
 }
 
+#[async_trait::async_trait]
 impl TableAccessor for LocalAccessor {
     fn store(&self) -> &Arc<dyn ObjectStore> {
         &self.store
@@ -36,6 +37,16 @@ impl TableAccessor for LocalAccessor {
     fn object_meta(&self) -> &Arc<ObjectMeta> {
         &self.meta
     }
+
+    async fn into_table_provider(self, predicate_pushdown: bool) -> Result<Arc<dyn TableProvider>> {
+        let table_provider: Arc<dyn TableProvider> = match self.file_type {
+            FileType::Parquet => {
+                Arc::new(ParquetTableProvider::from_table_accessor(self, predicate_pushdown).await?)
+            }
+            FileType::Csv => Arc::new(CsvTableProvider::from_table_accessor(self).await?),
+        };
+        Ok(table_provider)
+    }
 }
 
 impl LocalAccessor {
@@ -64,17 +75,4 @@ impl LocalAccessor {
         store.head(&location).await?;
         Ok(())
     }
-
-    pub async fn into_table_provider(
-        self,
-        predicate_pushdown: bool,
-    ) -> Result<Arc<dyn TableProvider>> {
-        let table_provider: Arc<dyn TableProvider> = match self.file_type {
-            FileType::Parquet => {
-                Arc::new(ParquetTableProvider::from_table_accessor(self, predicate_pushdown).await?)
-            }
-            FileType::Csv => Arc::new(CsvTableProvider::from_table_accessor(self).await?),
-        };
-        Ok(table_provider)
-    }
 }
3 changes: 3 additions & 0 deletions crates/datasources/src/object_store/mod.rs
@@ -2,6 +2,7 @@ use std::fmt::Debug;
 use std::str::FromStr;
 use std::sync::Arc;
 
+use datafusion::datasource::TableProvider;
 use errors::ObjectStoreSourceError;
 use object_store::path::Path as ObjectStorePath;
 use object_store::{ObjectMeta, ObjectStore};
@@ -37,10 +38,12 @@ impl FromStr for FileType {
     }
 }
 
+#[async_trait::async_trait]
 pub trait TableAccessor: Send + Sync {
     fn store(&self) -> &Arc<dyn ObjectStore>;
 
     fn object_meta(&self) -> &Arc<ObjectMeta>;
+    async fn into_table_provider(self, predicate_pushdown: bool) -> Result<Arc<dyn TableProvider>>;
 }
 
 pub fn file_type_from_path(path: &ObjectStorePath) -> Result<FileType> {

Review thread on the new into_table_provider trait method:

Member: imo we should remove predicate_pushdown (can do later). I don't know of a case where we wouldn't want it.

@universalmind303 (Contributor, author), Jun 21, 2023: if we remove it, then couldn't we just change this to use the std TryInto?

Member: Nah, we still need it to be async since there's a bit of setup done in this function for some data sources (e.g. for postgres we need to make a connection and run a query to get the schema).

Member: I mean, we could do IntoFuture (https://doc.rust-lang.org/std/future/trait.IntoFuture.html), but I'm not really feelin it...
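For reference, the IntoFuture route floated above would look roughly like the following — a hypothetical sketch, not part of this PR. It assumes the predicate_pushdown flag has been dropped (hardcoded to true here, since into_future takes no extra arguments) and that the surrounding module's imports (Arc, Result, TableProvider, LocalAccessor) are in scope:

use std::future::{Future, IntoFuture};
use std::pin::Pin;

// Hypothetical: convert a LocalAccessor directly into a future that yields a
// table provider, so callers can `.await` the accessor itself.
impl IntoFuture for LocalAccessor {
    type Output = Result<Arc<dyn TableProvider>>;
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send>>;

    fn into_future(self) -> Self::IntoFuture {
        // Delegates to the existing trait method; pushdown is hardcoded
        // because into_future takes no arguments.
        Box::pin(async move { self.into_table_provider(true).await })
    }
}

// Usage would then read: let provider = LocalAccessor::new(access).await?.await?;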
24 changes: 11 additions & 13 deletions crates/datasources/src/object_store/s3.rs
@@ -54,6 +54,7 @@ pub struct S3Accessor {
     pub file_type: FileType,
 }
 
+#[async_trait::async_trait]
 impl TableAccessor for S3Accessor {
     fn store(&self) -> &Arc<dyn ObjectStore> {
         &self.store
@@ -62,6 +63,16 @@ impl TableAccessor for S3Accessor {
     fn object_meta(&self) -> &Arc<ObjectMeta> {
         &self.meta
     }
+
+    async fn into_table_provider(self, predicate_pushdown: bool) -> Result<Arc<dyn TableProvider>> {
+        let table_provider: Arc<dyn TableProvider> = match self.file_type {
+            FileType::Parquet => {
+                Arc::new(ParquetTableProvider::from_table_accessor(self, predicate_pushdown).await?)
+            }
+            FileType::Csv => Arc::new(CsvTableProvider::from_table_accessor(self).await?),
+        };
+        Ok(table_provider)
+    }
 }
 
 impl S3Accessor {
@@ -90,17 +101,4 @@ impl S3Accessor {
         store.head(&location).await?;
         Ok(())
     }
-
-    pub async fn into_table_provider(
-        self,
-        predicate_pushdown: bool,
-    ) -> Result<Arc<dyn TableProvider>> {
-        let table_provider: Arc<dyn TableProvider> = match self.file_type {
-            FileType::Parquet => {
-                Arc::new(ParquetTableProvider::from_table_accessor(self, predicate_pushdown).await?)
-            }
-            FileType::Csv => Arc::new(CsvTableProvider::from_table_accessor(self).await?),
-        };
-        Ok(table_provider)
-    }
 }
2 changes: 1 addition & 1 deletion crates/sqlbuiltins/Cargo.toml
@@ -25,4 +25,4 @@ regex = "1.8"
 tonic = { version = "0.9", features = ["transport", "tls", "tls-roots"] }
 tokio-postgres = "0.7.8"
 once_cell = "1.18.0"
-
+url.workspace = true
37 changes: 26 additions & 11 deletions crates/sqlbuiltins/src/functions.rs
@@ -16,7 +16,8 @@ use datasources::debug::DebugVirtualLister;
 use datasources::mongodb::{MongoAccessor, MongoTableAccessInfo};
 use datasources::mysql::{MysqlAccessor, MysqlTableAccess};
 use datasources::object_store::http::HttpAccessor;
-use datasources::object_store::parquet::ParquetTableProvider;
+use datasources::object_store::local::{LocalAccessor, LocalTableAccess};
+use datasources::object_store::{FileType, TableAccessor};
 use datasources::postgres::{PostgresAccessor, PostgresTableAccess};
 use datasources::snowflake::{SnowflakeAccessor, SnowflakeDbConnection, SnowflakeTableAccess};
 use futures::Stream;
@@ -30,6 +31,7 @@ use std::collections::HashMap;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
+use url::Url;
 
 /// Builtin table returning functions available for all sessions.
 pub static BUILTIN_TABLE_FUNCS: Lazy<BuiltinTableFuncs> = Lazy::new(BuiltinTableFuncs::new);
@@ -498,16 +500,29 @@ impl TableFunc for ParquetScan {
             // parquet_scan(url)
             1 => {
                 let mut args = args.into_iter();
-                let url = string_from_scalar(args.next().unwrap())?;
-                let accessor = HttpAccessor::try_new(url)
-                    .await
-                    .map_err(|e| BuiltinError::Access(Box::new(e)))?;
-
-                let prov = ParquetTableProvider::from_table_accessor(accessor, false)
-                    .await
-                    .map_err(|e| BuiltinError::Access(Box::new(e)))?;
-
-                Ok(Arc::new(prov))
+                let url_string = string_from_scalar(args.next().unwrap())?;
+                let url = Url::parse(&url_string).map_err(|e| BuiltinError::Access(Box::new(e)))?;
+                Ok(match url.scheme() {
+                    "file" => {
+                        let table_access = LocalTableAccess {
+                            location: url.to_file_path().unwrap().to_str().unwrap().to_string(),
+                            file_type: Some(FileType::Parquet),
+                        };
+                        LocalAccessor::new(table_access)
+                            .await
+                            .map_err(|e| BuiltinError::Access(Box::new(e)))?
+                            .into_table_provider(false)
+                            .await
+                            .map_err(|e| BuiltinError::Access(Box::new(e)))?
+                    }
+                    "http" | "https" => HttpAccessor::try_new(url_string)
+                        .await
+                        .map_err(|e| BuiltinError::Access(Box::new(e)))?
+                        .into_table_provider(false)
+                        .await
+                        .map_err(|e| BuiltinError::Access(Box::new(e)))?,
+                    _ => return Err(BuiltinError::Static("Unsupported scheme")),
+                })
             }
             _ => Err(BuiltinError::InvalidNumArgs),
         }
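The "file" arm leans on the url crate's to_file_path. A standalone sketch of how that parse behaves (paths illustrative, Unix-style; note the two unwraps in the diff would panic on a file URL that doesn't map to a local path):

use url::Url;

fn main() {
    // file:// URLs parse and convert to a filesystem path.
    let url = Url::parse("file:///tmp/data.parquet").unwrap();
    assert_eq!(url.scheme(), "file");
    assert_eq!(url.to_file_path().unwrap().to_str().unwrap(), "/tmp/data.parquet");

    // http(s) URLs parse, but to_file_path() returns Err, so they take the other arm.
    assert!(Url::parse("https://example.com/a.parquet").unwrap().to_file_path().is_err());

    // A bare relative path is not a valid URL at all, so it errors before the match.
    assert!(Url::parse("tmp/data.parquet").is_err());
}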
1 change: 1 addition & 0 deletions crates/sqlexec/src/planner/dispatch.rs
@@ -16,6 +16,7 @@ use datasources::mysql::{MysqlAccessor, MysqlTableAccess};
 use datasources::object_store::gcs::{GcsAccessor, GcsTableAccess};
 use datasources::object_store::local::{LocalAccessor, LocalTableAccess};
 use datasources::object_store::s3::{S3Accessor, S3TableAccess};
+use datasources::object_store::TableAccessor;
 use datasources::postgres::{PostgresAccessor, PostgresTableAccess};
 use datasources::snowflake::{SnowflakeAccessor, SnowflakeDbConnection, SnowflakeTableAccess};
 use metastore::builtins::{
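This one-line import exists because Rust only lets you call a trait's methods when the trait itself is in scope, and dispatch.rs now calls into_table_provider (a TableAccessor method) on the accessors. A minimal illustration with hypothetical types, not from this codebase:

mod accessors {
    pub trait IntoProvider {
        fn into_provider(self) -> String;
    }
    pub struct Local;
    impl IntoProvider for Local {
        fn into_provider(self) -> String {
            "local table provider".to_string()
        }
    }
}

fn main() {
    // Without this `use`, Local.into_provider() fails to compile even though
    // the impl exists: the trait must be in scope at the call site.
    use accessors::IntoProvider;
    assert_eq!(accessors::Local.into_provider(), "local table provider");
}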