
Systematic Configuration in 'Create External Table' and 'Copy To' Options #9382

Merged (35 commits) on Mar 12, 2024
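For orientation, a hedged sketch of the statement shapes whose option handling this PR reworks. The table name, paths, and option keys below are placeholders, and the exact COPY option syntax for this DataFusion version may differ; the copy.slt changes in this PR are the authoritative reference.

```sql
-- Illustrative only: table name, paths, and option keys are placeholders.
CREATE EXTERNAL TABLE hits
STORED AS CSV
LOCATION 'data/hits.csv'
OPTIONS ('format.has_header' 'true');

-- COPY options follow the same key scheme; see this PR's copy.slt for the exact syntax.
COPY hits TO 'out/hits.csv' (FORMAT csv, 'format.delimiter' ';');
```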
Changes from 33 commits
Commits (35)
0f176a1
Initial but not completely work like proto
metesynnada Feb 15, 2024
10000fb
Delete docs.yaml
metesynnada Feb 22, 2024
6c76423
Merge pull request #5 from synnada-ai/ci-action-fixing
mustafasrepo Feb 22, 2024
599516b
Merge branch 'apache:main' into main
mustafasrepo Feb 23, 2024
b20b65c
Merge branch 'apache:main' into main
mustafasrepo Feb 26, 2024
cac943d
Before proto is handled
metesynnada Feb 26, 2024
3fba4a0
Merge remote-tracking branch 'upstream/main' into configuration
metesynnada Feb 26, 2024
c17d655
Update listing_table_factory.rs
metesynnada Feb 26, 2024
13e7e83
Before proto 2
metesynnada Feb 26, 2024
25d5685
Minor adjustments
metesynnada Feb 26, 2024
14ed29f
Merge branch 'main' into configuration
metesynnada Feb 26, 2024
daca94e
Update headers
mustafasrepo Feb 27, 2024
0e1300d
Update copy.slt
metesynnada Feb 27, 2024
c2da778
Merge branch 'configuration' of https://github.com/synnada-ai/datafus…
mustafasrepo Feb 27, 2024
ea22682
Add new test,
mustafasrepo Feb 27, 2024
fb86d94
Passes SLT tests
metesynnada Feb 27, 2024
b89b138
Update csv.rs
metesynnada Feb 28, 2024
33eca8d
Before trying char
metesynnada Feb 28, 2024
3caf33e
Fix u8 handling
metesynnada Feb 28, 2024
d16bb65
Merge remote-tracking branch 'upstream/main' into configuration
metesynnada Feb 29, 2024
4f1acf1
Update according to review
metesynnada Feb 29, 2024
036f005
Passing tests
metesynnada Mar 5, 2024
ba938a9
passing tests with proto
metesynnada Mar 5, 2024
cff6bc9
Cargo fix
metesynnada Mar 5, 2024
2fd3c4e
Testing and clippy refactors
metesynnada Mar 6, 2024
f141345
Merge remote-tracking branch 'upstream/main' into configuration
metesynnada Mar 6, 2024
55a223c
After merge corrections
metesynnada Mar 6, 2024
7dfa7ee
Merge remote-tracking branch 'upstream/main' into configuration
metesynnada Mar 6, 2024
4f9acdf
Parquet feature fix
metesynnada Mar 6, 2024
b0bb337
On datafusion-cli register COPY statements
metesynnada Mar 7, 2024
00525a8
Correcting a test
metesynnada Mar 7, 2024
f2f1d96
Merge remote-tracking branch 'upstream/main' into configuration
metesynnada Mar 7, 2024
129e682
Review
ozankabak Mar 11, 2024
9f06c6f
Review visited
metesynnada Mar 12, 2024
e27dfe8
Merge remote-tracking branch 'upstream/main' into configuration
metesynnada Mar 12, 2024
2 changes: 1 addition & 1 deletion .github/workflows/docs.yaml
@@ -61,4 +61,4 @@ jobs:
git add --all
git commit -m 'Publish built docs triggered by ${{ github.sha }}'
git push || git push --force
fi
fi
10 changes: 6 additions & 4 deletions benchmarks/src/parquet_filter.rs
@@ -15,8 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use crate::AccessLogOpt;
use crate::{BenchmarkRun, CommonOpt};
use std::path::PathBuf;

use crate::{AccessLogOpt, BenchmarkRun, CommonOpt};

use arrow::util::pretty;
use datafusion::common::Result;
use datafusion::logical_expr::utils::disjunction;
@@ -25,7 +27,7 @@ use datafusion::physical_plan::collect;
use datafusion::prelude::{col, SessionContext};
use datafusion::test_util::parquet::{ParquetScanOptions, TestParquetFile};
use datafusion_common::instant::Instant;
use std::path::PathBuf;

use structopt::StructOpt;

/// Test performance of parquet filter pushdown
@@ -179,7 +181,7 @@ async fn exec_scan(
debug: bool,
) -> Result<(usize, std::time::Duration)> {
let start = Instant::now();
let exec = test_file.create_scan(Some(filter)).await?;
let exec = test_file.create_scan(ctx, Some(filter)).await?;

let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;
13 changes: 7 additions & 6 deletions benchmarks/src/sort.rs
@@ -15,9 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use crate::AccessLogOpt;
use crate::BenchmarkRun;
use crate::CommonOpt;
use std::path::PathBuf;
use std::sync::Arc;

use crate::{AccessLogOpt, BenchmarkRun, CommonOpt};

use arrow::util::pretty;
use datafusion::common::Result;
use datafusion::physical_expr::PhysicalSortExpr;
@@ -26,8 +28,7 @@ use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion::test_util::parquet::TestParquetFile;
use datafusion_common::instant::Instant;
use std::path::PathBuf;
use std::sync::Arc;

use structopt::StructOpt;

/// Test performance of sorting large datasets
@@ -174,7 +175,7 @@ async fn exec_sort(
debug: bool,
) -> Result<(usize, std::time::Duration)> {
let start = Instant::now();
let scan = test_file.create_scan(None).await?;
let scan = test_file.create_scan(ctx, None).await?;
let exec = Arc::new(SortExec::new(expr.to_owned(), scan));
let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;
28 changes: 15 additions & 13 deletions benchmarks/src/tpch/run.rs
@@ -15,8 +15,14 @@
// specific language governing permissions and limitations
// under the License.

use super::get_query_sql;
use std::path::PathBuf;
use std::sync::Arc;

use super::{
get_query_sql, get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES,
};
use crate::{BenchmarkRun, CommonOpt};

use arrow::record_batch::RecordBatch;
use arrow::util::pretty::{self, pretty_format_batches};
use datafusion::datasource::file_format::csv::CsvFormat;
@@ -26,21 +32,16 @@ use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::datasource::{MemTable, TableProvider};
use datafusion::error::Result;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
use datafusion::physical_plan::{collect, displayable};
use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};
use log::info;

use datafusion::prelude::*;
use datafusion_common::instant::Instant;
use std::path::PathBuf;
use std::sync::Arc;
use datafusion_common::{DEFAULT_CSV_EXTENSION, DEFAULT_PARQUET_EXTENSION};

use datafusion::error::Result;
use datafusion::prelude::*;
use log::info;
use structopt::StructOpt;

use super::{get_tbl_tpch_table_schema, get_tpch_table_schema, TPCH_TABLES};

/// Run the tpch benchmark.
///
/// This benchmarks is derived from the [TPC-H][1] version
@@ -253,7 +254,7 @@ impl RunOpt {
}
"parquet" => {
let path = format!("{path}/{table}");
let format = ParquetFormat::default().with_enable_pruning(Some(true));
let format = ParquetFormat::default().with_enable_pruning(true);

(Arc::new(format), path, DEFAULT_PARQUET_EXTENSION)
}
@@ -298,11 +299,12 @@ struct QueryResult {
// Only run with "ci" mode when we have the data
#[cfg(feature = "ci")]
mod tests {
use std::path::Path;

use super::*;

use datafusion::common::exec_err;
use datafusion::error::{DataFusionError, Result};
use std::path::Path;

use datafusion_proto::bytes::{
logical_plan_from_bytes, logical_plan_to_bytes, physical_plan_from_bytes,
physical_plan_to_bytes,
39 changes: 27 additions & 12 deletions datafusion-cli/src/catalog.rs
@@ -15,8 +15,11 @@
// specific language governing permissions and limitations
// under the License.

use crate::object_storage::get_object_store;
use async_trait::async_trait;
use std::any::Any;
use std::sync::{Arc, Weak};

use crate::object_storage::{get_object_store, AwsOptions, GcpOptions};

use datafusion::catalog::schema::SchemaProvider;
use datafusion::catalog::{CatalogProvider, CatalogProviderList};
use datafusion::common::plan_datafusion_err;
@@ -26,12 +29,10 @@ use datafusion::datasource::listing::{
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::execution::context::SessionState;

use async_trait::async_trait;
use dirs::home_dir;
use parking_lot::RwLock;
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use url::Url;

/// Wraps another catalog, automatically creating table providers
/// for local files if needed
@@ -155,15 +156,16 @@ impl SchemaProvider for DynamicFileSchemaProvider {

// if the inner schema provider didn't have a table by
// that name, try to treat it as a listing table
let state = self
let mut state = self
.state
.upgrade()
.ok_or_else(|| plan_datafusion_err!("locking error"))?
.read()
.clone();
let optimized_name = substitute_tilde(name.to_owned());
let table_url = ListingTableUrl::parse(optimized_name.as_str())?;
let url: &Url = table_url.as_ref();
let scheme = table_url.scheme();
let url = table_url.as_ref();

// If the store is already registered for this URL then `get_store`
// will return `Ok` which means we don't need to register it again. However,
@@ -174,10 +176,22 @@
Err(_) => {
// Register the store for this URL. Here we don't have access
// to any command options so the only choice is to use an empty collection
let mut options = HashMap::new();
let store =
get_object_store(&state, &mut options, table_url.scheme(), url)
.await?;
match scheme {
"s3" | "oss" => {
state = state.add_table_options_extension(AwsOptions::default());
}
"gs" | "gcs" => {
state = state.add_table_options_extension(GcpOptions::default())
}
_ => {}
};
let store = get_object_store(
&state,
table_url.scheme(),
url,
state.default_table_options(),
)
.await?;
state.runtime_env().register_object_store(url, store);
}
}
@@ -215,6 +229,7 @@ fn substitute_tilde(cur: String) -> String {
#[cfg(test)]
mod tests {
use super::*;

use datafusion::catalog::schema::SchemaProvider;
use datafusion::prelude::SessionContext;

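The catalog.rs hunks above make the dynamic file schema provider register scheme-specific default options (AwsOptions for s3/oss, GcpOptions for gs/gcs) before fetching the object store. A hedged illustration of what that supports from datafusion-cli; the bucket and object path are placeholders, and credentials are still resolved from the environment or earlier configuration.

```sql
-- Illustrative only: bucket and key are placeholders. With the scheme match
-- in catalog.rs, querying an s3:// (or gs://) path directly should register
-- AwsOptions (or GcpOptions) defaults for the object store before the
-- listing table is created, without an explicit CREATE EXTERNAL TABLE.
SELECT count(*) FROM 's3://my-bucket/path/to/data.parquet';
```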