Skip to content

Commit

Permalink
tfbuilder: wait for all ucx enpoints to be created before publishing …
Browse files Browse the repository at this point in the history
…ready and closing the listener thread
  • Loading branch information
ironMann committed Mar 9, 2022
1 parent 2f0c029 commit c924166
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 44 deletions.
47 changes: 18 additions & 29 deletions src/TfBuilder/TfBuilderDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -152,39 +152,18 @@ void TfBuilderDevice::InitTask()
}
}

// start the task
if (!start()) {
mShouldExit = true;
throw std::runtime_error("Aborting InitTask(). Cannot configure.");
}

// wait for the memory allocation and registration to finish
lBuffersAllocatedFuture.wait();
if (!lBuffersAllocatedFuture.get()) {
EDDLOG("InitTask::MemorySegment allocation failed. Exiting...");
throw "InitTask::MemorySegment allocation failed. Exiting...";
return;
}

DDDLOG("InitTask completed.");
}

bool TfBuilderDevice::start()
{
// Connect all StfSender gRPCs
while (!mRpc->start(mTfDataRegionSize, mFlpInputHandler->getStfRequestQueue(), mFlpInputHandler->getDataQueue())) {
// check if should stop looking for TfScheduler
if (mRpc->isTerminateRequested()) {
mShouldExit = true;
return false;
return;
}

// try to reach the scheduler unless we should exit
if (IsRunningState() && NewStatePending()) {
mShouldExit = true;
return false;
return;
}

std::this_thread::sleep_for(1s);
std::this_thread::sleep_for(250ms);
}

// we reached the scheduler instance, initialize everything else
Expand All @@ -200,13 +179,23 @@ bool TfBuilderDevice::start()
// start file sink
mFileSink.start();

// Start input handlers
// wait for the memory allocation and registration to finish
lBuffersAllocatedFuture.wait();
if (!lBuffersAllocatedFuture.get()) {
EDDLOG("InitTask::MemorySegment allocation failed. Exiting...");
throw std::runtime_error("InitTask::MemorySegment allocation failed. Exiting...");
}

// Start input handlers after the memory is finished allocating
if (!mFlpInputHandler->start()) {
mShouldExit = true;
EDDLOG("Could not initialize input connections. Exiting.");
return false;
throw std::runtime_error("Could not initialize input connections. Exiting.");
}

DDDLOG("InitTask completed.");
}

bool TfBuilderDevice::start()
{
return true;
}

Expand Down
3 changes: 2 additions & 1 deletion src/TfBuilder/TfBuilderInput.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ TfBuilderInput::TfBuilderInput(TfBuilderDevice& pStfBuilderDev, std::shared_ptr<
mRpc(pRpc),
mOutStage(pOutStage)
{
// Select which backend is used
// initialize request and data queues
mStfRequestQueue = std::make_shared<ConcurrentQueue<std::string>>();
mReceivedDataQueue = std::make_shared<ConcurrentQueue<ReceivedStfMeta>>();

// Select which backend is used
auto lTransportOpt = mConfig->getStringParam(DataDistNetworkTransportKey, DataDistNetworkTransportDefault);
if (lTransportOpt == "fmq" || lTransportOpt == "FMQ" || lTransportOpt == "fairmq" || lTransportOpt == "FAIRMQ") {
mInputFairMQ = std::make_unique<TfBuilderInputFairMQ>(pRpc, pStfBuilderDev.TfBuilderI(), *mStfRequestQueue, *mReceivedDataQueue);
Expand Down
25 changes: 21 additions & 4 deletions src/TfBuilder/TfBuilderInputUCX.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ void TfBuilderInputUCX::ListenerThread()
// progress the listener worker
const auto lProgress = ucp_worker_progress(listener_worker.ucp_worker);

const auto lSleep = lProgress > 0 ? 0us : 50000us;
const auto lSleep = lProgress > 0 ? 1us : 5000us;

auto lConnInfoOpt = mConnRequestQueue.pop_wait_for(lSleep);
if (!lConnInfoOpt.has_value()) {
Expand Down Expand Up @@ -123,7 +123,7 @@ void TfBuilderInputUCX::ListenerThread()
mConnMap[lStfSenderId] = std::move(lConnStruct);
}

IDDLOG("TfBuilderInputUCX:stop: Listener thread stopped.");
DDDLOG("TfBuilderInputUCX: Listener thread stopped.");
}

bool TfBuilderInputUCX::start()
Expand Down Expand Up @@ -257,11 +257,28 @@ bool TfBuilderInputUCX::start()
// connection successful
break;

} while(true);
} while(true);

