Skip to content

Commit

Permalink
fix: use RAII to release resources in async task (OpenAtomFoundation#…
Browse files Browse the repository at this point in the history
…1502)

* use raii to release resources in async task

* use static_cast instead of reinterpret_cast

---------

Co-authored-by: J1senn <[email protected]>
  • Loading branch information
A2ureStone and J1senn authored May 16, 2023
1 parent 5330209 commit d2e9cfd
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 82 deletions.
9 changes: 3 additions & 6 deletions src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -182,34 +182,31 @@ void PikaClientConn::ProcessRedisCmds(const std::vector<net::RedisCmdArgsType>&
}

void PikaClientConn::DoBackgroundTask(void* arg) {
BgTaskArg* bg_arg = reinterpret_cast<BgTaskArg*>(arg);
std::unique_ptr<BgTaskArg> bg_arg(static_cast<BgTaskArg*>(arg));
std::shared_ptr<PikaClientConn> conn_ptr = bg_arg->conn_ptr;
if (bg_arg->redis_cmds.size() == 0) {
delete bg_arg;
conn_ptr->NotifyEpoll(false);
return;
}
for (const auto& argv : bg_arg->redis_cmds) {
if (argv.size() == 0) {
delete bg_arg;
conn_ptr->NotifyEpoll(false);
return;
}
}

conn_ptr->BatchExecRedisCmd(bg_arg->redis_cmds);
delete bg_arg;
}

void PikaClientConn::DoExecTask(void* arg) {
BgTaskArg* bg_arg = reinterpret_cast<BgTaskArg*>(arg);
std::unique_ptr<BgTaskArg> bg_arg(static_cast<BgTaskArg*>(arg));
std::shared_ptr<Cmd> cmd_ptr = bg_arg->cmd_ptr;
std::shared_ptr<PikaClientConn> conn_ptr = bg_arg->conn_ptr;
std::shared_ptr<std::string> resp_ptr = bg_arg->resp_ptr;
LogOffset offset = bg_arg->offset;
std::string table_name = bg_arg->table_name;
uint32_t partition_id = bg_arg->partition_id;
delete bg_arg;
bg_arg.reset();

uint64_t start_us = 0;
if (g_pika_conf->slowlog_slower_than() >= 0) {
Expand Down
3 changes: 1 addition & 2 deletions src/pika_partition.cc
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ BgSaveInfo Partition::bgsave_info() {
}

void Partition::DoBgSave(void* arg) {
BgTaskArg* bg_task_arg = static_cast<BgTaskArg*>(arg);
std::unique_ptr<BgTaskArg> bg_task_arg(static_cast<BgTaskArg*>(arg));

// Do BgSave
bool success = bg_task_arg->partition->RunBgsaveEngine();
Expand All @@ -314,7 +314,6 @@ void Partition::DoBgSave(void* arg) {
}
bg_task_arg->partition->FinishBgsave();

delete bg_task_arg;
}

bool Partition::RunBgsaveEngine() {
Expand Down
27 changes: 7 additions & 20 deletions src/pika_repl_bgworker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "include/pika_conf.h"
#include "include/pika_rm.h"
#include "include/pika_server.h"
#include "pstd/include/pstd_defer.h"

extern PikaConf* g_pika_conf;
extern PikaServer* g_pika_server;
Expand Down Expand Up @@ -50,6 +51,11 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
PikaReplBgWorker* worker = task_arg->worker;
worker->ip_port_ = conn->ip_port();

DEFER {
delete index;
delete task_arg;
};

std::string table_name;
uint32_t partition_id = 0;
LogOffset pb_begin, pb_end;
Expand Down Expand Up @@ -97,17 +103,13 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id));
if (!partition) {
LOG(WARNING) << "Partition " << table_name << "_" << partition_id << " Not Found";
delete index;
delete task_arg;
return;
}

std::shared_ptr<SyncSlavePartition> slave_partition =
g_pika_rm->GetSyncSlavePartitionByName(PartitionInfo(table_name, partition_id));
if (!slave_partition) {
LOG(WARNING) << "Slave Partition " << table_name << "_" << partition_id << " Not Found";
delete index;
delete task_arg;
return;
}

Expand All @@ -120,8 +122,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
} else if (meta.term() < partition->ConsensusTerm()) /*outdated pb*/ {
LOG(WARNING) << "Drop outdated binlog sync response " << table_name << "_" << partition_id
<< " recv term: " << meta.term() << " local term: " << partition->ConsensusTerm();
delete index;
delete task_arg;
return;
}
if (!only_keepalive) {
Expand All @@ -133,8 +133,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
LOG(WARNING) << "last_offset " << last_offset.ToString() << " NOT equal to pb prev_offset "
<< prev_offset.ToString();
slave_partition->SetReplState(ReplState::kTryConnect);
delete index;
delete task_arg;
return;
}
}
Expand All @@ -146,8 +144,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
// BinlogSync state, we drop remain write binlog task
if ((!(g_pika_server->role() & PIKA_ROLE_SLAVE)) ||
((slave_partition->State() != ReplState::kConnected) && (slave_partition->State() != ReplState::kWaitDBSync))) {
delete index;
delete task_arg;
return;
}

