Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: changed the calculating logic of epoll timeout provided by TimerTaskManager #2794

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/net/src/dispatch_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ int DispatchThread::StartThread() {
// Adding timer tasks and run timertaskThread
timer_task_thread_.AddTimerTask("blrpop_blocking_info_scan", 250, true,
[this] { this->ScanExpiredBlockedConnsOfBlrpop(); });
timer_task_thread_.set_thread_name("TimerTaskThread");
timer_task_thread_.set_thread_name("DispacherTimerTaskThread");
timer_task_thread_.StartThread();
return ServerThread::StartThread();
}
Expand Down
55 changes: 23 additions & 32 deletions src/net/src/net_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,39 +27,39 @@ int Setnonblocking(int sockfd) {
return flags;
}

uint32_t TimerTaskManager::AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec,
TimerTaskID TimerTaskManager::AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec,
const std::function<void()>& task) {
TimedTask new_task = {last_task_id_++, task_name, interval_ms, repeat_exec, task};
id_to_task_[new_task.task_id] = new_task;

int64_t next_expired_time = NowInMs() + interval_ms;
exec_queue_.insert({next_expired_time, new_task.task_id});

if (min_interval_ms_ > interval_ms || min_interval_ms_ == -1) {
min_interval_ms_ = interval_ms;
}
// return the id of this task
return new_task.task_id;
}