// Start all the threads
// Wait until we have all endpoints for StfSenders
do {
std::size_t lNumConnected = 0;
{
std::scoped_lock lLock(mConnectionMapLock);
lNumConnected = mConnMap.size();
}

if (lNumConnected == lNumStfSenders) {
break;
}

std::this_thread::sleep_for(100ms);
DDDLOG_RL(5000, "TfBuilderInputUCX::start: Waiting for all StfSender ucx endpoints. connected={} total={}", lNumConnected, lNumStfSenders);
} while (true);

// This will stop the Listener thread
mState = RUNNING;

DDDLOG("TfBuilderInputUCX::start: Finished");
return true;
}

Expand Down
4 changes: 2 additions & 2 deletions src/TfBuilder/TfBuilderRpc.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ ::grpc::Status TfBuilderRpcImpl::BuildTfRequest(::grpc::ServerContext* /*context
}

sNumTfRequests++;
DDDLOG_RL(5000, "Requesting SubTimeFrames. tf_id={} tf_size={} total_requests={}", lTfId, lTfSize, sNumTfRequests);
DDDLOG_GRL(5000, "Requesting SubTimeFrames. tf_id={} tf_size={} total_requests={}", lTfId, lTfSize, sNumTfRequests);

StfDataRequestMessage lStfRequest;
const auto &lTfBuilderId = mDiscoveryConfig->status().info().process_id();
Expand Down Expand Up @@ -384,7 +384,7 @@ void TfBuilderRpcImpl::StfRequestThread()
while (mRunning && !lReqVector.empty()) {
// wait for the stf slots to become free
if (mNumReqInFlight.load() >= mMaxNumReqInFlight) {
std::this_thread::sleep_for(5ms);
std::this_thread::sleep_for(500us);
continue; // reevaluate the max TF conditions
}

Expand Down
5 changes: 1 addition & 4 deletions src/common/discovery/StfSenderRpcClient.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
#include <string>
#include <chrono>

namespace o2
{
namespace DataDistribution
namespace o2::DataDistribution
{

StfSenderRpcClient::StfSenderRpcClient(const std::string &pEndpoint) {
Expand Down Expand Up @@ -57,4 +55,3 @@ std::string StfSenderRpcClient::grpc_status() {


}
}
6 changes: 3 additions & 3 deletions src/common/discovery/StfSenderRpcClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ class StfSenderRpcClientCollection {
}

if (lStfSenderStatus.rpc_endpoint().empty()) {
EDDLOG("StfSender rpc_endpoint field empty. stfs_id={}", lStfSenderId);
DDDLOG("StfSender rpc_endpoint field empty. stfs_id={}", lStfSenderId);
continue;
}

Expand All @@ -169,14 +169,14 @@ class StfSenderRpcClientCollection {

if (mClients.size() < lNumStfSenders) {
lWaitForStfSenders = true;
IDDLOG_RL(1000, "gRPC: Connected to {} out of {} StfSenders", mClients.size(), lNumStfSenders);
IDDLOG_RL(10000, "gRPC: Connected to {} out of {} StfSenders", mClients.size(), lNumStfSenders);
}

// check the connection on existing clients
for (auto &[ mCliId, lClient] : mClients) {
if (!lClient->is_ready()) {
lAllConnReady = false;
IDDLOG_RL(1000, "StfSender gRPC client connection is not ready. stfs_id={} grpc_status={}", mCliId, lClient->grpc_status());
IDDLOG_RL(10000, "StfSender gRPC client connection is not ready. stfs_id={} grpc_status={}", mCliId, lClient->grpc_status());
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/discovery/TfSchedulerRpcClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class TfSchedulerRpcClient {
template <typename ConsulCli>
bool start(std::shared_ptr<ConsulCli> pConfig) {

if (!mShouldRetryStart) {
if (!should_retry_start()) {
return false;
}

Expand Down

0 comments on commit c924166

Please sign in to comment.