diff --git a/src/pika_client_conn.cc b/src/pika_client_conn.cc index ea26fb52df..89760e8bd8 100644 --- a/src/pika_client_conn.cc +++ b/src/pika_client_conn.cc @@ -182,34 +182,31 @@ void PikaClientConn::ProcessRedisCmds(const std::vector& } void PikaClientConn::DoBackgroundTask(void* arg) { - BgTaskArg* bg_arg = reinterpret_cast(arg); + std::unique_ptr bg_arg(static_cast(arg)); std::shared_ptr conn_ptr = bg_arg->conn_ptr; if (bg_arg->redis_cmds.size() == 0) { - delete bg_arg; conn_ptr->NotifyEpoll(false); return; } for (const auto& argv : bg_arg->redis_cmds) { if (argv.size() == 0) { - delete bg_arg; conn_ptr->NotifyEpoll(false); return; } } conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds); - delete bg_arg; } void PikaClientConn::DoExecTask(void* arg) { - BgTaskArg* bg_arg = reinterpret_cast(arg); + std::unique_ptr bg_arg(static_cast(arg)); std::shared_ptr cmd_ptr = bg_arg->cmd_ptr; std::shared_ptr conn_ptr = bg_arg->conn_ptr; std::shared_ptr resp_ptr = bg_arg->resp_ptr; LogOffset offset = bg_arg->offset; std::string table_name = bg_arg->table_name; uint32_t partition_id = bg_arg->partition_id; - delete bg_arg; + bg_arg.reset(); uint64_t start_us = 0; if (g_pika_conf->slowlog_slower_than() >= 0) { diff --git a/src/pika_partition.cc b/src/pika_partition.cc index 98e03b759c..a0f5f9f0fd 100644 --- a/src/pika_partition.cc +++ b/src/pika_partition.cc @@ -288,7 +288,7 @@ BgSaveInfo Partition::bgsave_info() { } void Partition::DoBgSave(void* arg) { - BgTaskArg* bg_task_arg = static_cast(arg); + std::unique_ptr bg_task_arg(static_cast(arg)); // Do BgSave bool success = bg_task_arg->partition->RunBgsaveEngine(); @@ -314,7 +314,6 @@ void Partition::DoBgSave(void* arg) { } bg_task_arg->partition->FinishBgsave(); - delete bg_task_arg; } bool Partition::RunBgsaveEngine() { diff --git a/src/pika_repl_bgworker.cc b/src/pika_repl_bgworker.cc index 32df757e75..ac44d8b39a 100644 --- a/src/pika_repl_bgworker.cc +++ b/src/pika_repl_bgworker.cc @@ -11,6 +11,7 @@ #include "include/pika_conf.h" #include "include/pika_rm.h" #include "include/pika_server.h" +#include "pstd/include/pstd_defer.h" extern PikaConf* g_pika_conf; extern PikaServer* g_pika_server; @@ -50,6 +51,11 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { PikaReplBgWorker* worker = task_arg->worker; worker->ip_port_ = conn->ip_port(); + DEFER { + delete index; + delete task_arg; + }; + std::string table_name; uint32_t partition_id = 0; LogOffset pb_begin, pb_end; @@ -97,8 +103,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id)); if (!partition) { LOG(WARNING) << "Partition " << table_name << "_" << partition_id << " Not Found"; - delete index; - delete task_arg; return; } @@ -106,8 +110,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { g_pika_rm->GetSyncSlavePartitionByName(PartitionInfo(table_name, partition_id)); if (!slave_partition) { LOG(WARNING) << "Slave Partition " << table_name << "_" << partition_id << " Not Found"; - delete index; - delete task_arg; return; } @@ -120,8 +122,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { } else if (meta.term() < partition->ConsensusTerm()) /*outdated pb*/ { LOG(WARNING) << "Drop outdated binlog sync response " << table_name << "_" << partition_id << " recv term: " << meta.term() << " local term: " << partition->ConsensusTerm(); - delete index; - delete task_arg; return; } if (!only_keepalive) { @@ -133,8 +133,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { LOG(WARNING) << "last_offset " << last_offset.ToString() << " NOT equal to pb prev_offset " << prev_offset.ToString(); slave_partition->SetReplState(ReplState::kTryConnect); - delete index; - delete task_arg; return; } } @@ -146,8 +144,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { // BinlogSync state, we drop remain write binlog task if ((!(g_pika_server->role() & PIKA_ROLE_SLAVE)) || ((slave_partition->State() != ReplState::kConnected) && (slave_partition->State() != ReplState::kWaitDBSync))) { - delete index; - delete task_arg; return; } @@ -159,8 +155,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { LOG(WARNING) << "Check Session failed " << binlog_res.partition().table_name() << "_" << binlog_res.partition().partition_id(); slave_partition->SetReplState(ReplState::kTryConnect); - delete index; - delete task_arg; return; } @@ -171,8 +165,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { if (!PikaBinlogTransverter::BinlogItemWithoutContentDecode(TypeFirst, binlog_res.binlog(), &worker->binlog_item_)) { LOG(WARNING) << "Binlog item decode failed"; slave_partition->SetReplState(ReplState::kTryConnect); - delete index; - delete task_arg; return; } const char* redis_parser_start = binlog_res.binlog().data() + BINLOG_ENCODE_LEN; @@ -183,13 +175,9 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) { if (ret != net::kRedisParserDone) { LOG(WARNING) << "Redis parser failed"; slave_partition->SetReplState(ReplState::kTryConnect); - delete index; - delete task_arg; return; } } - delete index; - delete task_arg; if (res->has_consensus_meta()) { LogOffset leader_commit; @@ -255,7 +243,7 @@ int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::Red } void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) { - ReplClientWriteDBTaskArg* task_arg = static_cast(arg); + std::unique_ptr task_arg(static_cast(arg)); const std::shared_ptr c_ptr = task_arg->cmd_ptr; const PikaCmdArgsType& argv = c_ptr->argv(); LogOffset offset = task_arg->offset; @@ -289,7 +277,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) { } } - delete task_arg; if (g_pika_conf->consensus_level() != 0) { std::shared_ptr partition = diff --git a/src/pika_repl_client_conn.cc b/src/pika_repl_client_conn.cc index 2e7e5dfe75..a399f7a539 100644 --- a/src/pika_repl_client_conn.cc +++ b/src/pika_repl_client_conn.cc @@ -88,7 +88,7 @@ int PikaReplClientConn::DealMessage() { } void PikaReplClientConn::HandleMetaSyncResponse(void* arg) { - ReplClientTaskArg* task_arg = static_cast(arg); + std::unique_ptr task_arg(static_cast(arg)); std::shared_ptr conn = task_arg->conn; std::shared_ptr response = task_arg->res; @@ -104,7 +104,6 @@ void PikaReplClientConn::HandleMetaSyncResponse(void* arg) { LOG(WARNING) << "Meta Sync Failed: " << reply; g_pika_server->SyncError(); conn->NotifyClose(); - delete task_arg; return; } @@ -123,7 +122,6 @@ void PikaReplClientConn::HandleMetaSyncResponse(void* arg) { << "), failed to establish master-slave relationship"; g_pika_server->SyncError(); conn->NotifyClose(); - delete task_arg; return; } @@ -131,11 +129,10 @@ void PikaReplClientConn::HandleMetaSyncResponse(void* arg) { g_pika_server->PreparePartitionTrySync(); g_pika_server->FinishMetaSync(); LOG(INFO) << "Finish to handle meta sync response"; - delete task_arg; } void PikaReplClientConn::HandleDBSyncResponse(void* arg) { - ReplClientTaskArg* task_arg = static_cast(arg); + std::unique_ptr task_arg(static_cast(arg)); std::shared_ptr conn = task_arg->conn; std::shared_ptr response = task_arg->res; @@ -149,7 +146,6 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) { g_pika_rm->GetSyncSlavePartitionByName(PartitionInfo(table_name, partition_id)); if (!slave_partition) { LOG(WARNING) << "Slave Partition: " << table_name << ":" << partition_id << " Not Found"; - delete task_arg; return; } @@ -157,7 +153,6 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) { slave_partition->SetReplState(ReplState::kError); std::string reply = response->has_reply() ? response->reply() : ""; LOG(WARNING) << "DBSync Failed: " << reply; - delete task_arg; return; } @@ -166,18 +161,16 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) { std::string partition_name = slave_partition->PartitionName(); slave_partition->SetReplState(ReplState::kWaitDBSync); LOG(INFO) << "Partition: " << partition_name << " Need Wait To Sync"; - delete task_arg; } void PikaReplClientConn::HandleTrySyncResponse(void* arg) { - ReplClientTaskArg* task_arg = static_cast(arg); + std::unique_ptr task_arg(static_cast(arg)); std::shared_ptr conn = task_arg->conn; std::shared_ptr response = task_arg->res; if (response->code() != InnerMessage::kOk) { std::string reply = response->has_reply() ? response->reply() : ""; LOG(WARNING) << "TrySync Failed: " << reply; - delete task_arg; return; } @@ -189,7 +182,6 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) { g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id)); if (!partition) { LOG(WARNING) << "Partition: " << table_name << ":" << partition_id << " Not Found"; - delete task_arg; return; } @@ -197,7 +189,6 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) { g_pika_rm->GetSyncSlavePartitionByName(PartitionInfo(table_name, partition_id)); if (!slave_partition) { LOG(WARNING) << "Slave Partition: " << table_name << ":" << partition_id << " Not Found"; - delete task_arg; return; } @@ -211,7 +202,6 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) { } else if (meta.term() < partition->ConsensusTerm()) /*outdated pb*/ { LOG(WARNING) << "Drop outdated trysync response " << table_name << ":" << partition_id << " recv term: " << meta.term() << " local term: " << partition->ConsensusTerm(); - delete task_arg; return; } @@ -221,7 +211,6 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) { slave_partition->SetReplState(ReplState::kError); LOG(WARNING) << "Consensus Check failed " << s.ToString(); } - delete task_arg; return; } @@ -251,7 +240,6 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) { slave_partition->SetReplState(ReplState::kError); LOG(WARNING) << "Partition: " << partition_name << " TrySync Error"; } - delete task_arg; } Status PikaReplClientConn::TrySyncConsensusCheck(const InnerMessage::ConsensusMeta& consensus_meta, @@ -308,14 +296,12 @@ void PikaReplClientConn::DispatchBinlogRes(const std::shared_ptr(arg); + std::unique_ptr task_arg(static_cast(arg)); std::shared_ptr conn = task_arg->conn; std::shared_ptr response = task_arg->res; if (response->code() != InnerMessage::kOk) { std::string reply = response->has_reply() ? response->reply() : ""; LOG(WARNING) << "Remove slave node Failed: " << reply; - delete task_arg; return; } - delete task_arg; } diff --git a/src/pika_repl_server_conn.cc b/src/pika_repl_server_conn.cc index 829fad70c2..0a49ad8c19 100644 --- a/src/pika_repl_server_conn.cc +++ b/src/pika_repl_server_conn.cc @@ -20,7 +20,7 @@ PikaReplServerConn::PikaReplServerConn(int fd, std::string ip_port, net::Thread* PikaReplServerConn::~PikaReplServerConn() {} void PikaReplServerConn::HandleMetaSyncRequest(void* arg) { - ReplServerTaskArg* task_arg = static_cast(arg); + std::unique_ptr task_arg(static_cast(arg)); const std::shared_ptr req = task_arg->req; std::shared_ptr conn = task_arg->conn; @@ -58,15 +58,13 @@ void PikaReplServerConn::HandleMetaSyncRequest(void* arg) { if (!response.SerializeToString(&reply_str) || conn->WriteResp(reply_str)) { LOG(WARNING) << "Process MetaSync request serialization failed"; conn->NotifyClose(); - delete task_arg; return; } conn->NotifyWrite(); - delete task_arg; } void PikaReplServerConn::HandleTrySyncRequest(void* arg) { - ReplServerTaskArg* task_arg = static_cast(arg); + std::unique_ptr task_arg(static_cast(arg)); const std::shared_ptr req = task_arg->req; std::shared_ptr conn = task_arg->conn; @@ -133,11 +131,9 @@ void PikaReplServerConn::HandleTrySyncRequest(void* arg) { if (!response.SerializeToString(&reply_str) || conn->WriteResp(reply_str)) { LOG(WARNING) << "Handle Try Sync Failed"; conn->NotifyClose(); - delete task_arg; return; } conn->NotifyWrite(); - delete task_arg; } bool PikaReplServerConn::TrySyncUpdateSlaveNode(const std::shared_ptr& partition, @@ -280,7 +276,7 @@ void PikaReplServerConn::BuildConsensusMeta(const bool& reject, const std::vecto } void PikaReplServerConn::HandleDBSyncRequest(void* arg) { - ReplServerTaskArg* task_arg = static_cast(arg); + std::unique_ptr task_arg(static_cast(arg)); const std::shared_ptr req = task_arg->req; std::shared_ptr conn = task_arg->conn; @@ -352,21 +348,18 @@ void PikaReplServerConn::HandleDBSyncRequest(void* arg) { if (!response.SerializeToString(&reply_str) || conn->WriteResp(reply_str)) { LOG(WARNING) << "Handle DBSync Failed"; conn->NotifyClose(); - delete task_arg; return; } conn->NotifyWrite(); - delete task_arg; } void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) { - ReplServerTaskArg* task_arg = static_cast(arg); + std::unique_ptr task_arg(static_cast(arg)); const std::shared_ptr req = task_arg->req; std::shared_ptr conn = task_arg->conn; if (!req->has_binlog_sync()) { LOG(WARNING) << "Pb parse error"; // conn->NotifyClose(); - delete task_arg; return; } const InnerMessage::InnerRequest::BinlogSync& binlog_req = req->binlog_sync(); @@ -389,7 +382,6 @@ void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) { g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id)); if (!master_partition) { LOG(WARNING) << "Sync Master Partition: " << table_name << ":" << partition_id << ", NotFound"; - delete task_arg; return; } @@ -402,7 +394,6 @@ void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) { } else if (meta.term() < master_partition->ConsensusTerm()) /*outdated pb*/ { LOG(WARNING) << "Drop outdated binlog sync req " << table_name << ":" << partition_id << " recv term: " << meta.term() << " local term: " << master_partition->ConsensusTerm(); - delete task_arg; return; } } @@ -411,7 +402,6 @@ void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) { LOG(WARNING) << "Check Session failed " << node.ip() << ":" << node.port() << ", " << table_name << "_" << partition_id; // conn->NotifyClose(); - delete task_arg; return; } @@ -423,7 +413,6 @@ void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) { LOG(WARNING) << "SetMasterLastRecvTime failed " << node.ip() << ":" << node.port() << ", " << table_name << "_" << partition_id << " " << s.ToString(); conn->NotifyClose(); - delete task_arg; return; } @@ -431,7 +420,6 @@ void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) { if (range_start.b_offset != range_end.b_offset) { LOG(WARNING) << "first binlogsync request pb argument invalid"; conn->NotifyClose(); - delete task_arg; return; } @@ -439,40 +427,34 @@ void PikaReplServerConn::HandleBinlogSyncRequest(void* arg) { if (!s.ok()) { LOG(WARNING) << "Activate Binlog Sync failed " << slave_node.ToString() << " " << s.ToString(); conn->NotifyClose(); - delete task_arg; return; } - delete task_arg; return; } // not the first_send the range_ack cant be 0 // set this case as ping if (range_start.b_offset == BinlogOffset() && range_end.b_offset == BinlogOffset()) { - delete task_arg; return; } s = g_pika_rm->UpdateSyncBinlogStatus(slave_node, range_start, range_end); if (!s.ok()) { LOG(WARNING) << "Update binlog ack failed " << table_name << " " << partition_id << " " << s.ToString(); conn->NotifyClose(); - delete task_arg; return; } - delete task_arg; g_pika_server->SignalAuxiliary(); return; } void PikaReplServerConn::HandleRemoveSlaveNodeRequest(void* arg) { - ReplServerTaskArg* task_arg = static_cast(arg); + std::unique_ptr task_arg(static_cast(arg)); const std::shared_ptr req = task_arg->req; std::shared_ptr conn = task_arg->conn; if (!req->remove_slave_node_size()) { LOG(WARNING) << "Pb parse error"; conn->NotifyClose(); - delete task_arg; return; } const InnerMessage::InnerRequest::RemoveSlaveNode& remove_slave_node_req = req->remove_slave_node(0); @@ -503,11 +485,9 @@ void PikaReplServerConn::HandleRemoveSlaveNodeRequest(void* arg) { if (!response.SerializeToString(&reply_str) || conn->WriteResp(reply_str)) { LOG(WARNING) << "Remove Slave Node Failed"; conn->NotifyClose(); - delete task_arg; return; } conn->NotifyWrite(); - delete task_arg; } int PikaReplServerConn::DealMessage() { diff --git a/src/pika_server.cc b/src/pika_server.cc index fba8311c00..58a01486b1 100644 --- a/src/pika_server.cc +++ b/src/pika_server.cc @@ -30,18 +30,16 @@ extern PikaReplicaManager* g_pika_rm; extern PikaCmdTableManager* g_pika_cmd_table_manager; void DoPurgeDir(void* arg) { - std::string path = *(static_cast(arg)); - LOG(INFO) << "Delete dir: " << path << " start"; - pstd::DeleteDir(path); - LOG(INFO) << "Delete dir: " << path << " done"; - delete static_cast(arg); + std::unique_ptr path(static_cast(arg)); + LOG(INFO) << "Delete dir: " << *path << " start"; + pstd::DeleteDir(*path); + LOG(INFO) << "Delete dir: " << *path << " done"; } void DoDBSync(void* arg) { - DBSyncArg* dbsa = reinterpret_cast(arg); + std::unique_ptr dbsa(static_cast(arg)); PikaServer* const ps = dbsa->p; ps->DbSyncSendFile(dbsa->ip, dbsa->port, dbsa->table_name, dbsa->partition_id); - delete dbsa; } PikaServer::PikaServer() diff --git a/src/pika_stable_log.cc b/src/pika_stable_log.cc index d3d30bad97..5219b3e286 100644 --- a/src/pika_stable_log.cc +++ b/src/pika_stable_log.cc @@ -71,10 +71,9 @@ bool StableLog::PurgeStableLogs(uint32_t to, bool manual) { void StableLog::ClearPurge() { purging_ = false; } void StableLog::DoPurgeStableLogs(void* arg) { - PurgeStableLogArg* purge_arg = static_cast(arg); + std::unique_ptr purge_arg(static_cast(arg)); purge_arg->logger->PurgeFiles(purge_arg->to, purge_arg->manual); purge_arg->logger->ClearPurge(); - delete static_cast(arg); } bool StableLog::PurgeFiles(uint32_t to, bool manual) { diff --git a/src/pika_table.cc b/src/pika_table.cc index b660350ae1..90d66e3072 100644 --- a/src/pika_table.cc +++ b/src/pika_table.cc @@ -210,9 +210,8 @@ void Table::Compact(const storage::DataType& type) { } void Table::DoKeyScan(void* arg) { - BgTaskArg* bg_task_arg = reinterpret_cast(arg); + std::unique_ptr bg_task_arg(static_cast(arg)); bg_task_arg->table->RunKeyScan(); - delete bg_task_arg; } void Table::InitKeyScan() {