Expand All @@ -159,8 +155,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
LOG(WARNING) << "Check Session failed " << binlog_res.partition().table_name() << "_"
<< binlog_res.partition().partition_id();
slave_partition->SetReplState(ReplState::kTryConnect);
delete index;
delete task_arg;
return;
}

Expand All @@ -171,8 +165,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
if (!PikaBinlogTransverter::BinlogItemWithoutContentDecode(TypeFirst, binlog_res.binlog(), &worker->binlog_item_)) {
LOG(WARNING) << "Binlog item decode failed";
slave_partition->SetReplState(ReplState::kTryConnect);
delete index;
delete task_arg;
return;
}
const char* redis_parser_start = binlog_res.binlog().data() + BINLOG_ENCODE_LEN;
Expand All @@ -183,13 +175,9 @@ void PikaReplBgWorker::HandleBGWorkerWriteBinlog(void* arg) {
if (ret != net::kRedisParserDone) {
LOG(WARNING) << "Redis parser failed";
slave_partition->SetReplState(ReplState::kTryConnect);
delete index;
delete task_arg;
return;
}
}
delete index;
delete task_arg;

if (res->has_consensus_meta()) {
LogOffset leader_commit;
Expand Down Expand Up @@ -255,7 +243,7 @@ int PikaReplBgWorker::HandleWriteBinlog(net::RedisParser* parser, const net::Red
}

void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
ReplClientWriteDBTaskArg* task_arg = static_cast<ReplClientWriteDBTaskArg*>(arg);
std::unique_ptr<ReplClientWriteDBTaskArg> task_arg(static_cast<ReplClientWriteDBTaskArg*>(arg));
const std::shared_ptr<Cmd> c_ptr = task_arg->cmd_ptr;
const PikaCmdArgsType& argv = c_ptr->argv();
LogOffset offset = task_arg->offset;
Expand Down Expand Up @@ -289,7 +277,6 @@ void PikaReplBgWorker::HandleBGWorkerWriteDB(void* arg) {
}
}

delete task_arg;

