From 4cba288d89799178ca8bee8d690d4ce612bb2fba Mon Sep 17 00:00:00 2001 From: Rossi Sun Date: Fri, 21 Feb 2025 10:02:08 +0800 Subject: [PATCH] Address review comments --- cpp/src/arrow/acero/hash_join_benchmark.cc | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/cpp/src/arrow/acero/hash_join_benchmark.cc b/cpp/src/arrow/acero/hash_join_benchmark.cc index 424693ed5349e..f592e17b1e6f7 100644 --- a/cpp/src/arrow/acero/hash_join_benchmark.cc +++ b/cpp/src/arrow/acero/hash_join_benchmark.cc @@ -163,11 +163,7 @@ class JoinBenchmark { &(schema_mgr_->proj_maps[1]), std::move(key_cmp), settings.residual_filter, std::move(register_task_group_callback), std::move(start_task_group_callback), [](int64_t, ExecBatch) { return Status::OK(); }, - [&](int64_t) { - std::unique_lock lk(finished_mutex_); - finished_cv_.notify_one(); - return Status::OK(); - })); + [&](int64_t) { return Status::OK(); })); task_group_probe_ = scheduler_->RegisterTaskGroup( [this](size_t thread_index, int64_t task_id) -> Status { @@ -194,15 +190,12 @@ class JoinBenchmark { l_batches_.batch_count()); })); - std::unique_lock lk(finished_mutex_); - finished_cv_.wait(lk); + thread_pool_->WaitForIdle(); } std::unique_ptr scheduler_; ThreadIndexer thread_indexer_; arrow::internal::ThreadPool* thread_pool_; - std::condition_variable finished_cv_; - std::mutex finished_mutex_; AccumulationQueue l_batches_; AccumulationQueue r_batches_;