From 521281deee9dc18fbe2987a2625a0ccb44ef0308 Mon Sep 17 00:00:00 2001 From: UdjinM6 Date: Sat, 16 Feb 2019 17:43:10 +0300 Subject: [PATCH] Separate init/destroy and start/stop steps in LLMQ flow --- src/init.cpp | 3 ++ src/llmq/quorums_chainlocks.cpp | 10 ++++++- src/llmq/quorums_chainlocks.h | 4 ++- src/llmq/quorums_debug.cpp | 4 +++ src/llmq/quorums_debug.h | 2 ++ src/llmq/quorums_dkgsessionmgr.cpp | 12 ++++++-- src/llmq/quorums_dkgsessionmgr.h | 3 ++ src/llmq/quorums_init.cpp | 43 ++++++++++++++++++++++++----- src/llmq/quorums_init.h | 6 +++- src/llmq/quorums_signing_shares.cpp | 15 +++++++--- src/llmq/quorums_signing_shares.h | 5 ++-- 11 files changed, 88 insertions(+), 19 deletions(-) diff --git a/src/init.cpp b/src/init.cpp index bb22cf3e66de9..0a331af55d264 100644 --- a/src/init.cpp +++ b/src/init.cpp @@ -236,6 +236,7 @@ void PrepareShutdown() StopREST(); StopRPC(); StopHTTPServer(); + llmq::StopLLMQSystem(); // fRPCInWarmup should be `false` if we completed the loading sequence // before a shutdown request was received @@ -2066,6 +2067,8 @@ bool AppInitMain(boost::thread_group& threadGroup, CScheduler& scheduler) #endif // ENABLE_WALLET } + llmq::StartLLMQSystem(); + // ********************************************************* Step 12: start node //// debug print diff --git a/src/llmq/quorums_chainlocks.cpp b/src/llmq/quorums_chainlocks.cpp index 2cf46ad008772..7ab7226fffffa 100644 --- a/src/llmq/quorums_chainlocks.cpp +++ b/src/llmq/quorums_chainlocks.cpp @@ -28,10 +28,18 @@ std::string CChainLockSig::ToString() const CChainLocksHandler::CChainLocksHandler(CScheduler* _scheduler) : scheduler(_scheduler) { - quorumSigningManager->RegisterRecoveredSigsListener(this); } CChainLocksHandler::~CChainLocksHandler() +{ +} + +void CChainLocksHandler::RegisterAsRecoveredSigsListener() +{ + quorumSigningManager->RegisterRecoveredSigsListener(this); +} + +void CChainLocksHandler::UnregisterAsRecoveredSigsListener() { quorumSigningManager->UnregisterRecoveredSigsListener(this); } diff --git a/src/llmq/quorums_chainlocks.h b/src/llmq/quorums_chainlocks.h index 064d6bd417cd5..31065a478f9d7 100644 --- a/src/llmq/quorums_chainlocks.h +++ b/src/llmq/quorums_chainlocks.h @@ -68,7 +68,9 @@ class CChainLocksHandler : public CRecoveredSigsListener CChainLocksHandler(CScheduler* _scheduler); ~CChainLocksHandler(); -public: + void RegisterAsRecoveredSigsListener(); + void UnregisterAsRecoveredSigsListener(); + bool AlreadyHave(const CInv& inv); bool GetChainLockByHash(const uint256& hash, CChainLockSig& ret); diff --git a/src/llmq/quorums_debug.cpp b/src/llmq/quorums_debug.cpp index 6237b2721e8af..14524d97d4754 100644 --- a/src/llmq/quorums_debug.cpp +++ b/src/llmq/quorums_debug.cpp @@ -110,6 +110,10 @@ UniValue CDKGDebugSessionStatus::ToJson(int detailLevel) const CDKGDebugManager::CDKGDebugManager(CScheduler* _scheduler) : scheduler(_scheduler) +{ +} + +void CDKGDebugManager::StartScheduler() { if (scheduler) { scheduler->scheduleEvery([&]() { diff --git a/src/llmq/quorums_debug.h b/src/llmq/quorums_debug.h index 136d0ab3d7940..33d9bcd8caa1d 100644 --- a/src/llmq/quorums_debug.h +++ b/src/llmq/quorums_debug.h @@ -158,6 +158,8 @@ class CDKGDebugManager public: CDKGDebugManager(CScheduler* _scheduler); + void StartScheduler(); + void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman); bool PreVerifyDebugStatusMessage(const uint256& hash, CDKGDebugStatus& status, bool& retBan); void ScheduleProcessPending(); diff --git a/src/llmq/quorums_dkgsessionmgr.cpp b/src/llmq/quorums_dkgsessionmgr.cpp index f34bed2f5c32a..f3796a42eab36 100644 --- a/src/llmq/quorums_dkgsessionmgr.cpp +++ b/src/llmq/quorums_dkgsessionmgr.cpp @@ -24,18 +24,26 @@ static const std::string DB_SKCONTRIB = "qdkg_S"; CDKGSessionManager::CDKGSessionManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker) : evoDb(_evoDb), blsWorker(_blsWorker) +{ +} + +CDKGSessionManager::~CDKGSessionManager() +{ +} + +void CDKGSessionManager::StartMessageHandlerPool() { for (const auto& qt : Params().GetConsensus().llmqs) { dkgSessionHandlers.emplace(std::piecewise_construct, std::forward_as_tuple(qt.first), - std::forward_as_tuple(qt.second, _evoDb, messageHandlerPool, blsWorker, *this)); + std::forward_as_tuple(qt.second, evoDb, messageHandlerPool, blsWorker, *this)); } messageHandlerPool.resize(2); RenameThreadPool(messageHandlerPool, "quorum-msg"); } -CDKGSessionManager::~CDKGSessionManager() +void CDKGSessionManager::StopMessageHandlerPool() { messageHandlerPool.stop(true); } diff --git a/src/llmq/quorums_dkgsessionmgr.h b/src/llmq/quorums_dkgsessionmgr.h index 4b506e07fc51e..0f63ab62d21bd 100644 --- a/src/llmq/quorums_dkgsessionmgr.h +++ b/src/llmq/quorums_dkgsessionmgr.h @@ -50,6 +50,9 @@ class CDKGSessionManager CDKGSessionManager(CEvoDB& _evoDb, CBLSWorker& _blsWorker); ~CDKGSessionManager(); + void StartMessageHandlerPool(); + void StopMessageHandlerPool(); + void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload); void ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, CConnman& connman); diff --git a/src/llmq/quorums_init.cpp b/src/llmq/quorums_init.cpp index 30a867b9a50bb..5b3c4aaceb196 100644 --- a/src/llmq/quorums_init.cpp +++ b/src/llmq/quorums_init.cpp @@ -31,13 +31,6 @@ void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests) chainLocksHandler = new CChainLocksHandler(scheduler); } -void InterruptLLMQSystem() -{ - if (quorumSigSharesManager) { - quorumSigSharesManager->InterruptWorkerThread(); - } -} - void DestroyLLMQSystem() { delete chainLocksHandler; @@ -56,4 +49,40 @@ void DestroyLLMQSystem() quorumDKGDebugManager = nullptr; } +void StartLLMQSystem() +{ + if (quorumDKGDebugManager) { + quorumDKGDebugManager->StartScheduler(); + } + if (quorumDKGSessionManager) { + quorumDKGSessionManager->StartMessageHandlerPool(); + } + if (quorumSigSharesManager) { + quorumSigSharesManager->StartWorkerThread(); + } + if (chainLocksHandler) { + chainLocksHandler->RegisterAsRecoveredSigsListener(); + } +} + +void StopLLMQSystem() +{ + if (chainLocksHandler) { + chainLocksHandler->UnregisterAsRecoveredSigsListener(); + } + if (quorumSigSharesManager) { + quorumSigSharesManager->StopWorkerThread(); + } + if (quorumDKGSessionManager) { + quorumDKGSessionManager->StopMessageHandlerPool(); + } +} + +void InterruptLLMQSystem() +{ + if (quorumSigSharesManager) { + quorumSigSharesManager->InterruptWorkerThread(); + } +} + } diff --git a/src/llmq/quorums_init.h b/src/llmq/quorums_init.h index a01f1622cbd72..9a5398d63c129 100644 --- a/src/llmq/quorums_init.h +++ b/src/llmq/quorums_init.h @@ -14,10 +14,14 @@ namespace llmq // If true, we will connect to all new quorums and watch their communication static const bool DEFAULT_WATCH_QUORUMS = false; +// Init/destroy LLMQ globals void InitLLMQSystem(CEvoDB& evoDb, CScheduler* scheduler, bool unitTests); -void InterruptLLMQSystem(); void DestroyLLMQSystem(); +// Manage scheduled tasks, threads, listeners etc. +void StartLLMQSystem(); +void StopLLMQSystem(); +void InterruptLLMQSystem(); } #endif //DASH_QUORUMS_INIT_H diff --git a/src/llmq/quorums_signing_shares.cpp b/src/llmq/quorums_signing_shares.cpp index b316f7440ab3b..ba605677d9993 100644 --- a/src/llmq/quorums_signing_shares.cpp +++ b/src/llmq/quorums_signing_shares.cpp @@ -176,16 +176,20 @@ CSigSharesInv CBatchedSigShares::ToInv() const CSigSharesManager::CSigSharesManager() { - StartWorkerThread(); + workInterrupt.reset(); } CSigSharesManager::~CSigSharesManager() { - StopWorkerThread(); } void CSigSharesManager::StartWorkerThread() { + // can't start new thread if we have one running already + if (workThread.joinable()) { + assert(false); + } + workThread = std::thread(&TraceThread >, "sigshares", std::function(std::bind(&CSigSharesManager::WorkThreadMain, this))); @@ -193,6 +197,11 @@ void CSigSharesManager::StartWorkerThread() void CSigSharesManager::StopWorkerThread() { + // make sure to call InterruptWorkerThread() first + if (!workInterrupt) { + assert(false); + } + if (workThread.joinable()) { workThread.join(); } @@ -1112,8 +1121,6 @@ void CSigSharesManager::BanNode(NodeId nodeId) void CSigSharesManager::WorkThreadMain() { - workInterrupt.reset(); - int64_t lastSendTime = 0; while (!workInterrupt) { diff --git a/src/llmq/quorums_signing_shares.h b/src/llmq/quorums_signing_shares.h index fb25f52f4b09f..d1e0812a7157d 100644 --- a/src/llmq/quorums_signing_shares.h +++ b/src/llmq/quorums_signing_shares.h @@ -192,6 +192,8 @@ class CSigSharesManager CSigSharesManager(); ~CSigSharesManager(); + void StartWorkerThread(); + void StopWorkerThread(); void InterruptWorkerThread(); public: @@ -201,9 +203,6 @@ class CSigSharesManager void Sign(const CQuorumCPtr& quorum, const uint256& id, const uint256& msgHash); private: - void StartWorkerThread(); - void StopWorkerThread(); - void ProcessMessageSigSharesInv(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); void ProcessMessageGetSigShares(CNode* pfrom, const CSigSharesInv& inv, CConnman& connman); void ProcessMessageBatchedSigShares(CNode* pfrom, const CBatchedSigShares& batchedSigShares, CConnman& connman);