Skip to content

Commit

Permalink
tfscheduler: reset monitoring counters on the start of a new run
Browse files Browse the repository at this point in the history
  • Loading branch information
ironMann committed Mar 4, 2022
1 parent 02348cc commit af83384
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 21 deletions.
8 changes: 3 additions & 5 deletions src/TfScheduler/TfSchedulerStfInfo.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,14 @@ void TfSchedulerStfInfo::SchedulingThread()
std::map<std::string, std::uint64_t> lStfSenderMissingCnt;
std::optional<std::vector<StfInfo>> lStfInfosOpt;

// total number of scheduled Tfs
std::size_t lScheduledTfs = 0;

// Build or discard
bool lBuildIncomplete = mDiscoveryConfig->getBoolParam(BuildIncompleteTfsKey, BuildIncompleteTfsValue);
IDDLOG("TfScheduler: Building of incomplete TimeFrames is {}.", lBuildIncomplete ? "enabled" : "disabled");

while ((lStfInfosOpt = mCompleteStfsInfoQueue.pop()) != std::nullopt) {

DDMON("tfscheduler", "tf.rejected.total", mNotScheduledTfsCount);
DDMON("tfscheduler", "tf.scheduled.total", lScheduledTfs);
DDMON("tfscheduler", "tf.scheduled.total", mScheduledTfs);

const std::vector<StfInfo> &lStfInfos = lStfInfosOpt.value();
TfBuildingInformation lRequest;
Expand Down Expand Up @@ -105,7 +102,7 @@ void TfSchedulerStfInfo::SchedulingThread()
switch (lResponse.status()) {
case BuildTfResponse::OK:
// marked TfBuilder as scheduled
lScheduledTfs++;
mScheduledTfs++;
mTfBuilderInfo.markTfBuilderWithTfId(lTfBuilderId, lRequest.tf_id());
break;
case BuildTfResponse::ERROR_NOMEM:
Expand Down Expand Up @@ -434,6 +431,7 @@ void TfSchedulerStfInfo::addStfInfo(const StfSenderStfInfo &pStfInfo, SchedulerS
DDDLOG("New RunNumber received. run_number={}", lRunNumber);
// reset internal counters
reset();
mTfBuilderInfo.resetCounters();
mRunNumber = lRunNumber;
} else if (mRunNumber > lRunNumber) {
EDDLOG_GRL(500, "New RunNumber is smaller than the previous. run_number={} prev_run_number={}", lRunNumber, mRunNumber);
Expand Down
2 changes: 2 additions & 0 deletions src/TfScheduler/TfSchedulerStfInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ class TfSchedulerStfInfo
std::uint64_t mMaxCompletedTfId = 0;
std::uint64_t mNotScheduledTfsCount = 0;
std::uint64_t mStaleTfCount = 0;
std::uint64_t mScheduledTfs = 0;
EventRecorder mDroppedStfs;
EventRecorder mBuiltTfs;

Expand All @@ -189,6 +190,7 @@ class TfSchedulerStfInfo
mMaxCompletedTfId = 0;
mNotScheduledTfsCount = 0;
mStaleTfCount = 0;
mScheduledTfs = 0;
mDroppedStfs.reset();
mBuiltTfs.reset();

Expand Down
28 changes: 12 additions & 16 deletions src/TfScheduler/TfSchedulerTfBuilderInfo.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -129,10 +129,6 @@ void TfSchedulerTfBuilderInfo::updateTfBuilderInfo(const TfBuilderUpdateMessage

bool TfSchedulerTfBuilderInfo::findTfBuilderForTf(const std::uint64_t pSize, std::string& pTfBuilderId /*out*/)
{
static std::atomic_uint64_t sNoTfBuilderAvailable = 0;
static std::atomic_uint64_t sNoMemoryAvailable = 0;
static std::atomic_uint64_t sTfNumExceeeded = 0;

// NOTE: we will overestimate memory requirement by a factor, until TfBuilder updates
// us with the actual size.
const std::uint64_t lTfEstSize = pSize * (sTfSizeOverestimatePercent + 100) / 100;
Expand All @@ -159,22 +155,22 @@ bool TfSchedulerTfBuilderInfo::findTfBuilderForTf(const std::uint64_t pSize, std
// TfBuilder not found?
if ( lIt == mReadyTfBuilders.end() ) {
if (mReadyTfBuilders.empty()) {
++sNoTfBuilderAvailable;
DDMON("tfscheduler", "tf.rejected.no_tfb_inst", sNoTfBuilderAvailable);
++mNoTfBuilderAvailable;
DDMON("tfscheduler", "tf.rejected.no_tfb_inst", mNoTfBuilderAvailable);

WDDLOG_RL(1000, "FindTfBuilder: TF cannot be scheduled. reason=NO_TFBUILDERS total={}",
sNoTfBuilderAvailable);
WDDLOG_RL(10000, "FindTfBuilder: TF cannot be scheduled. reason=NO_TFBUILDERS total={}",
mNoTfBuilderAvailable);

} else if (lMaxTfExceeded) {
++sTfNumExceeeded;
WDDLOG_RL(1000, "FindTfBuilder: TF cannot be scheduled. reason=NUM_TF_EXCEEEDED total={} tf_size={} ready_tfb={}",
sTfNumExceeeded, lTfEstSize, mReadyTfBuilders.size());
DDMON("tfscheduler", "tf.rejected.max_tf_exceeded", sTfNumExceeeded);
++mTfNumExceeeded;
WDDLOG_RL(10000, "FindTfBuilder: TF cannot be scheduled. reason=NUM_TF_EXCEEEDED total={} tf_size={} ready_tfb={}",
mTfNumExceeeded, lTfEstSize, mReadyTfBuilders.size());
DDMON("tfscheduler", "tf.rejected.max_tf_exceeded", mTfNumExceeeded);
} else {
++sNoMemoryAvailable;
DDMON("tfscheduler", "tf.rejected.no_tfb_buf", sNoMemoryAvailable);
WDDLOG_RL(1000, "FindTfBuilder: TF cannot be scheduled. reason=NO_MEMORY total={} tf_size={} ready_tfb={}",
sNoMemoryAvailable, lTfEstSize, mReadyTfBuilders.size());
++mNoMemoryAvailable;
DDMON("tfscheduler", "tf.rejected.no_tfb_buf", mNoMemoryAvailable);
WDDLOG_RL(10000, "FindTfBuilder: TF cannot be scheduled. reason=NO_MEMORY total={} tf_size={} ready_tfb={}",
mNoMemoryAvailable, lTfEstSize, mReadyTfBuilders.size());
}
return false;
}
Expand Down
12 changes: 12 additions & 0 deletions src/TfScheduler/TfSchedulerTfBuilderInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ class TfSchedulerTfBuilderInfo
}
}

void resetCounters() {
mNoTfBuilderAvailable = 0;
mNoMemoryAvailable = 0;
mTfNumExceeeded = 0;
}

private:
/// Overestimation of actual size for TF building
static constexpr std::uint64_t sTfSizeOverestimatePercent = std::uint64_t(10);
Expand Down Expand Up @@ -204,6 +210,12 @@ class TfSchedulerTfBuilderInfo
/// List of TfBuilders for Topological distribution
mutable std::recursive_mutex mTopoInfoLock;
std::unordered_map<TfBuilderTopoInfo, std::shared_ptr<TfBuilderInfo>, TfBuilderTopoInfo> mTopoTfBuilders;


/// Counters
std::atomic_uint64_t mNoTfBuilderAvailable = 0;
std::atomic_uint64_t mNoMemoryAvailable = 0;
std::atomic_uint64_t mTfNumExceeeded = 0;
};


Expand Down

0 comments on commit af83384

Please sign in to comment.