Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge upstream unstable and format codes #7

Merged
merged 6 commits into from
May 26, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions include/pika_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
# include <sys/statfs.h>
#endif
#include <memory>
#include <set>

#include "net/include/bg_thread.h"
#include "net/include/net_pubsub.h"
Expand All @@ -30,7 +31,6 @@
#include "include/pika_conf.h"
#include "include/pika_define.h"
#include "include/pika_dispatch_thread.h"
#include "include/pika_monitor_thread.h"
#include "include/pika_repl_client.h"
#include "include/pika_repl_server.h"
#include "include/pika_rsync_service.h"
Expand Down Expand Up @@ -266,9 +266,9 @@ class PikaServer : public pstd::noncopyable {
/*
* Monitor used
*/
bool HasMonitorClients();
bool HasMonitorClients() const;
void AddMonitorMessage(const std::string& monitor_message);
void AddMonitorClient(const std::shared_ptr<PikaClientConn> &client_ptr);
void AddMonitorClient(std::shared_ptr<PikaClientConn> client_ptr);

/*
* Slowlog used
Expand Down Expand Up @@ -410,7 +410,8 @@ class PikaServer : public pstd::noncopyable {
/*
* Monitor used
*/
std::unique_ptr<PikaMonitorThread> pika_monitor_thread_;
mutable pstd::Mutex monitor_mutex_protector_;
std::set<std::weak_ptr<PikaClientConn>, std::owner_less<std::weak_ptr<PikaClientConn>>> pika_monitor_clients_;

/*
* Rsync used
Expand Down
1 change: 0 additions & 1 deletion src/net/include/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ class ThreadPool : public pstd::noncopyable {
void cur_time_queue_size(size_t* qsize);
std::string thread_pool_name();


private:
void runInThread();

Expand Down
2 changes: 1 addition & 1 deletion src/net/src/backend_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ void* BackendThread::ThreadMain() {
}
}

if ((pfe->mask & kErrorEvent) || (should_close)) {
if ((pfe->mask & kErrorEvent) || should_close) {
{
LOG(INFO) << "close connection " << pfe->fd << " reason " << pfe->mask << " " << should_close;
net_multiplexer_->NetDelEvent(pfe->fd, 0);
Expand Down
8 changes: 4 additions & 4 deletions src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ DispatchThread::~DispatchThread() = default;
int DispatchThread::StartThread() {
for (int i = 0; i < work_num_; i++) {
int ret = handle_->CreateWorkerSpecificData(&(worker_thread_[i]->private_data_));
if (ret) {
if (ret) {
return ret;
}

if (!thread_name().empty()) {
worker_thread_[i]->set_thread_name("WorkerThread");
}
ret = worker_thread_[i]->StartThread();
if (ret) {
if (ret) {
return ret;
}
}
Expand All @@ -76,12 +76,12 @@ int DispatchThread::StopThread() {
}
for (int i = 0; i < work_num_; i++) {
int ret = worker_thread_[i]->StopThread();
if (ret) {
if (ret) {
return ret;
}
if (worker_thread_[i]->private_data_) {
ret = handle_->DeleteWorkerSpecificData(worker_thread_[i]->private_data_);
if (ret) {
if (ret) {
return ret;
}
worker_thread_[i]->private_data_ = nullptr;
Expand Down
20 changes: 10 additions & 10 deletions src/net/src/holy_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ std::shared_ptr<NetConn> HolyThread::get_conn(int fd) {

int HolyThread::StartThread() {
int ret = handle_->CreateWorkerSpecificData(&private_data_);
if (ret) {
if (ret) {
return ret;
}
return ServerThread::StartThread();
Expand All @@ -81,7 +81,7 @@ int HolyThread::StartThread() {
int HolyThread::StopThread() {
if (private_data_) {
int ret = handle_->DeleteWorkerSpecificData(private_data_);
if (ret) {
if (ret) {
return ret;
}
private_data_ = nullptr;
Expand Down Expand Up @@ -118,7 +118,7 @@ void HolyThread::HandleConnEvent(NetFiredEvent* pfe) {
}

if (async_) {
if ((pfe->mask & kReadable) != 0) {
if (pfe->mask & kReadable) {
ReadStatus read_status = in_conn->GetRequest();
struct timeval now;
gettimeofday(&now, nullptr);
Expand All @@ -132,7 +132,7 @@ void HolyThread::HandleConnEvent(NetFiredEvent* pfe) {
should_close = 1;
}
}
if (((pfe->mask & kWritable) != 0) && in_conn->is_reply()) {
if ((pfe->mask & kWritable) && in_conn->is_reply()) {
WriteStatus write_status = in_conn->SendReply();
if (write_status == kWriteAll) {
in_conn->set_is_reply(false);
Expand All @@ -144,7 +144,7 @@ void HolyThread::HandleConnEvent(NetFiredEvent* pfe) {
}
}
} else {
if ((pfe->mask & kReadable) != 0) {
if (pfe->mask & kReadable) {
ReadStatus getRes = in_conn->GetRequest();
struct timeval now;
gettimeofday(&now, nullptr);
Expand All @@ -158,7 +158,7 @@ void HolyThread::HandleConnEvent(NetFiredEvent* pfe) {
return;
}
}
if ((pfe->mask & kWritable) != 0) {
if (pfe->mask & kWritable) {
WriteStatus write_status = in_conn->SendReply();
if (write_status == kWriteAll) {
in_conn->set_is_reply(false);
Expand All @@ -170,7 +170,7 @@ void HolyThread::HandleConnEvent(NetFiredEvent* pfe) {
}
}
}
if (((pfe->mask & kErrorEvent) != 0) || (should_close != 0)) {
if ((pfe->mask & kErrorEvent) || should_close) {
net_multiplexer_->NetDelEvent(pfe->fd, 0);
CloseFd(in_conn);
in_conn = nullptr;
Expand All @@ -192,7 +192,7 @@ void HolyThread::DoCronTask() {

// Check whether close all connection
std::lock_guard kl(killer_mutex_);
if (deleting_conn_ipport_.count(kKillAllConnsTask) != 0U) {
if (deleting_conn_ipport_.count(kKillAllConnsTask)) {
for (auto& conn : conns_) {
to_close.push_back(conn.second);
}
Expand All @@ -208,7 +208,7 @@ void HolyThread::DoCronTask() {
while (iter != conns_.end()) {
std::shared_ptr<NetConn> conn = iter->second;
// Check connection should be closed
if (deleting_conn_ipport_.count(conn->ip_port()) != 0U) {
if (deleting_conn_ipport_.count(conn->ip_port())) {
to_close.push_back(conn);
deleting_conn_ipport_.erase(conn->ip_port());
iter = conns_.erase(iter);
Expand Down Expand Up @@ -277,7 +277,7 @@ bool HolyThread::KillConn(const std::string& ip_port) {
}

void HolyThread::ProcessNotifyEvents(const net::NetFiredEvent* pfe) {
if ((pfe->mask & kReadable) != 0) {
if (pfe->mask & kReadable) {
char bb[2048];
int32_t nread = read(net_multiplexer_->NotifyReceiveFd(), bb, 2048);
if (nread == 0) {
Expand Down
6 changes: 3 additions & 3 deletions src/net/src/http_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ bool HTTPRequest::ParseHeadLine(const char* data, int line_start, int line_end)
bool HTTPRequest::ParseGetUrl() {
path_ = url_;
// Format path
if ((headers_.count("host")) && path_.find(headers_["host"]) != std::string::npos &&
if (headers_.count("host") && path_.find(headers_["host"]) != std::string::npos &&
path_.size() > (7 + headers_["host"].size())) {
// http://www.xxx.xxx/path_/to
path_.assign(path_.substr(7 + headers_["host"].size()));
Expand Down Expand Up @@ -206,7 +206,7 @@ int HTTPRequest::ParseHeader() {
content_type_.assign(headers_.at("content-type"));
}

if ((headers_.count("expect")) &&
if (headers_.count("expect") &&
(headers_.at("expect") == "100-Continue" || headers_.at("expect") == "100-continue")) {
reply_100continue_ = true;
}
Expand Down Expand Up @@ -498,7 +498,7 @@ void HTTPResponse::SetHeaders(const std::string& key, const size_t value) { head

void HTTPResponse::SetContentLength(uint64_t size) {
remain_send_len_ = size;
if ((headers_.count("Content-Length")) || (headers_.count("content-length"))) {
if (headers_.count("Content-Length") || (headers_.count("content-length"))) {
return;
}
SetHeaders("Content-Length", size);
Expand Down
2 changes: 1 addition & 1 deletion src/pika.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ static void PikaGlogInit() {
}

static void daemonize() {
if (fork() != 0) {
if (fork()) {
exit(0); /* parent exits */
}
setsid(); /* create a new session */
Expand Down
14 changes: 5 additions & 9 deletions src/pika_admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -590,8 +590,7 @@ void ClientCmd::Do(std::shared_ptr<Partition> partition) {
} else {
res_.SetRes(CmdRes::kErrOther, "No such client");
}

}
}

void ShutdownCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
Expand Down Expand Up @@ -1985,12 +1984,9 @@ void MonitorCmd::Do(std::shared_ptr<Partition> partition) {
LOG(WARNING) << name_ << " weak ptr is empty";
return;
}
std::shared_ptr<net::NetConn> conn =
std::dynamic_pointer_cast<PikaClientConn>(conn_repl)->server_thread()->MoveConnOut(conn_repl->fd());
assert(conn.get() == conn_repl.get());
g_pika_server->AddMonitorClient(std::dynamic_pointer_cast<PikaClientConn>(conn));
g_pika_server->AddMonitorMessage("OK");
// Monitor thread will return "OK"

g_pika_server->AddMonitorClient(std::dynamic_pointer_cast<PikaClientConn>(conn_repl));
res_.SetRes(CmdRes::kOk);
}

void DbsizeCmd::DoInitial() {
Expand Down Expand Up @@ -2177,7 +2173,7 @@ void SlowlogCmd::Do(std::shared_ptr<Partition> partition) {
}
}
}
}
}

void PaddingCmd::DoInitial() {
if (!CheckArg(argv_.size())) {
Expand Down
4 changes: 2 additions & 2 deletions src/pika_bit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ void BitSetCmd::DoInitial() {
res_.SetRes(CmdRes::kInvalidBitInt);
return;
}
}
}

