From c92416675f363b03d764155f689cf4a471bf8487 Mon Sep 17 00:00:00 2001 From: Gvozden Neskovic Date: Wed, 9 Mar 2022 18:43:40 +0100 Subject: [PATCH] tfbuilder: wait for all ucx enpoints to be created before publishing ready and closing the listener thread --- src/TfBuilder/TfBuilderDevice.cxx | 47 ++++++++------------- src/TfBuilder/TfBuilderInput.cxx | 3 +- src/TfBuilder/TfBuilderInputUCX.cxx | 25 +++++++++-- src/TfBuilder/TfBuilderRpc.cxx | 4 +- src/common/discovery/StfSenderRpcClient.cxx | 5 +-- src/common/discovery/StfSenderRpcClient.h | 6 +-- src/common/discovery/TfSchedulerRpcClient.h | 2 +- 7 files changed, 48 insertions(+), 44 deletions(-) diff --git a/src/TfBuilder/TfBuilderDevice.cxx b/src/TfBuilder/TfBuilderDevice.cxx index b98ac7f..3645d30 100644 --- a/src/TfBuilder/TfBuilderDevice.cxx +++ b/src/TfBuilder/TfBuilderDevice.cxx @@ -152,39 +152,18 @@ void TfBuilderDevice::InitTask() } } - // start the task - if (!start()) { - mShouldExit = true; - throw std::runtime_error("Aborting InitTask(). Cannot configure."); - } - - // wait for the memory allocation and registration to finish - lBuffersAllocatedFuture.wait(); - if (!lBuffersAllocatedFuture.get()) { - EDDLOG("InitTask::MemorySegment allocation failed. Exiting..."); - throw "InitTask::MemorySegment allocation failed. Exiting..."; - return; - } - - DDDLOG("InitTask completed."); -} - -bool TfBuilderDevice::start() -{ + // Connect all StfSender gRPCs while (!mRpc->start(mTfDataRegionSize, mFlpInputHandler->getStfRequestQueue(), mFlpInputHandler->getDataQueue())) { // check if should stop looking for TfScheduler if (mRpc->isTerminateRequested()) { - mShouldExit = true; - return false; + return; } // try to reach the scheduler unless we should exit if (IsRunningState() && NewStatePending()) { - mShouldExit = true; - return false; + return; } - - std::this_thread::sleep_for(1s); + std::this_thread::sleep_for(250ms); } // we reached the scheduler instance, initialize everything else @@ -200,13 +179,23 @@ bool TfBuilderDevice::start() // start file sink mFileSink.start(); - // Start input handlers + // wait for the memory allocation and registration to finish + lBuffersAllocatedFuture.wait(); + if (!lBuffersAllocatedFuture.get()) { + EDDLOG("InitTask::MemorySegment allocation failed. Exiting..."); + throw std::runtime_error("InitTask::MemorySegment allocation failed. Exiting..."); + } + + // Start input handlers after the memory is finished allocating if (!mFlpInputHandler->start()) { - mShouldExit = true; - EDDLOG("Could not initialize input connections. Exiting."); - return false; + throw std::runtime_error("Could not initialize input connections. Exiting."); } + DDDLOG("InitTask completed."); +} + +bool TfBuilderDevice::start() +{ return true; } diff --git a/src/TfBuilder/TfBuilderInput.cxx b/src/TfBuilder/TfBuilderInput.cxx index 1054408..c8af7e2 100644 --- a/src/TfBuilder/TfBuilderInput.cxx +++ b/src/TfBuilder/TfBuilderInput.cxx @@ -39,10 +39,11 @@ TfBuilderInput::TfBuilderInput(TfBuilderDevice& pStfBuilderDev, std::shared_ptr< mRpc(pRpc), mOutStage(pOutStage) { - // Select which backend is used + // initialize request and data queues mStfRequestQueue = std::make_shared>(); mReceivedDataQueue = std::make_shared>(); + // Select which backend is used auto lTransportOpt = mConfig->getStringParam(DataDistNetworkTransportKey, DataDistNetworkTransportDefault); if (lTransportOpt == "fmq" || lTransportOpt == "FMQ" || lTransportOpt == "fairmq" || lTransportOpt == "FAIRMQ") { mInputFairMQ = std::make_unique(pRpc, pStfBuilderDev.TfBuilderI(), *mStfRequestQueue, *mReceivedDataQueue); diff --git a/src/TfBuilder/TfBuilderInputUCX.cxx b/src/TfBuilder/TfBuilderInputUCX.cxx index caff672..67af0c2 100644 --- a/src/TfBuilder/TfBuilderInputUCX.cxx +++ b/src/TfBuilder/TfBuilderInputUCX.cxx @@ -81,7 +81,7 @@ void TfBuilderInputUCX::ListenerThread() // progress the listener worker const auto lProgress = ucp_worker_progress(listener_worker.ucp_worker); - const auto lSleep = lProgress > 0 ? 0us : 50000us; + const auto lSleep = lProgress > 0 ? 1us : 5000us; auto lConnInfoOpt = mConnRequestQueue.pop_wait_for(lSleep); if (!lConnInfoOpt.has_value()) { @@ -123,7 +123,7 @@ void TfBuilderInputUCX::ListenerThread() mConnMap[lStfSenderId] = std::move(lConnStruct); } - IDDLOG("TfBuilderInputUCX:stop: Listener thread stopped."); + DDDLOG("TfBuilderInputUCX: Listener thread stopped."); } bool TfBuilderInputUCX::start() @@ -257,11 +257,28 @@ bool TfBuilderInputUCX::start() // connection successful break; - } while(true); + } while(true); - // Start all the threads + // Wait until we have all endpoints for StfSenders + do { + std::size_t lNumConnected = 0; + { + std::scoped_lock lLock(mConnectionMapLock); + lNumConnected = mConnMap.size(); + } + + if (lNumConnected == lNumStfSenders) { + break; + } + + std::this_thread::sleep_for(100ms); + DDDLOG_RL(5000, "TfBuilderInputUCX::start: Waiting for all StfSender ucx endpoints. connected={} total={}", lNumConnected, lNumStfSenders); + } while (true); + + // This will stop the Listener thread mState = RUNNING; + DDDLOG("TfBuilderInputUCX::start: Finished"); return true; } diff --git a/src/TfBuilder/TfBuilderRpc.cxx b/src/TfBuilder/TfBuilderRpc.cxx index 66ea1bc..7eac79a 100644 --- a/src/TfBuilder/TfBuilderRpc.cxx +++ b/src/TfBuilder/TfBuilderRpc.cxx @@ -295,7 +295,7 @@ ::grpc::Status TfBuilderRpcImpl::BuildTfRequest(::grpc::ServerContext* /*context } sNumTfRequests++; - DDDLOG_RL(5000, "Requesting SubTimeFrames. tf_id={} tf_size={} total_requests={}", lTfId, lTfSize, sNumTfRequests); + DDDLOG_GRL(5000, "Requesting SubTimeFrames. tf_id={} tf_size={} total_requests={}", lTfId, lTfSize, sNumTfRequests); StfDataRequestMessage lStfRequest; const auto &lTfBuilderId = mDiscoveryConfig->status().info().process_id(); @@ -384,7 +384,7 @@ void TfBuilderRpcImpl::StfRequestThread() while (mRunning && !lReqVector.empty()) { // wait for the stf slots to become free if (mNumReqInFlight.load() >= mMaxNumReqInFlight) { - std::this_thread::sleep_for(5ms); + std::this_thread::sleep_for(500us); continue; // reevaluate the max TF conditions } diff --git a/src/common/discovery/StfSenderRpcClient.cxx b/src/common/discovery/StfSenderRpcClient.cxx index 22e5975..a863eb2 100644 --- a/src/common/discovery/StfSenderRpcClient.cxx +++ b/src/common/discovery/StfSenderRpcClient.cxx @@ -16,9 +16,7 @@ #include #include -namespace o2 -{ -namespace DataDistribution +namespace o2::DataDistribution { StfSenderRpcClient::StfSenderRpcClient(const std::string &pEndpoint) { @@ -57,4 +55,3 @@ std::string StfSenderRpcClient::grpc_status() { } -} diff --git a/src/common/discovery/StfSenderRpcClient.h b/src/common/discovery/StfSenderRpcClient.h index 1fb57ff..33dd744 100644 --- a/src/common/discovery/StfSenderRpcClient.h +++ b/src/common/discovery/StfSenderRpcClient.h @@ -150,7 +150,7 @@ class StfSenderRpcClientCollection { } if (lStfSenderStatus.rpc_endpoint().empty()) { - EDDLOG("StfSender rpc_endpoint field empty. stfs_id={}", lStfSenderId); + DDDLOG("StfSender rpc_endpoint field empty. stfs_id={}", lStfSenderId); continue; } @@ -169,14 +169,14 @@ class StfSenderRpcClientCollection { if (mClients.size() < lNumStfSenders) { lWaitForStfSenders = true; - IDDLOG_RL(1000, "gRPC: Connected to {} out of {} StfSenders", mClients.size(), lNumStfSenders); + IDDLOG_RL(10000, "gRPC: Connected to {} out of {} StfSenders", mClients.size(), lNumStfSenders); } // check the connection on existing clients for (auto &[ mCliId, lClient] : mClients) { if (!lClient->is_ready()) { lAllConnReady = false; - IDDLOG_RL(1000, "StfSender gRPC client connection is not ready. stfs_id={} grpc_status={}", mCliId, lClient->grpc_status()); + IDDLOG_RL(10000, "StfSender gRPC client connection is not ready. stfs_id={} grpc_status={}", mCliId, lClient->grpc_status()); } } } diff --git a/src/common/discovery/TfSchedulerRpcClient.h b/src/common/discovery/TfSchedulerRpcClient.h index 48f6b84..5b51804 100644 --- a/src/common/discovery/TfSchedulerRpcClient.h +++ b/src/common/discovery/TfSchedulerRpcClient.h @@ -41,7 +41,7 @@ class TfSchedulerRpcClient { template bool start(std::shared_ptr pConfig) { - if (!mShouldRetryStart) { + if (!should_retry_start()) { return false; }