if (g_pika_conf->consensus_level() != 0) {
std::shared_ptr<SyncMasterPartition> partition =
Expand Down
22 changes: 4 additions & 18 deletions src/pika_repl_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ int PikaReplClientConn::DealMessage() {
}

void PikaReplClientConn::HandleMetaSyncResponse(void* arg) {
ReplClientTaskArg* task_arg = static_cast<ReplClientTaskArg*>(arg);
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
std::shared_ptr<net::PbConn> conn = task_arg->conn;
std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;

Expand All @@ -104,7 +104,6 @@ void PikaReplClientConn::HandleMetaSyncResponse(void* arg) {
LOG(WARNING) << "Meta Sync Failed: " << reply;
g_pika_server->SyncError();
conn->NotifyClose();
delete task_arg;
return;
}

Expand All @@ -123,19 +122,17 @@ void PikaReplClientConn::HandleMetaSyncResponse(void* arg) {
<< "), failed to establish master-slave relationship";
g_pika_server->SyncError();
conn->NotifyClose();
delete task_arg;
return;
}

g_pika_conf->SetWriteBinlog("yes");
g_pika_server->PreparePartitionTrySync();
g_pika_server->FinishMetaSync();
LOG(INFO) << "Finish to handle meta sync response";
delete task_arg;
}

void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
ReplClientTaskArg* task_arg = static_cast<ReplClientTaskArg*>(arg);
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
std::shared_ptr<net::PbConn> conn = task_arg->conn;
std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;

Expand All @@ -149,15 +146,13 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
g_pika_rm->GetSyncSlavePartitionByName(PartitionInfo(table_name, partition_id));
if (!slave_partition) {
LOG(WARNING) << "Slave Partition: " << table_name << ":" << partition_id << " Not Found";
delete task_arg;
return;
}

if (response->code() != InnerMessage::kOk) {
slave_partition->SetReplState(ReplState::kError);
std::string reply = response->has_reply() ? response->reply() : "";
LOG(WARNING) << "DBSync Failed: " << reply;
delete task_arg;
return;
}

Expand All @@ -166,18 +161,16 @@ void PikaReplClientConn::HandleDBSyncResponse(void* arg) {
std::string partition_name = slave_partition->PartitionName();
slave_partition->SetReplState(ReplState::kWaitDBSync);
LOG(INFO) << "Partition: " << partition_name << " Need Wait To Sync";
delete task_arg;
}

void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
ReplClientTaskArg* task_arg = static_cast<ReplClientTaskArg*>(arg);
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
std::shared_ptr<net::PbConn> conn = task_arg->conn;
std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;

if (response->code() != InnerMessage::kOk) {
std::string reply = response->has_reply() ? response->reply() : "";
LOG(WARNING) << "TrySync Failed: " << reply;
delete task_arg;
return;
}

Expand All @@ -189,15 +182,13 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
g_pika_rm->GetSyncMasterPartitionByName(PartitionInfo(table_name, partition_id));
if (!partition) {
LOG(WARNING) << "Partition: " << table_name << ":" << partition_id << " Not Found";
delete task_arg;
return;
}

std::shared_ptr<SyncSlavePartition> slave_partition =
g_pika_rm->GetSyncSlavePartitionByName(PartitionInfo(table_name, partition_id));
if (!slave_partition) {
LOG(WARNING) << "Slave Partition: " << table_name << ":" << partition_id << " Not Found";
delete task_arg;
return;
}

Expand All @@ -211,7 +202,6 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
} else if (meta.term() < partition->ConsensusTerm()) /*outdated pb*/ {
LOG(WARNING) << "Drop outdated trysync response " << table_name << ":" << partition_id
<< " recv term: " << meta.term() << " local term: " << partition->ConsensusTerm();
delete task_arg;
return;
}

Expand All @@ -221,7 +211,6 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
slave_partition->SetReplState(ReplState::kError);
LOG(WARNING) << "Consensus Check failed " << s.ToString();
}
delete task_arg;
return;
}

Expand Down Expand Up @@ -251,7 +240,6 @@ void PikaReplClientConn::HandleTrySyncResponse(void* arg) {
slave_partition->SetReplState(ReplState::kError);
LOG(WARNING) << "Partition: " << partition_name << " TrySync Error";
}
delete task_arg;
}

Status PikaReplClientConn::TrySyncConsensusCheck(const InnerMessage::ConsensusMeta& consensus_meta,
Expand Down Expand Up @@ -308,14 +296,12 @@ void PikaReplClientConn::DispatchBinlogRes(const std::shared_ptr<InnerMessage::I
}

void PikaReplClientConn::HandleRemoveSlaveNodeResponse(void* arg) {
ReplClientTaskArg* task_arg = static_cast<ReplClientTaskArg*>(arg);
std::unique_ptr<ReplClientTaskArg> task_arg(static_cast<ReplClientTaskArg*>(arg));
std::shared_ptr<net::PbConn> conn = task_arg->conn;
std::shared_ptr<InnerMessage::InnerResponse> response = task_arg->res;
if (response->code() != InnerMessage::kOk) {
std::string reply = response->has_reply() ? response->reply() : "";
LOG(WARNING) << "Remove slave node Failed: " << reply;
delete task_arg;
return;
}
delete task_arg;
}
Loading

0 comments on commit d2e9cfd

Please sign in to comment.