void BitSetCmd::Do(std::shared_ptr<Partition> partition) {
std::string value;
Expand Down Expand Up @@ -151,7 +151,7 @@ void BitPosCmd::DoInitial() {
}
} else {
res_.SetRes(CmdRes::kSyntaxErr, kCmdNameBitPos);
}
}
}

void BitPosCmd::Do(std::shared_ptr<Partition> partition) {
Expand Down
2 changes: 1 addition & 1 deletion src/pika_client_conn.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ std::shared_ptr<Cmd> PikaClientConn::DoCmd(const PikaCmdArgsType& argv, const st
tmp_ptr->res().SetRes(CmdRes::kErrOther, "unknown command \"" + opt + "\"");
return tmp_ptr;
}
c_ptr->SetConn(std::dynamic_pointer_cast<PikaClientConn>(shared_from_this()));
c_ptr->SetConn(shared_from_this());
c_ptr->SetResp(resp_ptr);

// Check authed
Expand Down
2 changes: 1 addition & 1 deletion src/pika_conf.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ Status PikaConf::DelTableSanityCheck(const std::string& table_name) {

int PikaConf::Load() {
int ret = LoadConf();
if (ret) {
if (ret) {
return ret;
}

Expand Down
2 changes: 1 addition & 1 deletion src/pika_geo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ void GeoAddCmd::DoInitial() {
point.latitude = latitude;
pos_.push_back(point);
}
}
}

void GeoAddCmd::Do(std::shared_ptr<Partition> partition) {
std::vector<storage::ScoreMember> score_members;
Expand Down
Loading