int64_t TimerTaskManager::NowInMs() {
auto now = std::chrono::system_clock::now();
return std::chrono::time_point_cast<std::chrono::milliseconds>(now).time_since_epoch().count();
}
int TimerTaskManager::ExecTimerTask() {

int64_t TimerTaskManager::ExecTimerTask() {
std::vector<ExecTsWithId> fired_tasks_;
int64_t now_in_ms = NowInMs();
// traverse in ascending order
for (auto pair = exec_queue_.begin(); pair != exec_queue_.end(); pair++) {
if (pair->exec_ts <= now_in_ms) {
auto it = id_to_task_.find(pair->id);
// traverse in ascending order, and exec expired tasks
for (const auto& task : exec_queue_) {
if (task.exec_ts <= now_in_ms) {
auto it = id_to_task_.find(task.id);
assert(it != id_to_task_.end());
it->second.fun();
fired_tasks_.push_back({pair->exec_ts, pair->id});
fired_tasks_.push_back({task.exec_ts, task.id});
now_in_ms = NowInMs();
} else {
break;
}
}

for (auto task : fired_tasks_) {
exec_queue_.erase(task);
auto it = id_to_task_.find(task.id);
Expand All @@ -69,16 +69,21 @@ int TimerTaskManager::ExecTimerTask() {
exec_queue_.insert({now_in_ms + it->second.interval_ms, task.id});
} else {
// this task only need to be exec once, completely remove this task
int interval_del = it->second.interval_ms;
id_to_task_.erase(task.id);
if (interval_del == min_interval_ms_) {
RenewMinIntervalMs();
}
}
}
return min_interval_ms_;

if (exec_queue_.empty()) {
//to avoid wasting of cpu resources, epoll use 5000ms as timeout value when no task to exec
return 5000;
}
Comment on lines +76 to +79
Copy link

@coderabbitai coderabbitai bot Jul 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider reducing the default epoll timeout value.

The default timeout value of 5000ms might be too high and could lead to inefficiencies. Consider reducing it to a lower value.

-    return 5000;
+    return 1000; // or a suitable lower value
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (exec_queue_.empty()) {
//to avoid wasting of cpu resources, epoll use 5000ms as timeout value when no task to exec
return 5000;
}
if (exec_queue_.empty()) {
//to avoid wasting of cpu resources, epoll use 5000ms as timeout value when no task to exec
return 1000; // or a suitable lower value
}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, this 5000 will be returned only when the timer has no registered task inside. there's no need of reduce it to 1000 to use more cpu.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cheniujh, thank you for the clarification. Given that the 5000ms timeout is only used when there are no registered tasks, it makes sense to keep it as is to avoid unnecessary CPU usage. Your rationale is clear and well understood.


Learnings added
Learnt from: cheniujh
PR: OpenAtomFoundation/pika#2794
File: src/net/src/net_util.cc:76-79
Timestamp: 2024-07-10T10:05:58.987Z
Learning: When the TimerTaskManager in the Pika project has no registered tasks, it returns a timeout value of 5000ms to the epoll_wait function to avoid wasting CPU resources.

Note: Learnings are effective only in the context of similar code segments. To apply general coding guidelines, please configure review instructions. You can manage existing learnings in the app.


int64_t gap_between_now_and_next_task = exec_queue_.begin()->exec_ts - NowInMs();
gap_between_now_and_next_task = gap_between_now_and_next_task < 0 ? 0 : gap_between_now_and_next_task;
return gap_between_now_and_next_task;
}
bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {

bool TimerTaskManager::DelTimerTaskByTaskId(TimerTaskID task_id) {
// remove the task
auto task_to_del = id_to_task_.find(task_id);
if (task_to_del == id_to_task_.end()) {
Expand All @@ -87,11 +92,6 @@ bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
int interval_del = task_to_del->second.interval_ms;
id_to_task_.erase(task_to_del);

// renew the min_interval_ms_
if (interval_del == min_interval_ms_) {
RenewMinIntervalMs();
}

// remove from exec queue
ExecTsWithId target_key = {-1, 0};
for (auto pair : exec_queue_) {
Expand All @@ -106,15 +106,6 @@ bool TimerTaskManager::DelTimerTaskByTaskId(uint32_t task_id) {
return true;
}

void TimerTaskManager::RenewMinIntervalMs() {
min_interval_ms_ = -1;
for (auto pair : id_to_task_) {
if (pair.second.interval_ms < min_interval_ms_ || min_interval_ms_ == -1) {
min_interval_ms_ = pair.second.interval_ms;
}
}
}

TimerTaskThread::~TimerTaskThread() {
if (!timer_task_manager_.Empty()) {
LOG(INFO) << "TimerTaskThread exit !!!";
Expand All @@ -140,9 +131,9 @@ int TimerTaskThread::StopThread() {
}

void* TimerTaskThread::ThreadMain() {
int timeout;
int32_t timeout;
while (!should_stop()) {
timeout = timer_task_manager_.ExecTimerTask();
timeout = static_cast<int32_t>(timer_task_manager_.ExecTimerTask());
net_multiplexer_->NetPoll(timeout);
}
return nullptr;
Expand Down
40 changes: 16 additions & 24 deletions src/net/src/net_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
namespace net {

int Setnonblocking(int sockfd);

using TimerTaskID = int64_t;
struct TimedTask{
uint32_t task_id;
TimerTaskID task_id;
std::string task_name;
int interval_ms;
bool repeat_exec;
Expand All @@ -34,7 +34,7 @@ struct ExecTsWithId {
//the next exec time of the task, unit in ms
int64_t exec_ts;
//id of the task to be exec
uint32_t id;
TimerTaskID id;

bool operator<(const ExecTsWithId& other) const{
if(exec_ts == other.exec_ts){
Expand All @@ -47,36 +47,28 @@ struct ExecTsWithId {
}
};

/*
* For simplicity, current version of TimerTaskThread has no lock inside and all task should be registered before TimerTaskThread started,
* but if you have the needs of dynamically add/remove timer task after TimerTaskThread started, you can simply add a mutex to protect
* the timer_task_manager_ and also a pipe to wake up the maybe being endless-wait epoll(if all task consumed, epoll will sink into
* endless wait) to implement the feature.
*/
class TimerTaskManager {
public:
TimerTaskManager() = default;
~TimerTaskManager() = default;

uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task);
//return the newest min_minterval_ms
int ExecTimerTask();
bool DelTimerTaskByTaskId(uint32_t task_id);
int GetMinIntervalMs() const { return min_interval_ms_; }
TimerTaskID AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task);
//return the time gap between now and next task-expired time, which can be used as the timeout value of epoll
int64_t ExecTimerTask();
bool DelTimerTaskByTaskId(TimerTaskID task_id);
int64_t NowInMs();
void RenewMinIntervalMs();
bool Empty(){ return 0 == last_task_id_; }

bool Empty() const { return exec_queue_.empty(); }
private:
//items stored in std::set are ascending ordered, we regard it as an auto sorted queue
std::set<ExecTsWithId> exec_queue_;
std::unordered_map<uint32_t, TimedTask> id_to_task_;
uint32_t last_task_id_{0};
int min_interval_ms_{-1};
std::unordered_map<TimerTaskID, TimedTask> id_to_task_;
TimerTaskID last_task_id_{0};
};



/*
* For simplicity, current version of TimerTaskThread has no lock inside and all task should be registered before TimerTaskThread started,
* but if you have the needs of dynamically add/remove timer task after TimerTaskThread started, you can simply add a mutex to protect the timer_task_manager_
*/
class TimerTaskThread : public Thread {
public:
TimerTaskThread(){
Expand All @@ -88,11 +80,11 @@ class TimerTaskThread : public Thread {
int StopThread() override;
void set_thread_name(const std::string& name) override { Thread::set_thread_name(name); }

uint32_t AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task){
TimerTaskID AddTimerTask(const std::string& task_name, int interval_ms, bool repeat_exec, const std::function<void()> &task){
return timer_task_manager_.AddTimerTask(task_name, interval_ms, repeat_exec, task);
};

bool DelTimerTaskByTaskId(uint32_t task_id){
bool DelTimerTaskByTaskId(TimerTaskID task_id){
return timer_task_manager_.DelTimerTaskByTaskId(task_id);
};

Expand Down
Loading