diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 8852854a8b5d..ba2596ad1e19 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -72,7 +72,6 @@ pub mod create_drop; pub mod explain_analyze; pub mod expr; pub mod joins; -pub mod repartition; pub mod select; mod sql_api; diff --git a/datafusion/core/tests/sql/repartition.rs b/datafusion/core/tests/sql/repartition.rs deleted file mode 100644 index 332f18e941aa..000000000000 --- a/datafusion/core/tests/sql/repartition.rs +++ /dev/null @@ -1,59 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -use arrow::array::UInt32Array; -use arrow::datatypes::{DataType, Field, Schema}; -use arrow::record_batch::RecordBatch; -use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; -use datafusion::physical_plan::repartition::RepartitionExec; -use datafusion::physical_plan::{ExecutionPlan, Partitioning}; -use datafusion::prelude::{SessionConfig, SessionContext}; -use datafusion::test_util::UnboundedExec; -use datafusion_common::Result; -use datafusion_physical_expr::expressions::Column; -use datafusion_physical_expr::PhysicalExpr; -use futures::StreamExt; -use std::sync::Arc; - -/// See <https://github.com/apache/arrow-datafusion/issues/5278> -#[tokio::test] -async fn unbounded_repartition() -> Result<()> { - let config = SessionConfig::new(); - let ctx = SessionContext::new_with_config(config); - let task = ctx.task_ctx(); - let schema = Arc::new(Schema::new(vec![Field::new("a2", DataType::UInt32, false)])); - let batch = RecordBatch::try_new( - Arc::clone(&schema), - vec![Arc::new(UInt32Array::from(vec![1]))], - )?; - let input = Arc::new(UnboundedExec::new(None, batch.clone(), 1)); - let on: Vec<Arc<dyn PhysicalExpr>> = vec![Arc::new(Column::new("a2", 0))]; - let plan = Arc::new(RepartitionExec::try_new(input, Partitioning::Hash(on, 3))?); - let plan = Arc::new(CoalescePartitionsExec::new(plan.clone())); - let mut stream = plan.execute(0, task)?; - - // Note: `tokio::time::timeout` does NOT help here because in the mentioned issue, the whole runtime is blocked by a - // CPU-spinning thread. Using a multithread runtime with multiple threads is NOT a solution since this would not - // trigger the bug (the bug is not specific to a single-thread RT though, it's just the only way to trigger it reliably). 
- let batch_actual = stream - .next() - .await - .expect("not terminated") - .expect("no error in stream"); - assert_eq!(batch_actual, batch); - Ok(()) -} diff --git a/datafusion/sqllogictest/test_files/repartition.slt b/datafusion/sqllogictest/test_files/repartition.slt index 9829299f43e5..7c141adf82b1 100644 --- a/datafusion/sqllogictest/test_files/repartition.slt +++ b/datafusion/sqllogictest/test_files/repartition.slt @@ -71,3 +71,59 @@ AggregateExec: mode=FinalPartitioned, gby=[column1@0 as column1], aggr=[SUM(parq # Cleanup statement ok DROP TABLE parquet_table; + + + +# Unbounded repartition +# See https://github.com/apache/arrow-datafusion/issues/5278 +# Set up unbounded table and run a query - the query plan should display a `RepartitionExec` +# and a `CoalescePartitionsExec` +statement ok +CREATE UNBOUNDED EXTERNAL TABLE sink_table ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT NOT NULL, + c5 INTEGER NOT NULL, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 INT UNSIGNED NOT NULL, + c10 BIGINT UNSIGNED NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL + ) +STORED AS CSV +WITH HEADER ROW +LOCATION '../../testing/data/csv/aggregate_test_100.csv'; + +query TII +SELECT c1, c2, c3 FROM sink_table WHERE c3 > 0 LIMIT 5; +---- +c 2 1 +b 1 29 +e 3 104 +a 3 13 +d 1 38 + +statement ok +set datafusion.execution.target_partitions = 3; + +statement ok +set datafusion.optimizer.enable_round_robin_repartition = true; + +query TT +EXPLAIN SELECT c1, c2, c3 FROM sink_table WHERE c3 > 0 LIMIT 5; +---- +logical_plan +Limit: skip=0, fetch=5 +--Filter: sink_table.c3 > Int16(0) +----TableScan: sink_table projection=[c1, c2, c3] +physical_plan +GlobalLimitExec: skip=0, fetch=5 +--CoalescePartitionsExec +----CoalesceBatchesExec: target_batch_size=8192 +------FilterExec: c3@2 > 0 +--------RepartitionExec: partitioning=RoundRobinBatch(3), input_partitions=1 +----------StreamingTableExec: 
partition_sizes=1, projection=[c1, c2, c3], infinite_source=true