diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs index c731e60636af..fd12eb996785 100644 --- a/ballista/rust/core/src/utils.rs +++ b/ballista/rust/core/src/utils.rs @@ -30,6 +30,7 @@ use crate::memory_stream::MemoryStream; use crate::serde::scheduler::PartitionStats; use crate::config::BallistaConfig; +use async_trait::async_trait; use datafusion::arrow::datatypes::Schema; use datafusion::arrow::error::Result as ArrowResult; use datafusion::arrow::{ @@ -269,8 +270,9 @@ impl BallistaQueryPlanner { } } +#[async_trait] impl QueryPlanner for BallistaQueryPlanner { - fn create_physical_plan( + async fn create_physical_plan( &self, logical_plan: &LogicalPlan, _ctx_state: &ExecutionContextState, diff --git a/ballista/rust/scheduler/src/lib.rs b/ballista/rust/scheduler/src/lib.rs index f03d08b1b0ed..47caf4c21ede 100644 --- a/ballista/rust/scheduler/src/lib.rs +++ b/ballista/rust/scheduler/src/lib.rs @@ -418,6 +418,7 @@ impl SchedulerGrpc for SchedulerServer { let plan = fail_job!(datafusion_ctx .create_physical_plan(&optimized_plan) + .await .map_err(|e| { let msg = format!("Could not create physical plan: {}", e); error!("{}", msg); @@ -447,6 +448,7 @@ impl SchedulerGrpc for SchedulerServer { let mut planner = DistributedPlanner::new(); let stages = fail_job!(planner .plan_query_stages(&job_id_spawn, plan) + .await .map_err(|e| { let msg = format!("Could not plan query stages: {}", e); error!("{}", msg); diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs index 2872ff924ce4..3d5712ae8a67 100644 --- a/ballista/rust/scheduler/src/planner.rs +++ b/ballista/rust/scheduler/src/planner.rs @@ -31,6 +31,8 @@ use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; use datafusion::physical_plan::repartition::RepartitionExec; use datafusion::physical_plan::windows::WindowAggExec; use datafusion::physical_plan::{ExecutionPlan, Partitioning}; +use futures::future::BoxFuture; +use futures::FutureExt; use log::info; type PartialQueryStageResult = (Arc, Vec>); @@ -55,14 +57,15 @@ impl DistributedPlanner { /// Returns a vector of ExecutionPlans, where the root node is a [ShuffleWriterExec]. /// Plans that depend on the input of other plans will have leaf nodes of type [UnresolvedShuffleExec]. /// A [ShuffleWriterExec] is created whenever the partitioning changes. - pub fn plan_query_stages( - &mut self, - job_id: &str, + pub async fn plan_query_stages<'a>( + &'a mut self, + job_id: &'a str, execution_plan: Arc, ) -> Result>> { info!("planning query stages"); - let (new_plan, mut stages) = - self.plan_query_stages_internal(job_id, execution_plan)?; + let (new_plan, mut stages) = self + .plan_query_stages_internal(job_id, execution_plan) + .await?; stages.push(create_shuffle_writer( job_id, self.next_stage_id(), @@ -75,91 +78,95 @@ impl DistributedPlanner { /// Returns a potentially modified version of the input execution_plan along with the resulting query stages. 
/// This function is needed because the input execution_plan might need to be modified, but it might not hold a /// complete query stage (its parent might also belong to the same stage) - fn plan_query_stages_internal( - &mut self, - job_id: &str, + fn plan_query_stages_internal<'a>( + &'a mut self, + job_id: &'a str, execution_plan: Arc, - ) -> Result { - // recurse down and replace children - if execution_plan.children().is_empty() { - return Ok((execution_plan, vec![])); - } + ) -> BoxFuture<'a, Result> { + async move { + // recurse down and replace children + if execution_plan.children().is_empty() { + return Ok((execution_plan, vec![])); + } - let mut stages = vec![]; - let mut children = vec![]; - for child in execution_plan.children() { - let (new_child, mut child_stages) = - self.plan_query_stages_internal(job_id, child.clone())?; - children.push(new_child); - stages.append(&mut child_stages); - } + let mut stages = vec![]; + let mut children = vec![]; + for child in execution_plan.children() { + let (new_child, mut child_stages) = self + .plan_query_stages_internal(job_id, child.clone()) + .await?; + children.push(new_child); + stages.append(&mut child_stages); + } - if let Some(coalesce) = execution_plan - .as_any() - .downcast_ref::() - { - let shuffle_writer = create_shuffle_writer( - job_id, - self.next_stage_id(), - children[0].clone(), - None, - )?; - let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( - shuffle_writer.stage_id(), - shuffle_writer.schema(), - shuffle_writer.output_partitioning().partition_count(), - shuffle_writer - .shuffle_output_partitioning() - .map(|p| p.partition_count()) - .unwrap_or_else(|| { - shuffle_writer.output_partitioning().partition_count() - }), - )); - stages.push(shuffle_writer); - Ok(( - coalesce.with_new_children(vec![unresolved_shuffle])?, - stages, - )) - } else if let Some(repart) = - execution_plan.as_any().downcast_ref::() - { - match repart.output_partitioning() { - Partitioning::Hash(_, _) => { - let shuffle_writer = create_shuffle_writer( - job_id, - self.next_stage_id(), - children[0].clone(), - Some(repart.partitioning().to_owned()), - )?; - let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( - shuffle_writer.stage_id(), - shuffle_writer.schema(), - shuffle_writer.output_partitioning().partition_count(), - shuffle_writer - .shuffle_output_partitioning() - .map(|p| p.partition_count()) - .unwrap_or_else(|| { - shuffle_writer.output_partitioning().partition_count() - }), - )); - stages.push(shuffle_writer); - Ok((unresolved_shuffle, stages)) - } - _ => { - // remove any non-hash repartition from the distributed plan - Ok((children[0].clone(), stages)) + if let Some(coalesce) = execution_plan + .as_any() + .downcast_ref::() + { + let shuffle_writer = create_shuffle_writer( + job_id, + self.next_stage_id(), + children[0].clone(), + None, + )?; + let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( + shuffle_writer.stage_id(), + shuffle_writer.schema(), + shuffle_writer.output_partitioning().partition_count(), + shuffle_writer + .shuffle_output_partitioning() + .map(|p| p.partition_count()) + .unwrap_or_else(|| { + shuffle_writer.output_partitioning().partition_count() + }), + )); + stages.push(shuffle_writer); + Ok(( + coalesce.with_new_children(vec![unresolved_shuffle])?, + stages, + )) + } else if let Some(repart) = + execution_plan.as_any().downcast_ref::() + { + match repart.output_partitioning() { + Partitioning::Hash(_, _) => { + let shuffle_writer = create_shuffle_writer( + job_id, + 
self.next_stage_id(), + children[0].clone(), + Some(repart.partitioning().to_owned()), + )?; + let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new( + shuffle_writer.stage_id(), + shuffle_writer.schema(), + shuffle_writer.output_partitioning().partition_count(), + shuffle_writer + .shuffle_output_partitioning() + .map(|p| p.partition_count()) + .unwrap_or_else(|| { + shuffle_writer.output_partitioning().partition_count() + }), + )); + stages.push(shuffle_writer); + Ok((unresolved_shuffle, stages)) + } + _ => { + // remove any non-hash repartition from the distributed plan + Ok((children[0].clone(), stages)) + } } + } else if let Some(window) = + execution_plan.as_any().downcast_ref::() + { + Err(BallistaError::NotImplemented(format!( + "WindowAggExec with window {:?}", + window + ))) + } else { + Ok((execution_plan.with_new_children(children)?, stages)) } - } else if let Some(window) = - execution_plan.as_any().downcast_ref::() - { - Err(BallistaError::NotImplemented(format!( - "WindowAggExec with window {:?}", - window - ))) - } else { - Ok((execution_plan.with_new_children(children)?, stages)) } + .boxed() } /// Generate a new stage ID @@ -262,8 +269,8 @@ mod test { }; } - #[test] - fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> { + #[tokio::test] + async fn distributed_hash_aggregate_plan() -> Result<(), BallistaError> { let mut ctx = datafusion_test_context("testdata")?; // simplified form of TPC-H query 1 @@ -276,11 +283,13 @@ mod test { let plan = df.to_logical_plan(); let plan = ctx.optimize(&plan)?; - let plan = ctx.create_physical_plan(&plan)?; + let plan = ctx.create_physical_plan(&plan).await?; let mut planner = DistributedPlanner::new(); let job_uuid = Uuid::new_v4(); - let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?; + let stages = planner + .plan_query_stages(&job_uuid.to_string(), plan) + .await?; for stage in &stages { println!("{}", displayable(stage.as_ref()).indent().to_string()); } @@ -345,8 +354,8 @@ mod test { Ok(()) } - #[test] - fn distributed_join_plan() -> Result<(), BallistaError> { + #[tokio::test] + async fn distributed_join_plan() -> Result<(), BallistaError> { let mut ctx = datafusion_test_context("testdata")?; // simplified form of TPC-H query 12 @@ -386,11 +395,13 @@ order by let plan = df.to_logical_plan(); let plan = ctx.optimize(&plan)?; - let plan = ctx.create_physical_plan(&plan)?; + let plan = ctx.create_physical_plan(&plan).await?; let mut planner = DistributedPlanner::new(); let job_uuid = Uuid::new_v4(); - let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?; + let stages = planner + .plan_query_stages(&job_uuid.to_string(), plan) + .await?; for stage in &stages { println!("{}", displayable(stage.as_ref()).indent().to_string()); } @@ -516,8 +527,8 @@ order by Ok(()) } - #[test] - fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> { + #[tokio::test] + async fn roundtrip_serde_hash_aggregate() -> Result<(), BallistaError> { let mut ctx = datafusion_test_context("testdata")?; // simplified form of TPC-H query 1 @@ -530,11 +541,13 @@ order by let plan = df.to_logical_plan(); let plan = ctx.optimize(&plan)?; - let plan = ctx.create_physical_plan(&plan)?; + let plan = ctx.create_physical_plan(&plan).await?; let mut planner = DistributedPlanner::new(); let job_uuid = Uuid::new_v4(); - let stages = planner.plan_query_stages(&job_uuid.to_string(), plan)?; + let stages = planner + .plan_query_stages(&job_uuid.to_string(), plan) + .await?; let partial_hash = 
stages[0].children()[0].clone(); let partial_hash_serde = roundtrip_operator(partial_hash.clone())?; diff --git a/benchmarks/src/bin/nyctaxi.rs b/benchmarks/src/bin/nyctaxi.rs index 938753077661..a88494fc8547 100644 --- a/benchmarks/src/bin/nyctaxi.rs +++ b/benchmarks/src/bin/nyctaxi.rs @@ -121,7 +121,7 @@ async fn execute_sql(ctx: &mut ExecutionContext, sql: &str, debug: bool) -> Resu if debug { println!("Optimized logical plan:\n{:?}", plan); } - let physical_plan = ctx.create_physical_plan(&plan)?; + let physical_plan = ctx.create_physical_plan(&plan).await?; let result = collect(physical_plan).await?; if debug { pretty::print_batches(&result)?; diff --git a/benchmarks/src/bin/tpch.rs b/benchmarks/src/bin/tpch.rs index 852e499b217c..2e9b2ff3df8f 100644 --- a/benchmarks/src/bin/tpch.rs +++ b/benchmarks/src/bin/tpch.rs @@ -348,7 +348,7 @@ async fn execute_query( if debug { println!("=== Optimized logical plan ===\n{:?}\n", plan); } - let physical_plan = ctx.create_physical_plan(&plan)?; + let physical_plan = ctx.create_physical_plan(&plan).await?; if debug { println!( "=== Physical plan ===\n{}\n", @@ -394,7 +394,7 @@ async fn convert_tbl(opt: ConvertOpt) -> Result<()> { // create the physical plan let csv = csv.to_logical_plan(); let csv = ctx.optimize(&csv)?; - let csv = ctx.create_physical_plan(&csv)?; + let csv = ctx.create_physical_plan(&csv).await?; let output_path = output_root_path.join(table); let output_path = output_path.to_str().unwrap().to_owned(); @@ -1063,7 +1063,7 @@ mod tests { use datafusion::physical_plan::ExecutionPlan; use std::convert::TryInto; - fn round_trip_query(n: usize) -> Result<()> { + async fn round_trip_query(n: usize) -> Result<()> { let config = ExecutionConfig::new() .with_target_partitions(1) .with_batch_size(10); @@ -1110,7 +1110,7 @@ mod tests { // test physical plan roundtrip if env::var("TPCH_DATA").is_ok() { - let physical_plan = ctx.create_physical_plan(&plan)?; + let physical_plan = ctx.create_physical_plan(&plan).await?; let proto: protobuf::PhysicalPlanNode = (physical_plan.clone()).try_into().unwrap(); let round_trip: Arc = (&proto).try_into().unwrap(); @@ -1126,9 +1126,9 @@ mod tests { macro_rules! 
test_round_trip { ($tn:ident, $query:expr) => { - #[test] - fn $tn() -> Result<()> { - round_trip_query($query) + #[tokio::test] + async fn $tn() -> Result<()> { + round_trip_query($query).await } }; } diff --git a/datafusion/src/datasource/avro.rs b/datafusion/src/datasource/avro.rs index ee0fabfe0cc6..ee5cea51d991 100644 --- a/datafusion/src/datasource/avro.rs +++ b/datafusion/src/datasource/avro.rs @@ -27,6 +27,7 @@ use std::{ }; use arrow::datatypes::SchemaRef; +use async_trait::async_trait; use crate::physical_plan::avro::{AvroExec, AvroReadOptions}; use crate::{ @@ -120,6 +121,7 @@ impl AvroFile { } } +#[async_trait] impl TableProvider for AvroFile { fn as_any(&self) -> &dyn Any { self @@ -129,7 +131,7 @@ impl TableProvider for AvroFile { self.schema.clone() } - fn scan( + async fn scan( &self, projection: &Option>, batch_size: usize, @@ -185,7 +187,7 @@ mod tests { async fn read_small_batches() -> Result<()> { let table = load_table("alltypes_plain.avro")?; let projection = None; - let exec = table.scan(&projection, 2, &[], None)?; + let exec = table.scan(&projection, 2, &[], None).await?; let stream = exec.execute(0).await?; let _ = stream @@ -414,7 +416,7 @@ mod tests { table: Arc, projection: &Option>, ) -> Result { - let exec = table.scan(projection, 1024, &[], None)?; + let exec = table.scan(projection, 1024, &[], None).await?; let mut it = exec.execute(0).await?; it.next() .await diff --git a/datafusion/src/datasource/csv.rs b/datafusion/src/datasource/csv.rs index 971bd91315f9..d47312e8b745 100644 --- a/datafusion/src/datasource/csv.rs +++ b/datafusion/src/datasource/csv.rs @@ -34,6 +34,7 @@ //! ``` use arrow::datatypes::SchemaRef; +use async_trait::async_trait; use std::any::Any; use std::io::{Read, Seek}; use std::string::String; @@ -157,6 +158,7 @@ impl CsvFile { } } +#[async_trait] impl TableProvider for CsvFile { fn as_any(&self) -> &dyn Any { self @@ -166,7 +168,7 @@ impl TableProvider for CsvFile { self.schema.clone() } - fn scan( + async fn scan( &self, projection: &Option>, batch_size: usize, diff --git a/datafusion/src/datasource/datasource.rs b/datafusion/src/datasource/datasource.rs index 3c60255590a1..918200f5a91c 100644 --- a/datafusion/src/datasource/datasource.rs +++ b/datafusion/src/datasource/datasource.rs @@ -20,6 +20,8 @@ use std::any::Any; use std::sync::Arc; +use async_trait::async_trait; + use crate::arrow::datatypes::SchemaRef; use crate::error::Result; use crate::logical_plan::Expr; @@ -54,6 +56,7 @@ pub enum TableType { } /// Source table +#[async_trait] pub trait TableProvider: Sync + Send { /// Returns the table provider as [`Any`](std::any::Any) so that it can be /// downcast to a specific implementation. @@ -68,7 +71,7 @@ pub trait TableProvider: Sync + Send { } /// Create an ExecutionPlan that will scan the table. 
- fn scan( + async fn scan( &self, projection: &Option>, batch_size: usize, diff --git a/datafusion/src/datasource/empty.rs b/datafusion/src/datasource/empty.rs index 183db76829c0..380c5a7ac5d1 100644 --- a/datafusion/src/datasource/empty.rs +++ b/datafusion/src/datasource/empty.rs @@ -21,6 +21,7 @@ use std::any::Any; use std::sync::Arc; use arrow::datatypes::*; +use async_trait::async_trait; use crate::datasource::TableProvider; use crate::error::Result; @@ -39,6 +40,7 @@ impl EmptyTable { } } +#[async_trait] impl TableProvider for EmptyTable { fn as_any(&self) -> &dyn Any { self @@ -48,7 +50,7 @@ impl TableProvider for EmptyTable { self.schema.clone() } - fn scan( + async fn scan( &self, projection: &Option>, _batch_size: usize, diff --git a/datafusion/src/datasource/json.rs b/datafusion/src/datasource/json.rs index f4e67828906e..1a6ec7af0720 100644 --- a/datafusion/src/datasource/json.rs +++ b/datafusion/src/datasource/json.rs @@ -36,6 +36,7 @@ use crate::{ }, }; use arrow::{datatypes::SchemaRef, json::reader::infer_json_schema_from_seekable}; +use async_trait::async_trait; trait SeekRead: Read + Seek {} @@ -101,6 +102,8 @@ impl NdJsonFile { }) } } + +#[async_trait] impl TableProvider for NdJsonFile { fn as_any(&self) -> &dyn Any { self @@ -110,7 +113,7 @@ impl TableProvider for NdJsonFile { self.schema.clone() } - fn scan( + async fn scan( &self, projection: &Option>, batch_size: usize, diff --git a/datafusion/src/datasource/memory.rs b/datafusion/src/datasource/memory.rs index 67b0e7b7c030..b47e7e12e54e 100644 --- a/datafusion/src/datasource/memory.rs +++ b/datafusion/src/datasource/memory.rs @@ -25,6 +25,7 @@ use std::sync::Arc; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; +use async_trait::async_trait; use crate::datasource::TableProvider; use crate::error::{DataFusionError, Result}; @@ -66,7 +67,7 @@ impl MemTable { output_partitions: Option, ) -> Result { let schema = t.schema(); - let exec = t.scan(&None, batch_size, &[], None)?; + let exec = t.scan(&None, batch_size, &[], None).await?; let partition_count = exec.output_partitioning().partition_count(); let tasks = (0..partition_count) @@ -114,6 +115,7 @@ impl MemTable { } } +#[async_trait] impl TableProvider for MemTable { fn as_any(&self) -> &dyn Any { self @@ -123,7 +125,7 @@ impl TableProvider for MemTable { self.schema.clone() } - fn scan( + async fn scan( &self, projection: &Option>, _batch_size: usize, @@ -168,7 +170,7 @@ mod tests { let provider = MemTable::try_new(schema, vec![vec![batch]])?; // scan with projection - let exec = provider.scan(&Some(vec![2, 1]), 1024, &[], None)?; + let exec = provider.scan(&Some(vec![2, 1]), 1024, &[], None).await?; let mut it = exec.execute(0).await?; let batch2 = it.next().await.unwrap()?; assert_eq!(2, batch2.schema().fields().len()); @@ -198,7 +200,7 @@ mod tests { let provider = MemTable::try_new(schema, vec![vec![batch]])?; - let exec = provider.scan(&None, 1024, &[], None)?; + let exec = provider.scan(&None, 1024, &[], None).await?; let mut it = exec.execute(0).await?; let batch1 = it.next().await.unwrap()?; assert_eq!(3, batch1.schema().fields().len()); @@ -207,8 +209,8 @@ mod tests { Ok(()) } - #[test] - fn test_invalid_projection() -> Result<()> { + #[tokio::test] + async fn test_invalid_projection() -> Result<()> { let schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Int32, false), Field::new("b", DataType::Int32, false), @@ -228,7 +230,7 @@ mod tests { let projection: Vec = vec![0, 4]; - match provider.scan(&Some(projection), 1024, &[], 
None) { + match provider.scan(&Some(projection), 1024, &[], None).await { Err(DataFusionError::Internal(e)) => { assert_eq!("\"Projection index out of range\"", format!("{:?}", e)) } @@ -349,7 +351,7 @@ mod tests { let provider = MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?; - let exec = provider.scan(&None, 1024, &[], None)?; + let exec = provider.scan(&None, 1024, &[], None).await?; let mut it = exec.execute(0).await?; let batch1 = it.next().await.unwrap()?; assert_eq!(3, batch1.schema().fields().len()); diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs index 8dc9bc52213d..65c90897cb12 100644 --- a/datafusion/src/datasource/parquet.rs +++ b/datafusion/src/datasource/parquet.rs @@ -21,6 +21,7 @@ use std::any::Any; use std::fs::File; use std::sync::Arc; +use async_trait::async_trait; use parquet::arrow::ArrowReader; use parquet::arrow::ParquetFileArrowReader; use parquet::file::serialized_reader::SerializedFileReader; @@ -110,6 +111,7 @@ impl ParquetTable { } } +#[async_trait] impl TableProvider for ParquetTable { fn as_any(&self) -> &dyn Any { self @@ -129,7 +131,7 @@ impl TableProvider for ParquetTable { /// Scan the file(s), using the provided projection, and return one BatchIterator per /// partition. - fn scan( + async fn scan( &self, projection: &Option<Vec<usize>>, batch_size: usize, @@ -414,7 +416,7 @@ mod tests { async fn read_small_batches() -> Result<()> { let table = load_table("alltypes_plain.parquet")?; let projection = None; - let exec = table.scan(&projection, 2, &[], None)?; + let exec = table.scan(&projection, 2, &[], None).await?; let stream = exec.execute(0).await?; let _ = stream @@ -635,7 +637,7 @@ mod tests { table: Arc<dyn TableProvider>, projection: &Option<Vec<usize>>, ) -> Result<RecordBatch> { - let exec = table.scan(projection, 1024, &[], None)?; + let exec = table.scan(projection, 1024, &[], None).await?; let mut it = exec.execute(0).await?; it.next() .await diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs index 6789b79e399c..00adbb0b039e 100644 --- a/datafusion/src/execution/context.rs +++ b/datafusion/src/execution/context.rs @@ -81,6 +81,7 @@ use crate::sql::{ }; use crate::variable::{VarProvider, VarType}; use crate::{dataframe::DataFrame, physical_plan::udaf::AggregateUDF}; +use async_trait::async_trait; use chrono::{DateTime, Utc}; use parquet::arrow::ArrowWriter; use parquet::file::properties::WriterProperties; @@ -533,17 +534,27 @@ impl ExecutionContext { } /// Creates a physical plan from a logical plan. - pub fn create_physical_plan( + pub async fn create_physical_plan( &self, logical_plan: &LogicalPlan, ) -> Result<Arc<dyn ExecutionPlan>> { - let mut state = self.state.lock().unwrap(); - state.execution_props.start_execution(); + let (state, planner) = { + let mut state = self.state.lock().unwrap(); + state.execution_props.start_execution(); + + // We need to clone `state` to release the lock, whose guard is not `Send`. We could + // make the lock `Send` by using `tokio::sync::Mutex`, but that would require + // propagating async even to the `LogicalPlan` building methods. + // Cloning `state` here is fine as we then pass it as immutable `&state`, which + // means that we avoid write consistency issues as the cloned version will not + // be written to. As for modifications that might be applied to the + // original state after it has been cloned, they will not be picked up by the + // clone, but that is okay, as it is equivalent to postponing the state update + // by keeping the lock until the end of the function scope.
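The comment above is the crux of the change to `ExecutionContext::create_physical_plan`: a `std::sync::MutexGuard` is not `Send`, so it cannot be held across an `.await`, and cloning the state is the cheapest way to release the lock before the planner's async work starts. A minimal sketch of the same pattern, using a hypothetical `State` type as a stand-in for `ExecutionContextState` (not part of this patch):

```rust
use std::sync::{Arc, Mutex};

// Hypothetical stand-in for `ExecutionContextState`; the real type is
// likewise `Clone`.
#[derive(Clone)]
struct State {
    counter: u64,
}

async fn create_plan(state: Arc<Mutex<State>>) -> u64 {
    // Take the lock inside a block: the guard is dropped at the closing
    // brace, before the `.await` below, so the future stays `Send`.
    let snapshot = {
        let mut guard = state.lock().unwrap();
        guard.counter += 1; // mutate while the lock is held
        (*guard).clone() // snapshot to use after the lock is released
    };
    tokio::task::yield_now().await; // stand-in for the async planning work
    snapshot.counter
}

#[tokio::main]
async fn main() {
    let state = Arc::new(Mutex::new(State { counter: 0 }));
    println!("executions started: {}", create_plan(state).await);
}
```

The trade-off is the one the comment spells out: the clone is a consistent snapshot, and any later writes to the original state are simply not observed, which is equivalent to having held the lock until the end of the function.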
+ (state.clone(), Arc::clone(&state.config.query_planner)) + }; - state - .config - .query_planner - .create_physical_plan(logical_plan, &state) + planner.create_physical_plan(logical_plan, &state).await } /// Executes a query and writes the results to a partitioned CSV file. @@ -676,9 +687,10 @@ impl FunctionRegistry for ExecutionContext { } /// A planner used to add extensions to DataFusion logical and physical plans. +#[async_trait] pub trait QueryPlanner { /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution - fn create_physical_plan( + async fn create_physical_plan( &self, logical_plan: &LogicalPlan, ctx_state: &ExecutionContextState, @@ -688,15 +700,16 @@ pub trait QueryPlanner { /// The query planner used if no user defined planner is provided struct DefaultQueryPlanner {} +#[async_trait] impl QueryPlanner for DefaultQueryPlanner { /// Given a `LogicalPlan`, create an `ExecutionPlan` suitable for execution - fn create_physical_plan( + async fn create_physical_plan( &self, logical_plan: &LogicalPlan, ctx_state: &ExecutionContextState, ) -> Result> { let planner = DefaultPhysicalPlanner::default(); - planner.create_physical_plan(logical_plan, ctx_state) + planner.create_physical_plan(logical_plan, ctx_state).await } } @@ -1054,6 +1067,7 @@ mod tests { use arrow::compute::add; use arrow::datatypes::*; use arrow::record_batch::RecordBatch; + use async_trait::async_trait; use std::fs::File; use std::sync::Weak; use std::thread::{self, JoinHandle}; @@ -1214,7 +1228,7 @@ mod tests { ctx.create_logical_plan("SELECT c1, c2 FROM test WHERE c1 > 0 AND c1 < 3")?; let logical_plan = ctx.optimize(&logical_plan)?; - let physical_plan = ctx.create_physical_plan(&logical_plan)?; + let physical_plan = ctx.create_physical_plan(&logical_plan).await?; let results = collect_partitioned(physical_plan).await?; @@ -1290,7 +1304,7 @@ mod tests { \n TableScan: test projection=Some([1])"; assert_eq!(format!("{:?}", optimized_plan), expected); - let physical_plan = ctx.create_physical_plan(&optimized_plan)?; + let physical_plan = ctx.create_physical_plan(&optimized_plan).await?; assert_eq!(1, physical_plan.schema().fields().len()); assert_eq!("c2", physical_plan.schema().field(0).name().as_str()); @@ -1301,8 +1315,8 @@ mod tests { Ok(()) } - #[test] - fn preserve_nullability_on_projection() -> Result<()> { + #[tokio::test] + async fn preserve_nullability_on_projection() -> Result<()> { let tmp_dir = TempDir::new()?; let ctx = create_ctx(&tmp_dir, 1)?; @@ -1314,7 +1328,7 @@ mod tests { .build()?; let plan = ctx.optimize(&plan)?; - let physical_plan = ctx.create_physical_plan(&Arc::new(plan))?; + let physical_plan = ctx.create_physical_plan(&Arc::new(plan)).await?; assert!(!physical_plan.schema().field_with_name("c1")?.is_nullable()); Ok(()) } @@ -1366,7 +1380,7 @@ mod tests { ); assert_eq!(format!("{:?}", optimized_plan), expected); - let physical_plan = ctx.create_physical_plan(&optimized_plan)?; + let physical_plan = ctx.create_physical_plan(&optimized_plan).await?; assert_eq!(1, physical_plan.schema().fields().len()); assert_eq!("b", physical_plan.schema().field(0).name().as_str()); @@ -2442,8 +2456,8 @@ mod tests { Ok(()) } - #[test] - fn aggregate_with_alias() -> Result<()> { + #[tokio::test] + async fn aggregate_with_alias() -> Result<()> { let tmp_dir = TempDir::new()?; let ctx = create_ctx(&tmp_dir, 1)?; @@ -2459,7 +2473,7 @@ mod tests { let plan = ctx.optimize(&plan)?; - let physical_plan = ctx.create_physical_plan(&Arc::new(plan))?; + let physical_plan = 
ctx.create_physical_plan(&Arc::new(plan)).await?; assert_eq!("c1", physical_plan.schema().field(0).name().as_str()); assert_eq!( "total_salary", @@ -2873,8 +2887,8 @@ mod tests { Ok(()) } - #[test] - fn send_context_to_threads() -> Result<()> { + #[tokio::test] + async fn send_context_to_threads() -> Result<()> { // ensure ExecutionContexts can be used in a multi-threaded // environment. Usecase is for concurrent planing. let tmp_dir = TempDir::new()?; @@ -2900,8 +2914,8 @@ mod tests { Ok(()) } - #[test] - fn ctx_sql_should_optimize_plan() -> Result<()> { + #[tokio::test] + async fn ctx_sql_should_optimize_plan() -> Result<()> { let mut ctx = ExecutionContext::new(); let plan1 = ctx.create_logical_plan("SELECT * FROM (SELECT 1) WHERE TRUE AND TRUE")?; @@ -2977,7 +2991,7 @@ mod tests { ); let plan = ctx.optimize(&plan)?; - let plan = ctx.create_physical_plan(&plan)?; + let plan = ctx.create_physical_plan(&plan).await?; let result = collect(plan).await?; let expected = vec![ @@ -3247,6 +3261,7 @@ mod tests { async fn information_schema_tables_table_types() { struct TestTable(TableType); + #[async_trait] impl TableProvider for TestTable { fn as_any(&self) -> &dyn std::any::Any { self @@ -3260,7 +3275,7 @@ mod tests { unimplemented!() } - fn scan( + async fn scan( &self, _: &Option>, _: usize, @@ -3764,8 +3779,9 @@ mod tests { struct MyPhysicalPlanner {} + #[async_trait] impl PhysicalPlanner for MyPhysicalPlanner { - fn create_physical_plan( + async fn create_physical_plan( &self, _logical_plan: &LogicalPlan, _ctx_state: &ExecutionContextState, @@ -3788,14 +3804,17 @@ mod tests { struct MyQueryPlanner {} + #[async_trait] impl QueryPlanner for MyQueryPlanner { - fn create_physical_plan( + async fn create_physical_plan( &self, logical_plan: &LogicalPlan, ctx_state: &ExecutionContextState, ) -> Result> { let physical_planner = MyPhysicalPlanner {}; - physical_planner.create_physical_plan(logical_plan, ctx_state) + physical_planner + .create_physical_plan(logical_plan, ctx_state) + .await } } @@ -3822,7 +3841,7 @@ mod tests { ) -> Result<()> { let logical_plan = ctx.create_logical_plan(sql)?; let logical_plan = ctx.optimize(&logical_plan)?; - let physical_plan = ctx.create_physical_plan(&logical_plan)?; + let physical_plan = ctx.create_physical_plan(&logical_plan).await?; ctx.write_csv(physical_plan, out_dir.to_string()).await } @@ -3835,7 +3854,7 @@ mod tests { ) -> Result<()> { let logical_plan = ctx.create_logical_plan(sql)?; let logical_plan = ctx.optimize(&logical_plan)?; - let physical_plan = ctx.create_physical_plan(&logical_plan)?; + let physical_plan = ctx.create_physical_plan(&logical_plan).await?; ctx.write_parquet(physical_plan, out_dir.to_string(), writer_properties) .await } diff --git a/datafusion/src/execution/dataframe_impl.rs b/datafusion/src/execution/dataframe_impl.rs index 724a3f8493c5..cd39dd6f3e7b 100644 --- a/datafusion/src/execution/dataframe_impl.rs +++ b/datafusion/src/execution/dataframe_impl.rs @@ -57,7 +57,7 @@ impl DataFrameImpl { let state = self.ctx_state.lock().unwrap().clone(); let ctx = ExecutionContext::from(Arc::new(Mutex::new(state))); let plan = ctx.optimize(&self.plan)?; - ctx.create_physical_plan(&plan) + ctx.create_physical_plan(&plan).await } } diff --git a/datafusion/src/optimizer/filter_push_down.rs b/datafusion/src/optimizer/filter_push_down.rs index a51fbc225724..594e37194b27 100644 --- a/datafusion/src/optimizer/filter_push_down.rs +++ b/datafusion/src/optimizer/filter_push_down.rs @@ -549,6 +549,7 @@ mod tests { use crate::test::*; use 
crate::{logical_plan::col, prelude::JoinType}; use arrow::datatypes::SchemaRef; + use async_trait::async_trait; fn optimize_plan(plan: &LogicalPlan) -> LogicalPlan { let rule = FilterPushDown::new(); @@ -1129,6 +1130,7 @@ mod tests { pub filter_support: TableProviderFilterPushDown, } + #[async_trait] impl TableProvider for PushDownProvider { fn schema(&self) -> SchemaRef { Arc::new(arrow::datatypes::Schema::new(vec![ @@ -1140,7 +1142,7 @@ impl TableProvider for PushDownProvider { ])) } - fn scan( + async fn scan( &self, _: &Option<Vec<usize>>, _: usize, diff --git a/datafusion/src/physical_plan/mod.rs b/datafusion/src/physical_plan/mod.rs index d12b2178e39e..f0b5622c43ba 100644 --- a/datafusion/src/physical_plan/mod.rs +++ b/datafusion/src/physical_plan/mod.rs @@ -193,31 +193,34 @@ pub trait ExecutionPlan: Debug + Send + Sync { /// use datafusion::prelude::*; /// use datafusion::physical_plan::displayable; /// -/// // Hard code target_partitions as it appears in the RepartitionExec output -/// let config = ExecutionConfig::new() -/// .with_target_partitions(3); -/// let mut ctx = ExecutionContext::with_config(config); +/// #[tokio::main] +/// async fn main() { +/// // Hard code target_partitions as it appears in the RepartitionExec output +/// let config = ExecutionConfig::new() +/// .with_target_partitions(3); +/// let mut ctx = ExecutionContext::with_config(config); /// -/// // register the a table -/// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).unwrap(); +/// // register a table +/// ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new()).unwrap(); /// -/// // create a plan to run a SQL query -/// let plan = ctx -/// .create_logical_plan("SELECT a FROM example WHERE a < 5") -/// .unwrap(); -/// let plan = ctx.optimize(&plan).unwrap(); -/// let physical_plan = ctx.create_physical_plan(&plan).unwrap(); +/// // create a plan to run a SQL query +/// let plan = ctx +/// .create_logical_plan("SELECT a FROM example WHERE a < 5") +/// .unwrap(); +/// let plan = ctx.optimize(&plan).unwrap(); +/// let physical_plan = ctx.create_physical_plan(&plan).await.unwrap(); /// -/// // Format using display string -/// let displayable_plan = displayable(physical_plan.as_ref()); -/// let plan_string = format!("{}", displayable_plan.indent()); +/// // Format using display string +/// let displayable_plan = displayable(physical_plan.as_ref()); +/// let plan_string = format!("{}", displayable_plan.indent()); /// -/// assert_eq!("ProjectionExec: expr=[a@0 as a]\ -/// \n CoalesceBatchesExec: target_batch_size=4096\ -/// \n FilterExec: a@0 < 5\ -/// \n RepartitionExec: partitioning=RoundRobinBatch(3)\ -/// \n CsvExec: source=Path(tests/example.csv: [tests/example.csv]), has_header=true", -/// plan_string.trim()); +/// assert_eq!("ProjectionExec: expr=[a@0 as a]\ +/// \n CoalesceBatchesExec: target_batch_size=4096\ +/// \n FilterExec: a@0 < 5\ +/// \n RepartitionExec: partitioning=RoundRobinBatch(3)\ +/// \n CsvExec: source=Path(tests/example.csv: [tests/example.csv]), has_header=true", +/// plan_string.trim()); +/// } /// ``` /// pub fn displayable(plan: &dyn ExecutionPlan) -> DisplayableExecutionPlan<'_> { diff --git a/datafusion/src/physical_plan/planner.rs b/datafusion/src/physical_plan/planner.rs index 55dc9369e037..06f3a1ddd961 100644 --- a/datafusion/src/physical_plan/planner.rs +++ b/datafusion/src/physical_plan/planner.rs @@ -54,7 +54,10 @@ use crate::{ use arrow::compute::SortOptions; use arrow::datatypes::{Schema, SchemaRef}; use arrow::{compute::can_cast_types, datatypes::DataType}; +use
async_trait::async_trait; use expressions::col; +use futures::future::BoxFuture; +use futures::{FutureExt, StreamExt, TryStreamExt}; use log::debug; use std::sync::Arc; @@ -182,9 +185,10 @@ fn create_physical_name(e: &Expr, is_first_expr: bool) -> Result { /// Physical query planner that converts a `LogicalPlan` to an /// `ExecutionPlan` suitable for execution. +#[async_trait] pub trait PhysicalPlanner { /// Create a physical plan from a logical plan - fn create_physical_plan( + async fn create_physical_plan( &self, logical_plan: &LogicalPlan, ctx_state: &ExecutionContextState, @@ -243,17 +247,18 @@ impl Default for DefaultPhysicalPlanner { } } +#[async_trait] impl PhysicalPlanner for DefaultPhysicalPlanner { /// Create a physical plan from a logical plan - fn create_physical_plan( + async fn create_physical_plan( &self, logical_plan: &LogicalPlan, ctx_state: &ExecutionContextState, ) -> Result> { - match self.handle_explain(logical_plan, ctx_state)? { + match self.handle_explain(logical_plan, ctx_state).await? { Some(plan) => Ok(plan), None => { - let plan = self.create_initial_plan(logical_plan, ctx_state)?; + let plan = self.create_initial_plan(logical_plan, ctx_state).await?; self.optimize_internal(plan, ctx_state, |_, _| {}) } } @@ -296,95 +301,343 @@ impl DefaultPhysicalPlanner { } /// Create a physical plan from a logical plan - fn create_initial_plan( - &self, - logical_plan: &LogicalPlan, - ctx_state: &ExecutionContextState, - ) -> Result> { - let batch_size = ctx_state.config.batch_size; - - match logical_plan { - LogicalPlan::TableScan { - source, - projection, - filters, - limit, - .. - } => { - // Remove all qualifiers from the scan as the provider - // doesn't know (nor should care) how the relation was - // referred to in the query - let filters = unnormalize_cols(filters.iter().cloned()); - source.scan(projection, batch_size, &filters, *limit) - } - LogicalPlan::Window { - input, window_expr, .. - } => { - if window_expr.is_empty() { - return Err(DataFusionError::Internal( - "Impossibly got empty window expression".to_owned(), - )); + fn create_initial_plan<'a>( + &'a self, + logical_plan: &'a LogicalPlan, + ctx_state: &'a ExecutionContextState, + ) -> BoxFuture<'a, Result>> { + async move { + let batch_size = ctx_state.config.batch_size; + + let exec_plan: Result> = match logical_plan { + LogicalPlan::TableScan { + source, + projection, + filters, + limit, + .. + } => { + // Remove all qualifiers from the scan as the provider + // doesn't know (nor should care) how the relation was + // referred to in the query + let filters = unnormalize_cols(filters.iter().cloned()); + source.scan(projection, batch_size, &filters, *limit).await } + LogicalPlan::Window { + input, window_expr, .. 
+ } => { + if window_expr.is_empty() { + return Err(DataFusionError::Internal( + "Impossibly got empty window expression".to_owned(), + )); + } + + let input_exec = self.create_initial_plan(input, ctx_state).await?; - let input_exec = self.create_initial_plan(input, ctx_state)?; + // at this moment we are guaranteed by the logical planner + // to have all the window_expr to have equal sort key + let partition_keys = window_expr_common_partition_keys(window_expr)?; - // at this moment we are guaranteed by the logical planner - // to have all the window_expr to have equal sort key - let partition_keys = window_expr_common_partition_keys(window_expr)?; + let can_repartition = !partition_keys.is_empty() + && ctx_state.config.target_partitions > 1 + && ctx_state.config.repartition_windows; + + let input_exec = if can_repartition { + let partition_keys = partition_keys + .iter() + .map(|e| { + self.create_physical_expr( + e, + input.schema(), + &input_exec.schema(), + ctx_state, + ) + }) + .collect::>>>()?; + Arc::new(RepartitionExec::try_new( + input_exec, + Partitioning::Hash( + partition_keys, + ctx_state.config.target_partitions, + ), + )?) + } else { + input_exec + }; + + // add a sort phase + let get_sort_keys = |expr: &Expr| match expr { + Expr::WindowFunction { + ref partition_by, + ref order_by, + .. + } => generate_sort_key(partition_by, order_by), + _ => unreachable!(), + }; + let sort_keys = get_sort_keys(&window_expr[0]); + if window_expr.len() > 1 { + debug_assert!( + window_expr[1..] + .iter() + .all(|expr| get_sort_keys(expr) == sort_keys), + "all window expressions shall have the same sort keys, as guaranteed by logical planning" + ); + } - let can_repartition = !partition_keys.is_empty() - && ctx_state.config.target_partitions > 1 - && ctx_state.config.repartition_windows; + let logical_input_schema = input.schema(); - let input_exec = if can_repartition { - let partition_keys = partition_keys + let input_exec = if sort_keys.is_empty() { + input_exec + } else { + let physical_input_schema = input_exec.schema(); + let sort_keys = sort_keys + .iter() + .map(|e| match e { + Expr::Sort { + expr, + asc, + nulls_first, + } => self.create_physical_sort_expr( + expr, + logical_input_schema, + &physical_input_schema, + SortOptions { + descending: !*asc, + nulls_first: *nulls_first, + }, + ctx_state, + ), + _ => unreachable!(), + }) + .collect::>>()?; + Arc::new(if can_repartition { + SortExec::new_with_partitioning(sort_keys, input_exec, true) + } else { + SortExec::try_new(sort_keys, input_exec)? + }) + }; + + let physical_input_schema = input_exec.schema(); + let window_expr = window_expr .iter() .map(|e| { - self.create_physical_expr( + self.create_window_expr( e, - input.schema(), - &input_exec.schema(), + logical_input_schema, + &physical_input_schema, ctx_state, ) }) - .collect::>>>()?; - Arc::new(RepartitionExec::try_new( + .collect::>>()?; + + Ok(Arc::new(WindowAggExec::try_new( + window_expr, input_exec, - Partitioning::Hash( - partition_keys, - ctx_state.config.target_partitions, - ), - )?) - } else { - input_exec - }; + physical_input_schema, + )?) ) + } + LogicalPlan::Aggregate { + input, + group_expr, + aggr_expr, + .. 
+ } => { + // Initially need to perform the aggregate and then merge the partitions + let input_exec = self.create_initial_plan(input, ctx_state).await?; + let physical_input_schema = input_exec.schema(); + let logical_input_schema = input.as_ref().schema(); - // add a sort phase - let get_sort_keys = |expr: &Expr| match expr { - Expr::WindowFunction { - ref partition_by, - ref order_by, - .. - } => generate_sort_key(partition_by, order_by), - _ => unreachable!(), - }; - let sort_keys = get_sort_keys(&window_expr[0]); - if window_expr.len() > 1 { - debug_assert!( - window_expr[1..] + let groups = group_expr + .iter() + .map(|e| { + tuple_err(( + self.create_physical_expr( + e, + logical_input_schema, + &physical_input_schema, + ctx_state, + ), + physical_name(e), + )) + }) + .collect::>>()?; + let aggregates = aggr_expr + .iter() + .map(|e| { + self.create_aggregate_expr( + e, + logical_input_schema, + &physical_input_schema, + ctx_state, + ) + }) + .collect::>>()?; + + let initial_aggr = Arc::new(HashAggregateExec::try_new( + AggregateMode::Partial, + groups.clone(), + aggregates.clone(), + input_exec, + physical_input_schema.clone(), + )?); + + // update group column indices based on partial aggregate plan evaluation + let final_group: Vec> = (0..groups.len()) + .map(|i| col(&groups[i].1, &initial_aggr.schema())) + .collect::>()?; + + // TODO: dictionary type not yet supported in Hash Repartition + let contains_dict = groups + .iter() + .flat_map(|x| x.0.data_type(physical_input_schema.as_ref())) + .any(|x| matches!(x, DataType::Dictionary(_, _))); + + let can_repartition = !groups.is_empty() + && ctx_state.config.target_partitions > 1 + && ctx_state.config.repartition_aggregations + && !contains_dict; + + let (initial_aggr, next_partition_mode): ( + Arc, + AggregateMode, + ) = if can_repartition { + // Divide partial hash aggregates into multiple partitions by hash key + let hash_repartition = Arc::new(RepartitionExec::try_new( + initial_aggr, + Partitioning::Hash( + final_group.clone(), + ctx_state.config.target_partitions, + ), + )?); + // Combine hash aggregates within the partition + (hash_repartition, AggregateMode::FinalPartitioned) + } else { + // construct a second aggregation, keeping the final column name equal to the + // first aggregation and the expressions corresponding to the respective aggregate + (initial_aggr, AggregateMode::Final) + }; + + Ok(Arc::new(HashAggregateExec::try_new( + next_partition_mode, + final_group .iter() - .all(|expr| get_sort_keys(expr) == sort_keys), - "all window expressions shall have the same sort keys, as guaranteed by logical planning" - ); + .enumerate() + .map(|(i, expr)| (expr.clone(), groups[i].1.clone())) + .collect(), + aggregates, + initial_aggr, + physical_input_schema.clone(), + )?) ) } + LogicalPlan::Projection { input, expr, .. } => { + let input_exec = self.create_initial_plan(input, ctx_state).await?; + let input_schema = input.as_ref().schema(); - let logical_input_schema = input.schema(); + let physical_exprs = expr + .iter() + .map(|e| { + // For projections, SQL planner and logical plan builder may convert user + // provided expressions into logical Column expressions if their results + // are already provided from the input plans. Because we work with + // qualified columns in logical plane, derived columns involve operators or + // functions will contain qualifers as well. This will result in logical + // columns with names like `SUM(t1.c1)`, `t1.c1 + t1.c2`, etc. 
+ // + // If we run these logical columns through physical_name function, we will + // get physical names with column qualifiers, which violates Datafusion's + // field name semantics. To account for this, we need to derive the + // physical name from physical input instead. + // + // This depends on the invariant that logical schema field index MUST match + // with physical schema field index. + let physical_name = if let Expr::Column(col) = e { + match input_schema.index_of_column(col) { + Ok(idx) => { + // index physical field using logical field index + Ok(input_exec.schema().field(idx).name().to_string()) + } + // logical column is not a derived column, safe to pass along to + // physical_name + Err(_) => physical_name(e), + } + } else { + physical_name(e) + }; - let input_exec = if sort_keys.is_empty() { - input_exec - } else { - let physical_input_schema = input_exec.schema(); - let sort_keys = sort_keys + tuple_err(( + self.create_physical_expr( + e, + input_schema, + &input_exec.schema(), + ctx_state, + ), + physical_name, + )) + }) + .collect::>>()?; + + Ok(Arc::new(ProjectionExec::try_new( + physical_exprs, + input_exec, + )?) ) + } + LogicalPlan::Filter { + input, predicate, .. + } => { + let physical_input = self.create_initial_plan(input, ctx_state).await?; + let input_schema = physical_input.as_ref().schema(); + let input_dfschema = input.as_ref().schema(); + let runtime_expr = self.create_physical_expr( + predicate, + input_dfschema, + &input_schema, + ctx_state, + )?; + Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?) ) + } + LogicalPlan::Union { inputs, .. } => { + let physical_plans = futures::stream::iter(inputs) + .then(|lp| self.create_initial_plan(lp, ctx_state)) + .try_collect::>() + .await?; + Ok(Arc::new(UnionExec::new(physical_plans)) ) + } + LogicalPlan::Repartition { + input, + partitioning_scheme, + } => { + let physical_input = self.create_initial_plan(input, ctx_state).await?; + let input_schema = physical_input.schema(); + let input_dfschema = input.as_ref().schema(); + let physical_partitioning = match partitioning_scheme { + LogicalPartitioning::RoundRobinBatch(n) => { + Partitioning::RoundRobinBatch(*n) + } + LogicalPartitioning::Hash(expr, n) => { + let runtime_expr = expr + .iter() + .map(|e| { + self.create_physical_expr( + e, + input_dfschema, + &input_schema, + ctx_state, + ) + }) + .collect::>>()?; + Partitioning::Hash(runtime_expr, *n) + } + }; + Ok(Arc::new(RepartitionExec::try_new( + physical_input, + physical_partitioning, + )?) ) + } + LogicalPlan::Sort { expr, input, .. } => { + let physical_input = self.create_initial_plan(input, ctx_state).await?; + let input_schema = physical_input.as_ref().schema(); + let input_dfschema = input.as_ref().schema(); + let sort_expr = expr .iter() .map(|e| match e { Expr::Sort { @@ -393,423 +646,175 @@ impl DefaultPhysicalPlanner { nulls_first, } => self.create_physical_sort_expr( expr, - logical_input_schema, - &physical_input_schema, + input_dfschema, + &input_schema, SortOptions { descending: !*asc, nulls_first: *nulls_first, }, ctx_state, ), - _ => unreachable!(), + _ => Err(DataFusionError::Plan( + "Sort only accepts sort expressions".to_string(), + )), }) .collect::>>()?; - Arc::new(if can_repartition { - SortExec::new_with_partitioning(sort_keys, input_exec, true) - } else { - SortExec::try_new(sort_keys, input_exec)? 
- }) - }; - - let physical_input_schema = input_exec.schema(); - let window_expr = window_expr - .iter() - .map(|e| { - self.create_window_expr( - e, - logical_input_schema, - &physical_input_schema, - ctx_state, - ) - }) - .collect::>>()?; - - Ok(Arc::new(WindowAggExec::try_new( - window_expr, - input_exec, - physical_input_schema, - )?)) - } - LogicalPlan::Aggregate { - input, - group_expr, - aggr_expr, - .. - } => { - // Initially need to perform the aggregate and then merge the partitions - let input_exec = self.create_initial_plan(input, ctx_state)?; - let physical_input_schema = input_exec.schema(); - let logical_input_schema = input.as_ref().schema(); - - let groups = group_expr - .iter() - .map(|e| { - tuple_err(( - self.create_physical_expr( - e, - logical_input_schema, - &physical_input_schema, - ctx_state, - ), - physical_name(e), - )) - }) - .collect::>>()?; - let aggregates = aggr_expr - .iter() - .map(|e| { - self.create_aggregate_expr( - e, - logical_input_schema, - &physical_input_schema, - ctx_state, - ) - }) - .collect::>>()?; - - let initial_aggr = Arc::new(HashAggregateExec::try_new( - AggregateMode::Partial, - groups.clone(), - aggregates.clone(), - input_exec, - physical_input_schema.clone(), - )?); - - // update group column indices based on partial aggregate plan evaluation - let final_group: Vec> = (0..groups.len()) - .map(|i| col(&groups[i].1, &initial_aggr.schema())) - .collect::>()?; - - // TODO: dictionary type not yet supported in Hash Repartition - let contains_dict = groups - .iter() - .flat_map(|x| x.0.data_type(physical_input_schema.as_ref())) - .any(|x| matches!(x, DataType::Dictionary(_, _))); - - let can_repartition = !groups.is_empty() - && ctx_state.config.target_partitions > 1 - && ctx_state.config.repartition_aggregations - && !contains_dict; - - let (initial_aggr, next_partition_mode): ( - Arc, - AggregateMode, - ) = if can_repartition { - // Divide partial hash aggregates into multiple partitions by hash key - let hash_repartition = Arc::new(RepartitionExec::try_new( - initial_aggr, - Partitioning::Hash( - final_group.clone(), - ctx_state.config.target_partitions, - ), - )?); - // Combine hash aggregates within the partition - (hash_repartition, AggregateMode::FinalPartitioned) - } else { - // construct a second aggregation, keeping the final column name equal to the - // first aggregation and the expressions corresponding to the respective aggregate - (initial_aggr, AggregateMode::Final) - }; - - Ok(Arc::new(HashAggregateExec::try_new( - next_partition_mode, - final_group + Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?) ) + } + LogicalPlan::Join { + left, + right, + on: keys, + join_type, + .. + } => { + let left_df_schema = left.schema(); + let physical_left = self.create_initial_plan(left, ctx_state).await?; + let right_df_schema = right.schema(); + let physical_right = self.create_initial_plan(right, ctx_state).await?; + let join_on = keys .iter() - .enumerate() - .map(|(i, expr)| (expr.clone(), groups[i].1.clone())) - .collect(), - aggregates, - initial_aggr, - physical_input_schema.clone(), - )?)) - } - LogicalPlan::Projection { input, expr, .. } => { - let input_exec = self.create_initial_plan(input, ctx_state)?; - let input_schema = input.as_ref().schema(); - - let physical_exprs = expr - .iter() - .map(|e| { - // For projections, SQL planner and logical plan builder may convert user - // provided expressions into logical Column expressions if their results - // are already provided from the input plans. 
Because we work with - // qualified columns in logical plane, derived columns involve operators or - // functions will contain qualifers as well. This will result in logical - // columns with names like `SUM(t1.c1)`, `t1.c1 + t1.c2`, etc. - // - // If we run these logical columns through physical_name function, we will - // get physical names with column qualifiers, which violates Datafusion's - // field name semantics. To account for this, we need to derive the - // physical name from physical input instead. - // - // This depends on the invariant that logical schema field index MUST match - // with physical schema field index. - let physical_name = if let Expr::Column(col) = e { - match input_schema.index_of_column(col) { - Ok(idx) => { - // index physical field using logical field index - Ok(input_exec.schema().field(idx).name().to_string()) - } - // logical column is not a derived column, safe to pass along to - // physical_name - Err(_) => physical_name(e), - } - } else { - physical_name(e) - }; - - tuple_err(( - self.create_physical_expr( - e, - input_schema, - &input_exec.schema(), - ctx_state, - ), - physical_name, - )) - }) - .collect::>>()?; + .map(|(l, r)| { + Ok(( + Column::new(&l.name, left_df_schema.index_of_column(l)?), + Column::new(&r.name, right_df_schema.index_of_column(r)?), + )) + }) + .collect::>()?; - Ok(Arc::new(ProjectionExec::try_new( - physical_exprs, - input_exec, - )?)) - } - LogicalPlan::Filter { - input, predicate, .. - } => { - let physical_input = self.create_initial_plan(input, ctx_state)?; - let input_schema = physical_input.as_ref().schema(); - let input_dfschema = input.as_ref().schema(); - let runtime_expr = self.create_physical_expr( - predicate, - input_dfschema, - &input_schema, - ctx_state, - )?; - Ok(Arc::new(FilterExec::try_new(runtime_expr, physical_input)?)) - } - LogicalPlan::Union { inputs, .. } => { - let physical_plans = inputs - .iter() - .map(|input| self.create_initial_plan(input, ctx_state)) - .collect::>>()?; - Ok(Arc::new(UnionExec::new(physical_plans))) - } - LogicalPlan::Repartition { - input, - partitioning_scheme, - } => { - let physical_input = self.create_initial_plan(input, ctx_state)?; - let input_schema = physical_input.schema(); - let input_dfschema = input.as_ref().schema(); - let physical_partitioning = match partitioning_scheme { - LogicalPartitioning::RoundRobinBatch(n) => { - Partitioning::RoundRobinBatch(*n) - } - LogicalPartitioning::Hash(expr, n) => { - let runtime_expr = expr + if ctx_state.config.target_partitions > 1 + && ctx_state.config.repartition_joins + { + let (left_expr, right_expr) = join_on .iter() - .map(|e| { - self.create_physical_expr( - e, - input_dfschema, - &input_schema, - ctx_state, + .map(|(l, r)| { + ( + Arc::new(l.clone()) as Arc, + Arc::new(r.clone()) as Arc, ) }) - .collect::>>()?; - Partitioning::Hash(runtime_expr, *n) - } - }; - Ok(Arc::new(RepartitionExec::try_new( - physical_input, - physical_partitioning, - )?)) - } - LogicalPlan::Sort { expr, input, .. 
} => { - let physical_input = self.create_initial_plan(input, ctx_state)?; - let input_schema = physical_input.as_ref().schema(); - let input_dfschema = input.as_ref().schema(); - - let sort_expr = expr - .iter() - .map(|e| match e { - Expr::Sort { - expr, - asc, - nulls_first, - } => self.create_physical_sort_expr( - expr, - input_dfschema, - &input_schema, - SortOptions { - descending: !*asc, - nulls_first: *nulls_first, - }, - ctx_state, - ), - _ => Err(DataFusionError::Plan( - "Sort only accepts sort expressions".to_string(), - )), - }) - .collect::>>()?; - - Ok(Arc::new(SortExec::try_new(sort_expr, physical_input)?)) - } - LogicalPlan::Join { - left, - right, - on: keys, - join_type, - .. - } => { - let left_df_schema = left.schema(); - let physical_left = self.create_initial_plan(left, ctx_state)?; - let right_df_schema = right.schema(); - let physical_right = self.create_initial_plan(right, ctx_state)?; - let join_on = keys - .iter() - .map(|(l, r)| { - Ok(( - Column::new(&l.name, left_df_schema.index_of_column(l)?), - Column::new(&r.name, right_df_schema.index_of_column(r)?), - )) - }) - .collect::>()?; - - if ctx_state.config.target_partitions > 1 - && ctx_state.config.repartition_joins - { - let (left_expr, right_expr) = join_on - .iter() - .map(|(l, r)| { - ( - Arc::new(l.clone()) as Arc, - Arc::new(r.clone()) as Arc, - ) - }) - .unzip(); - - // Use hash partition by default to parallelize hash joins - Ok(Arc::new(HashJoinExec::try_new( - Arc::new(RepartitionExec::try_new( + .unzip(); + + // Use hash partition by default to parallelize hash joins + Ok(Arc::new(HashJoinExec::try_new( + Arc::new(RepartitionExec::try_new( + physical_left, + Partitioning::Hash( + left_expr, + ctx_state.config.target_partitions, + ), + )?), + Arc::new(RepartitionExec::try_new( + physical_right, + Partitioning::Hash( + right_expr, + ctx_state.config.target_partitions, + ), + )?), + join_on, + join_type, + PartitionMode::Partitioned, + )?)) + } else { + Ok(Arc::new(HashJoinExec::try_new( physical_left, - Partitioning::Hash( - left_expr, - ctx_state.config.target_partitions, - ), - )?), - Arc::new(RepartitionExec::try_new( physical_right, - Partitioning::Hash( - right_expr, - ctx_state.config.target_partitions, - ), - )?), - join_on, - join_type, - PartitionMode::Partitioned, - )?)) - } else { - Ok(Arc::new(HashJoinExec::try_new( - physical_left, - physical_right, - join_on, - join_type, - PartitionMode::CollectLeft, - )?)) + join_on, + join_type, + PartitionMode::CollectLeft, + )?)) + } } - } - LogicalPlan::CrossJoin { left, right, .. } => { - let left = self.create_initial_plan(left, ctx_state)?; - let right = self.create_initial_plan(right, ctx_state)?; - Ok(Arc::new(CrossJoinExec::try_new(left, right)?)) - } - LogicalPlan::EmptyRelation { - produce_one_row, - schema, - } => Ok(Arc::new(EmptyExec::new( - *produce_one_row, - SchemaRef::new(schema.as_ref().to_owned().into()), - ))), - LogicalPlan::Limit { input, n, .. } => { - let limit = *n; - let input = self.create_initial_plan(input, ctx_state)?; - - // GlobalLimitExec requires a single partition for input - let input = if input.output_partitioning().partition_count() == 1 { - input - } else { - // Apply a LocalLimitExec to each partition. The optimizer will also insert - // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec - Arc::new(LocalLimitExec::new(input, limit)) - }; - - Ok(Arc::new(GlobalLimitExec::new(input, limit))) - } - LogicalPlan::CreateExternalTable { .. 
} => { - // There is no default plan for "CREATE EXTERNAL - // TABLE" -- it must be handled at a higher level (so - // that the appropriate table can be registered with - // the context) - Err(DataFusionError::Internal( - "Unsupported logical plan: CreateExternalTable".to_string(), - )) - } - LogicalPlan::Explain { .. } => Err(DataFusionError::Internal( - "Unsupported logical plan: Explain must be root of the plan".to_string(), - )), - LogicalPlan::Analyze { - verbose, - input, - schema, - } => { - let input = self.create_initial_plan(input, ctx_state)?; - let schema = SchemaRef::new(schema.as_ref().to_owned().into()); - Ok(Arc::new(AnalyzeExec::new(*verbose, input, schema))) - } - LogicalPlan::Extension { node } => { - let physical_inputs = node - .inputs() - .into_iter() - .map(|input_plan| self.create_initial_plan(input_plan, ctx_state)) - .collect::>>()?; + LogicalPlan::CrossJoin { left, right, .. } => { + let left = self.create_initial_plan(left, ctx_state).await?; + let right = self.create_initial_plan(right, ctx_state).await?; + Ok(Arc::new(CrossJoinExec::try_new(left, right)?)) + } + LogicalPlan::EmptyRelation { + produce_one_row, + schema, + } => Ok(Arc::new(EmptyExec::new( + *produce_one_row, + SchemaRef::new(schema.as_ref().to_owned().into()), + ))), + LogicalPlan::Limit { input, n, .. } => { + let limit = *n; + let input = self.create_initial_plan(input, ctx_state).await?; + + // GlobalLimitExec requires a single partition for input + let input = if input.output_partitioning().partition_count() == 1 { + input + } else { + // Apply a LocalLimitExec to each partition. The optimizer will also insert + // a CoalescePartitionsExec between the GlobalLimitExec and LocalLimitExec + Arc::new(LocalLimitExec::new(input, limit)) + }; - let maybe_plan = self.extension_planners.iter().try_fold( - None, - |maybe_plan, planner| { - if let Some(plan) = maybe_plan { - Ok(Some(plan)) - } else { - planner.plan_extension( - self, - node.as_ref(), - &node.inputs(), - &physical_inputs, - ctx_state, - ) - } - }, - )?; - let plan = maybe_plan.ok_or_else(|| DataFusionError::Plan(format!( - "No installed planner was able to convert the custom node to an execution plan: {:?}", node - )))?; - - // Ensure the ExecutionPlan's schema matches the - // declared logical schema to catch and warn about - // logic errors when creating user defined plans. - if !node.schema().matches_arrow_schema(&plan.schema()) { - Err(DataFusionError::Plan(format!( - "Extension planner for {:?} created an ExecutionPlan with mismatched schema. \ - LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}", - node, node.schema(), plan.schema() - ))) - } else { - Ok(plan) + Ok(Arc::new(GlobalLimitExec::new(input, limit))) } - } - } + LogicalPlan::CreateExternalTable { .. } => { + // There is no default plan for "CREATE EXTERNAL + // TABLE" -- it must be handled at a higher level (so + // that the appropriate table can be registered with + // the context) + Err(DataFusionError::Internal( + "Unsupported logical plan: CreateExternalTable".to_string(), + )) + } + LogicalPlan::Explain { .. 
+                    "Unsupported logical plan: Explain must be root of the plan".to_string(),
+                )),
+                LogicalPlan::Analyze {
+                    verbose,
+                    input,
+                    schema,
+                } => {
+                    let input = self.create_initial_plan(input, ctx_state).await?;
+                    let schema = SchemaRef::new(schema.as_ref().to_owned().into());
+                    Ok(Arc::new(AnalyzeExec::new(*verbose, input, schema)))
+                }
+                LogicalPlan::Extension { node } => {
+                    let physical_inputs = futures::stream::iter(node.inputs())
+                        .then(|lp| self.create_initial_plan(lp, ctx_state))
+                        .try_collect::<Vec<_>>()
+                        .await?;
+
+                    let maybe_plan = self.extension_planners.iter().try_fold(
+                        None,
+                        |maybe_plan, planner| {
+                            if let Some(plan) = maybe_plan {
+                                Ok(Some(plan))
+                            } else {
+                                planner.plan_extension(
+                                    self,
+                                    node.as_ref(),
+                                    &node.inputs(),
+                                    &physical_inputs,
+                                    ctx_state,
+                                )
+                            }
+                        },
+                    )?;
+                    let plan = maybe_plan.ok_or_else(|| DataFusionError::Plan(format!(
+                        "No installed planner was able to convert the custom node to an execution plan: {:?}", node
+                    )))?;
+
+                    // Ensure the ExecutionPlan's schema matches the
+                    // declared logical schema to catch and warn about
+                    // logic errors when creating user defined plans.
+                    if !node.schema().matches_arrow_schema(&plan.schema()) {
+                        Err(DataFusionError::Plan(format!(
+                            "Extension planner for {:?} created an ExecutionPlan with mismatched schema. \
+                            LogicalPlan schema: {:?}, ExecutionPlan schema: {:?}",
+                            node, node.schema(), plan.schema()
+                        )))
+                    } else {
+                        Ok(plan)
+                    }
+                }
+            };
+            exec_plan
+        }.boxed()
     }
 
     /// Create a physical expression from a logical expression
@@ -1315,7 +1320,7 @@ impl DefaultPhysicalPlanner {
     /// Returns
     ///    Some(plan) if optimized, and None if logical_plan was not an
     ///    explain (and thus needs to be optimized as normal)
-    fn handle_explain(
+    async fn handle_explain(
        &self,
        logical_plan: &LogicalPlan,
        ctx_state: &ExecutionContextState,
@@ -1332,7 +1337,7 @@ impl DefaultPhysicalPlanner {
 
             stringified_plans.push(plan.to_stringified(FinalLogicalPlan));
 
-            let input = self.create_initial_plan(plan, ctx_state)?;
+            let input = self.create_initial_plan(plan, ctx_state).await?;
             stringified_plans
                 .push(displayable(input.as_ref()).to_stringified(InitialPhysicalPlan));
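
Aside: `create_initial_plan` above ends in `}.boxed()` and returns a `BoxFuture` rather than being declared `async fn`, because the planner recurses into the plan's children and a directly recursive `async fn` would have an infinitely sized future type. A minimal, self-contained sketch of that pattern (the `sum_tree` function and its depth logic are illustrative, not part of the patch):

    use futures::future::BoxFuture;
    use futures::FutureExt;

    // Recursion has to go through a heap-allocated, type-erased future:
    // boxing the async block gives the compiler a sized future type.
    fn sum_tree(depth: u32) -> BoxFuture<'static, u64> {
        async move {
            if depth == 0 {
                1
            } else {
                // The recursive call awaits the child's boxed future.
                1 + sum_tree(depth - 1).await
            }
        }
        .boxed()
    }

    #[tokio::main]
    async fn main() {
        assert_eq!(sum_tree(3).await, 4);
    }
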
@@ -1411,15 +1416,15 @@ mod tests {
         ExecutionContextState::new()
     }
 
-    fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
+    async fn plan(logical_plan: &LogicalPlan) -> Result<Arc<dyn ExecutionPlan>> {
         let mut ctx_state = make_ctx_state();
         ctx_state.config.target_partitions = 4;
         let planner = DefaultPhysicalPlanner::default();
-        planner.create_physical_plan(logical_plan, &ctx_state)
+        planner.create_physical_plan(logical_plan, &ctx_state).await
     }
 
-    #[test]
-    fn test_all_operators() -> Result<()> {
+    #[tokio::test]
+    async fn test_all_operators() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
 
@@ -1433,7 +1438,7 @@ mod tests {
             .limit(10)?
             .build()?;
 
-        let plan = plan(&logical_plan)?;
+        let plan = plan(&logical_plan).await?;
 
         // verify that the plan correctly casts u8 to i64
         // the cast here is implicit so has CastOptions with safe=true
@@ -1443,8 +1448,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_create_not() -> Result<()> {
+    #[tokio::test]
+    async fn test_create_not() -> Result<()> {
         let schema = Schema::new(vec![Field::new("a", DataType::Boolean, true)]);
 
         let dfschema = DFSchema::try_from(schema.clone())?;
@@ -1463,8 +1468,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_with_csv_plan() -> Result<()> {
+    #[tokio::test]
+    async fn test_with_csv_plan() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
 
@@ -1473,7 +1478,7 @@ mod tests {
             .filter(col("c7").lt(col("c12")))?
             .build()?;
 
-        let plan = plan(&logical_plan)?;
+        let plan = plan(&logical_plan).await?;
 
         // c12 is f64, c7 is u8 -> cast c7 to f64
         // the cast here is implicit so has CastOptions with safe=true
@@ -1482,8 +1487,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn errors() -> Result<()> {
+    #[tokio::test]
+    async fn errors() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
         let options = CsvReadOptions::new().schema_infer_max_records(100);
@@ -1517,14 +1522,16 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn default_extension_planner() {
+    #[tokio::test]
+    async fn default_extension_planner() {
         let ctx_state = make_ctx_state();
         let planner = DefaultPhysicalPlanner::default();
         let logical_plan = LogicalPlan::Extension {
             node: Arc::new(NoOpExtensionNode::default()),
         };
-        let plan = planner.create_physical_plan(&logical_plan, &ctx_state);
+        let plan = planner
+            .create_physical_plan(&logical_plan, &ctx_state)
+            .await;
 
         let expected_error =
             "No installed planner was able to convert the custom node to an execution plan: NoOp";
@@ -1539,8 +1546,8 @@ mod tests {
         }
     }
 
-    #[test]
-    fn bad_extension_planner() {
+    #[tokio::test]
+    async fn bad_extension_planner() {
         // Test that creating an execution plan whose schema doesn't
         // match the logical plan's schema generates an error.
         let ctx_state = make_ctx_state();
@@ -1551,7 +1558,9 @@ mod tests {
         let logical_plan = LogicalPlan::Extension {
             node: Arc::new(NoOpExtensionNode::default()),
         };
-        let plan = planner.create_physical_plan(&logical_plan, &ctx_state);
+        let plan = planner
+            .create_physical_plan(&logical_plan, &ctx_state)
+            .await;
 
         let expected_error: &str = "Error during planning: \
         Extension planner for NoOp created an ExecutionPlan with mismatched schema. \
@@ -1584,8 +1593,8 @@ mod tests {
         }
     }
 
-    #[test]
-    fn in_list_types() -> Result<()> {
+    #[tokio::test]
+    async fn in_list_types() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
         let options = CsvReadOptions::new().schema_infer_max_records(100);
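
The test updates above and below are mechanical: each `#[test]` becomes `#[tokio::test]` and the function becomes `async`, since the `plan` helper now has to be awaited. As a rough sketch of what `#[tokio::test]` stands in for (the test name and body are illustrative):

    // Approximately what #[tokio::test] expands to: a plain #[test]
    // that builds a Tokio runtime and blocks on the async body.
    #[test]
    fn plan_runs_on_a_runtime() {
        tokio::runtime::Runtime::new()
            .unwrap()
            .block_on(async {
                // ... the async test body goes here ...
                assert_eq!(2 + 2, 4);
            });
    }
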
@@ -1600,7 +1609,7 @@ mod tests {
             .filter(col("c12").lt(lit(0.05)))?
             .project(vec![col("c1").in_list(list, false)])?
             .build()?;
-        let execution_plan = plan(&logical_plan)?;
+        let execution_plan = plan(&logical_plan).await?;
         // verify that the plan correctly adds cast from Int64(1) to Utf8
         let expected = "InListExpr { expr: Column { name: \"c1\", index: 0 }, list: [Literal { value: Utf8(\"a\") }, CastExpr { expr: Literal { value: Int64(1) }, cast_type: Utf8, cast_options: CastOptions { safe: false } }], negated: false }";
         assert!(format!("{:?}", execution_plan).contains(expected));
@@ -1615,7 +1624,7 @@ mod tests {
             .filter(col("c12").lt(lit(0.05)))?
             .project(vec![col("c12").lt_eq(lit(0.025)).in_list(list, false)])?
             .build()?;
-        let execution_plan = plan(&logical_plan);
+        let execution_plan = plan(&logical_plan).await;
         let expected_error = "Unsupported CAST from Utf8 to Boolean";
 
         match execution_plan {
@@ -1631,8 +1640,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn hash_agg_input_schema() -> Result<()> {
+    #[tokio::test]
+    async fn hash_agg_input_schema() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
 
@@ -1646,7 +1655,7 @@ mod tests {
             .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
             .build()?;
 
-        let execution_plan = plan(&logical_plan)?;
+        let execution_plan = plan(&logical_plan).await?;
         let final_hash_agg = execution_plan
             .as_any()
             .downcast_ref::<HashAggregateExec>()
@@ -1662,8 +1671,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn hash_agg_group_by_partitioned() -> Result<()> {
+    #[tokio::test]
+    async fn hash_agg_group_by_partitioned() -> Result<()> {
         let testdata = crate::test_util::arrow_test_data();
         let path = format!("{}/csv/aggregate_test_100.csv", testdata);
 
@@ -1672,7 +1681,7 @@ mod tests {
             .aggregate(vec![col("c1")], vec![sum(col("c2"))])?
             .build()?;
 
-        let execution_plan = plan(&logical_plan)?;
+        let execution_plan = plan(&logical_plan).await?;
         let formatted = format!("{:?}", execution_plan);
 
         // Make sure the plan contains a FinalPartitioned, which means it will not use the Final
@@ -1682,8 +1691,8 @@ mod tests {
         Ok(())
     }
 
-    #[test]
-    fn test_explain() {
+    #[tokio::test]
+    async fn test_explain() {
         let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
 
         let logical_plan =
@@ -1694,7 +1703,7 @@ mod tests {
             .build()
             .unwrap();
 
-        let plan = plan(&logical_plan).unwrap();
+        let plan = plan(&logical_plan).await.unwrap();
         if let Some(plan) = plan.as_any().downcast_ref::<ExplainExec>() {
             let stringified_plans = plan.stringified_plans();
             assert!(stringified_plans.len() >= 4);
diff --git a/datafusion/tests/custom_sources.rs b/datafusion/tests/custom_sources.rs
index 31551a91d904..a6456f11e91c 100644
--- a/datafusion/tests/custom_sources.rs
+++ b/datafusion/tests/custom_sources.rs
@@ -182,6 +182,7 @@ impl ExecutionPlan for CustomExecutionPlan {
     }
 }
 
+#[async_trait]
 impl TableProvider for CustomTableProvider {
     fn as_any(&self) -> &dyn Any {
         self
@@ -191,7 +192,7 @@ impl TableProvider for CustomTableProvider {
         TEST_CUSTOM_SCHEMA_REF!()
     }
 
-    fn scan(
+    async fn scan(
         &self,
         projection: &Option<Vec<usize>>,
         _batch_size: usize,
@@ -236,7 +237,7 @@ async fn custom_source_dataframe() -> Result<()> {
     );
     assert_eq!(format!("{:?}", optimized_plan), expected);
 
-    let physical_plan = ctx.create_physical_plan(&optimized_plan)?;
+    let physical_plan = ctx.create_physical_plan(&optimized_plan).await?;
 
     assert_eq!(1, physical_plan.schema().fields().len());
     assert_eq!("c2", physical_plan.schema().field(0).name().as_str());
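
The `#[async_trait]` attribute added above (and on the other `TableProvider` and `QueryPlanner` impls in this patch) is what allows `scan` to be written as an `async fn` in a trait: stable Rust at this point has no native async trait methods, so the macro rewrites them to return a boxed future. A minimal sketch with illustrative names:

    use async_trait::async_trait;

    // #[async_trait] rewrites `async fn` in traits and impls into
    // ordinary methods returning a pinned, boxed future.
    #[async_trait]
    trait DataSource {
        async fn fetch(&self, key: u32) -> Option<String>;
    }

    struct InMemory;

    #[async_trait]
    impl DataSource for InMemory {
        async fn fetch(&self, key: u32) -> Option<String> {
            Some(format!("row-{}", key))
        }
    }

    #[tokio::main]
    async fn main() {
        let src = InMemory;
        assert_eq!(src.fetch(7).await.as_deref(), Some("row-7"));
    }
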
@@ -260,7 +261,10 @@ async fn optimizers_catch_all_statistics() {
         .sql("SELECT count(*), min(c1), max(c1) from test")
         .unwrap();
 
-    let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap();
+    let physical_plan = ctx
+        .create_physical_plan(&df.to_logical_plan())
+        .await
+        .unwrap();
 
     // when the optimization kicks in, the source is replaced by an EmptyExec
     assert!(
diff --git a/datafusion/tests/parquet_pruning.rs b/datafusion/tests/parquet_pruning.rs
index 14f5dd20f470..511a9e60c7a4 100644
--- a/datafusion/tests/parquet_pruning.rs
+++ b/datafusion/tests/parquet_pruning.rs
@@ -533,6 +533,7 @@ impl ContextWithParquet {
         let physical_plan = self
             .ctx
             .create_physical_plan(&logical_plan)
+            .await
             .expect("creating physical plan");
 
         let results = datafusion::physical_plan::collect(physical_plan.clone())
diff --git a/datafusion/tests/provider_filter_pushdown.rs b/datafusion/tests/provider_filter_pushdown.rs
index e0102c4f1bcc..653b96c39320 100644
--- a/datafusion/tests/provider_filter_pushdown.rs
+++ b/datafusion/tests/provider_filter_pushdown.rs
@@ -110,6 +110,7 @@ struct CustomProvider {
     one_batch: RecordBatch,
 }
 
+#[async_trait]
 impl TableProvider for CustomProvider {
     fn as_any(&self) -> &dyn std::any::Any {
         self
@@ -119,7 +120,7 @@ impl TableProvider for CustomProvider {
         self.zero_batch.schema()
     }
 
-    fn scan(
+    async fn scan(
         &self,
         _: &Option<Vec<usize>>,
         _: usize,
diff --git a/datafusion/tests/sql.rs b/datafusion/tests/sql.rs
index 4cd0ed3d6a84..a67c82b5232d 100644
--- a/datafusion/tests/sql.rs
+++ b/datafusion/tests/sql.rs
@@ -188,7 +188,7 @@ async fn parquet_single_nan_schema() {
     let sql = "SELECT mycol FROM single_nan";
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan).await.unwrap();
     let results = collect(plan).await.unwrap();
     for batch in results {
         assert_eq!(1, batch.num_rows());
@@ -223,7 +223,7 @@ async fn parquet_list_columns() {
     let sql = "SELECT int64_list, utf8_list FROM list_columns";
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan).await.unwrap();
     let results = collect(plan).await.unwrap();
 
     //   int64_list   utf8_list
@@ -928,7 +928,7 @@ async fn csv_query_avg_multi_batch() -> Result<()> {
     let sql = "SELECT avg(c12) FROM aggregate_test_100";
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan).await.unwrap();
     let results = collect(plan).await.unwrap();
     let batch = &results[0];
     let column = batch.column(0);
@@ -2366,7 +2366,7 @@ async fn explain_analyze_baseline_metrics() {
     println!("running query: {}", sql);
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+    let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
     let results = collect(physical_plan.clone()).await.unwrap();
     let formatted = arrow::util::pretty::pretty_format_batches(&results).unwrap();
     println!("Query Output:\n\n{}", formatted);
@@ -2648,7 +2648,7 @@ async fn csv_explain_plans() {
     // Physical plan
     // Create plan
     let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
-    let plan = ctx.create_physical_plan(&plan).expect(&msg);
+    let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
     //
     // Execute plan
     let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
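
The sql.rs updates all follow one shape: create a logical plan, optimize it, then await `create_physical_plan` before collecting results. As a standalone sketch of that flow against the post-patch API (the table name, CSV path, and query are illustrative placeholders):

    use datafusion::error::Result;
    use datafusion::physical_plan::collect;
    use datafusion::prelude::*;

    #[tokio::main]
    async fn main() -> Result<()> {
        let mut ctx = ExecutionContext::new();
        // Placeholder table registration.
        ctx.register_csv("example", "tests/example.csv", CsvReadOptions::new())?;

        let plan = ctx.create_logical_plan("SELECT * FROM example LIMIT 5")?;
        let plan = ctx.optimize(&plan)?;
        // Physical planning is now async and must be awaited.
        let plan = ctx.create_physical_plan(&plan).await?;

        let batches = collect(plan).await?;
        println!("got {} record batches", batches.len());
        Ok(())
    }
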
@@ -2845,7 +2845,7 @@ async fn csv_explain_verbose_plans() {
     // Physical plan
     // Create plan
     let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
-    let plan = ctx.create_physical_plan(&plan).expect(&msg);
+    let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
     //
     // Execute plan
     let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
@@ -3002,7 +3002,7 @@ async fn execute_to_batches(ctx: &mut ExecutionContext, sql: &str) -> Vec<RecordBatch> {
     let plan = ctx.create_logical_plan(sql).expect(&msg);
     let msg = format!("Creating physical plan for '{}': {:?}", sql, plan);
-    let plan = ctx.create_physical_plan(&plan).expect(&msg);
+    let plan = ctx.create_physical_plan(&plan).await.expect(&msg);
     let msg = format!("Executing physical plan for '{}': {:?}", sql, plan);
     let res = collect(plan).await.expect(&msg);
@@ -4439,7 +4439,7 @@ async fn test_cast_expressions_error() -> Result<()> {
     let sql = "SELECT CAST(c1 AS INT) FROM aggregate_test_100";
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan).await.unwrap();
     let result = collect(plan).await;
 
     match result {
@@ -4469,7 +4469,7 @@ async fn test_physical_plan_display_indent() {
 
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+    let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
     let expected = vec![
         "GlobalLimitExec: limit=10",
         "  SortExec: [the_min@2 DESC]",
@@ -4517,7 +4517,7 @@ async fn test_physical_plan_display_indent_multi_children() {
 
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let physical_plan = ctx.create_physical_plan(&plan).unwrap();
+    let physical_plan = ctx.create_physical_plan(&plan).await.unwrap();
     let expected = vec![
         "ProjectionExec: expr=[c1@0 as c1]",
         "  CoalesceBatchesExec: target_batch_size=4096",
@@ -4555,7 +4555,7 @@ async fn test_aggregation_with_bad_arguments() -> Result<()> {
     register_aggregate_csv(&mut ctx)?;
     let sql = "SELECT COUNT(DISTINCT) FROM aggregate_test_100";
     let logical_plan = ctx.create_logical_plan(sql)?;
-    let physical_plan = ctx.create_physical_plan(&logical_plan);
+    let physical_plan = ctx.create_physical_plan(&logical_plan).await;
     let err = physical_plan.unwrap_err();
     assert_eq!(err.to_string(), "Error during planning: Invalid or wrong number of arguments passed to aggregate: 'COUNT(DISTINCT )'");
     Ok(())
@@ -4875,7 +4875,7 @@ async fn avro_single_nan_schema() {
     let sql = "SELECT mycol FROM single_nan";
     let plan = ctx.create_logical_plan(sql).unwrap();
     let plan = ctx.optimize(&plan).unwrap();
-    let plan = ctx.create_physical_plan(&plan).unwrap();
+    let plan = ctx.create_physical_plan(&plan).await.unwrap();
     let results = collect(plan).await.unwrap();
     for batch in results {
         assert_eq!(1, batch.num_rows());
diff --git a/datafusion/tests/statistics.rs b/datafusion/tests/statistics.rs
index a2375ad282c0..7a19aa7deb69 100644
--- a/datafusion/tests/statistics.rs
+++ b/datafusion/tests/statistics.rs
@@ -59,6 +59,7 @@ impl StatisticsValidation {
     }
 }
 
+#[async_trait]
 impl TableProvider for StatisticsValidation {
     fn as_any(&self) -> &dyn Any {
         self
@@ -68,7 +69,7 @@ impl TableProvider for StatisticsValidation {
         Arc::clone(&self.schema)
     }
 
-    fn scan(
+    async fn scan(
        &self,
        projection: &Option<Vec<usize>>,
        _batch_size: usize,
stats_table").unwrap(); - let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap(); + let physical_plan = ctx + .create_physical_plan(&df.to_logical_plan()) + .await + .unwrap(); // the statistics should be those of the source assert_eq!(stats, physical_plan.statistics()); @@ -227,7 +231,10 @@ async fn sql_filter() -> Result<()> { let df = ctx.sql("SELECT * FROM stats_table WHERE c1 = 5").unwrap(); - let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap(); + let physical_plan = ctx + .create_physical_plan(&df.to_logical_plan()) + .await + .unwrap(); // with a filtering condition we loose all knowledge about the statistics assert_eq!(Statistics::default(), physical_plan.statistics()); @@ -241,7 +248,10 @@ async fn sql_limit() -> Result<()> { let mut ctx = init_ctx(stats.clone(), schema)?; let df = ctx.sql("SELECT * FROM stats_table LIMIT 5").unwrap(); - let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap(); + let physical_plan = ctx + .create_physical_plan(&df.to_logical_plan()) + .await + .unwrap(); // when the limit is smaller than the original number of lines // we loose all statistics except the for number of rows which becomes the limit assert_eq!( @@ -254,7 +264,10 @@ async fn sql_limit() -> Result<()> { ); let df = ctx.sql("SELECT * FROM stats_table LIMIT 100").unwrap(); - let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap(); + let physical_plan = ctx + .create_physical_plan(&df.to_logical_plan()) + .await + .unwrap(); // when the limit is larger than the original number of lines, statistics remain unchanged assert_eq!(stats, physical_plan.statistics()); @@ -270,7 +283,10 @@ async fn sql_window() -> Result<()> { .sql("SELECT c2, sum(c1) over (partition by c2) FROM stats_table") .unwrap(); - let physical_plan = ctx.create_physical_plan(&df.to_logical_plan()).unwrap(); + let physical_plan = ctx + .create_physical_plan(&df.to_logical_plan()) + .await + .unwrap(); let result = physical_plan.statistics(); diff --git a/datafusion/tests/user_defined_plan.rs b/datafusion/tests/user_defined_plan.rs index d57a24c33fb2..14600f22d198 100644 --- a/datafusion/tests/user_defined_plan.rs +++ b/datafusion/tests/user_defined_plan.rs @@ -202,10 +202,11 @@ fn make_topk_context() -> ExecutionContext { struct TopKQueryPlanner {} +#[async_trait] impl QueryPlanner for TopKQueryPlanner { /// Given a `LogicalPlan` created from above, create an /// `ExecutionPlan` suitable for execution - fn create_physical_plan( + async fn create_physical_plan( &self, logical_plan: &LogicalPlan, ctx_state: &ExecutionContextState, @@ -216,7 +217,9 @@ impl QueryPlanner for TopKQueryPlanner { TopKPlanner {}, )]); // Delegate most work of physical planning to the default physical planner - physical_planner.create_physical_plan(logical_plan, ctx_state) + physical_planner + .create_physical_plan(logical_plan, ctx_state) + .await } } diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs index 0885ae367a8e..48da234fc23e 100644 --- a/python/src/dataframe.rs +++ b/python/src/dataframe.rs @@ -122,14 +122,19 @@ impl DataFrame { /// Unless some order is specified in the plan, there is no guarantee of the order of the result fn collect(&self, py: Python) -> PyResult { let ctx = _ExecutionContext::from(self.ctx_state.clone()); + let rt = Runtime::new().unwrap(); let plan = ctx .optimize(&self.plan) .map_err(|e| -> errors::DataFusionError { e.into() })?; - let plan = ctx - .create_physical_plan(&plan) - .map_err(|e| -> errors::DataFusionError { 
diff --git a/python/src/dataframe.rs b/python/src/dataframe.rs
index 0885ae367a8e..48da234fc23e 100644
--- a/python/src/dataframe.rs
+++ b/python/src/dataframe.rs
@@ -122,14 +122,19 @@ impl DataFrame {
     /// Unless some order is specified in the plan, there is no guarantee of the order of the result
     fn collect(&self, py: Python) -> PyResult<PyObject> {
         let ctx = _ExecutionContext::from(self.ctx_state.clone());
+        let rt = Runtime::new().unwrap();
         let plan = ctx
             .optimize(&self.plan)
             .map_err(|e| -> errors::DataFusionError { e.into() })?;
-        let plan = ctx
-            .create_physical_plan(&plan)
-            .map_err(|e| -> errors::DataFusionError { e.into() })?;
-        let rt = Runtime::new().unwrap();
+        let plan = py.allow_threads(|| {
+            rt.block_on(async {
+                ctx.create_physical_plan(&plan)
+                    .await
+                    .map_err(|e| -> errors::DataFusionError { e.into() })
+            })
+        })?;
+
         let batches = py.allow_threads(|| {
             rt.block_on(async {
                 collect(plan)
@@ -144,12 +149,20 @@ impl DataFrame {
     #[args(num = "20")]
     fn show(&self, py: Python, num: usize) -> PyResult<()> {
         let ctx = _ExecutionContext::from(self.ctx_state.clone());
-        let plan = ctx
-            .optimize(&self.limit(num)?.plan)
-            .and_then(|plan| ctx.create_physical_plan(&plan))
-            .map_err(|e| -> errors::DataFusionError { e.into() })?;
-        let rt = Runtime::new().unwrap();
+        let rt = Runtime::new().unwrap();
+        let plan = py.allow_threads(|| {
+            rt.block_on(async {
+                let l_plan = ctx
+                    .optimize(&self.limit(num)?.plan)
+                    .map_err(|e| -> errors::DataFusionError { e.into() })?;
+                let p_plan = ctx
+                    .create_physical_plan(&l_plan)
+                    .await
+                    .map_err(|e| -> errors::DataFusionError { e.into() })?;
+                Ok::<_, PyErr>(p_plan)
+            })
+        })?;
+
         let batches = py.allow_threads(|| {
             rt.block_on(async {
                 collect(plan)