From 0e3c90a997caea19f6f813d00079f7bf683d1e0d Mon Sep 17 00:00:00 2001 From: cheniujh <41671101+cheniujh@users.noreply.github.com> Date: Fri, 24 May 2024 19:30:17 +0800 Subject: [PATCH] fix the problem that BinlogAckEnd smaller than BinlogAckStart(due to the Master clean un-relevant WriteQueue when one DB timeout) (#2666) Co-authored-by: cjh <1271435567@qq.com> --- include/pika_rm.h | 1 + src/pika_rm.cc | 12 ++++++++++-- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/include/pika_rm.h b/include/pika_rm.h index b3e310ddf4..d035b400f3 100644 --- a/include/pika_rm.h +++ b/include/pika_rm.h @@ -172,6 +172,7 @@ class PikaReplicaManager { // write_queue related void ProduceWriteQueue(const std::string& ip, int port, std::string db_name, const std::vector& tasks); + void DropItemInOneWriteQueue(const std::string& ip, int port, const std::string& db_name); void DropItemInWriteQueue(const std::string& ip, int port); int ConsumeWriteQueue(); diff --git a/src/pika_rm.cc b/src/pika_rm.cc index a996463bc2..99cbd548dd 100644 --- a/src/pika_rm.cc +++ b/src/pika_rm.cc @@ -99,7 +99,7 @@ Status SyncMasterDB::ActivateSlaveBinlogSync(const std::string& ip, int port, co } //Since we init a new reader, we should drop items in write queue and reset sync_window. //Or the sent_offset and acked_offset will not match - g_pika_rm->DropItemInWriteQueue(ip, port); + g_pika_rm->DropItemInOneWriteQueue(ip, port, slave_ptr->DBName()); slave_ptr->sync_win.Reset(); slave_ptr->b_state = kReadFromFile; } @@ -335,7 +335,7 @@ Status SyncMasterDB::CheckSyncTimeout(uint64_t now) { for (auto& node : to_del) { coordinator_.SyncPros().RemoveSlaveNode(node.Ip(), node.Port()); - g_pika_rm->DropItemInWriteQueue(node.Ip(), node.Port()); + g_pika_rm->DropItemInOneWriteQueue(node.Ip(), node.Port(), DBName()); LOG(WARNING) << SyncDBInfo().ToString() << " Master del Recv Timeout slave success " << node.ToString(); } return Status::OK(); @@ -645,6 +645,14 @@ int PikaReplicaManager::ConsumeWriteQueue() { return counter; } +void PikaReplicaManager::DropItemInOneWriteQueue(const std::string& ip, int port, const std::string& db_name) { + std::lock_guard l(write_queue_mu_); + std::string index = ip + ":" + std::to_string(port); + if (write_queues_.find(index) != write_queues_.end()) { + write_queues_[index].erase(db_name); + } +} + void PikaReplicaManager::DropItemInWriteQueue(const std::string& ip, int port) { std::lock_guard l(write_queue_mu_); std::string index = ip + ":" + std::to_string(port);