From 0f3c2c64d2fe73394e5a40c79f626d9d09c7cd5d Mon Sep 17 00:00:00 2001 From: Gabriel Charette Date: Wed, 9 May 2018 12:20:04 -0400 Subject: [PATCH] src: use modern v8::Platform worker threads APIs Precursor to removing deprecated APIs on the v8 side @ https://chromium-review.googlesource.com/c/v8/v8/+/1045310 PR-URL: https://github.com/nodejs/node/pull/21079 Reviewed-By: James M Snell Reviewed-By: Matteo Collina Reviewed-By: Colin Ihrig Reviewed-By: Yang Guo --- src/node.cc | 2 +- src/node.h | 2 +- src/node_platform.cc | 76 ++++++++++++++++++++++---------------------- src/node_platform.h | 30 ++++++++--------- src/node_worker.cc | 4 +-- 5 files changed, 56 insertions(+), 58 deletions(-) diff --git a/src/node.cc b/src/node.cc index 005b17c2761..1f4d39a56b0 100644 --- a/src/node.cc +++ b/src/node.cc @@ -404,7 +404,7 @@ static struct { } void DrainVMTasks(Isolate* isolate) { - platform_->DrainBackgroundTasks(isolate); + platform_->DrainTasks(isolate); } void CancelVMTasks(Isolate* isolate) { diff --git a/src/node.h b/src/node.h index 305d605ef4a..a3f9c92fe0d 100644 --- a/src/node.h +++ b/src/node.h @@ -229,7 +229,7 @@ class MultiIsolatePlatform : public v8::Platform { // posted during flushing of the queue are postponed until the next // flushing. virtual bool FlushForegroundTasks(v8::Isolate* isolate) = 0; - virtual void DrainBackgroundTasks(v8::Isolate* isolate) = 0; + virtual void DrainTasks(v8::Isolate* isolate) = 0; virtual void CancelPendingDelayedTasks(v8::Isolate* isolate) = 0; // These will be called by the `IsolateData` creation/destruction functions. diff --git a/src/node_platform.cc b/src/node_platform.cc index 947622a219c..ce9117ae38e 100644 --- a/src/node_platform.cc +++ b/src/node_platform.cc @@ -15,50 +15,52 @@ using v8::Platform; using v8::Task; using v8::TracingController; -static void BackgroundRunner(void* data) { +namespace { + +static void WorkerThreadMain(void* data) { TRACE_EVENT_METADATA1("__metadata", "thread_name", "name", "BackgroundTaskRunner"); - TaskQueue *background_tasks = static_cast *>(data); - while (std::unique_ptr task = background_tasks->BlockingPop()) { + TaskQueue* pending_worker_tasks = static_cast*>(data); + while (std::unique_ptr task = pending_worker_tasks->BlockingPop()) { task->Run(); - background_tasks->NotifyOfCompletion(); + pending_worker_tasks->NotifyOfCompletion(); } } -BackgroundTaskRunner::BackgroundTaskRunner(int thread_pool_size) { +} // namespace + +WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) { for (int i = 0; i < thread_pool_size; i++) { std::unique_ptr t { new uv_thread_t() }; - if (uv_thread_create(t.get(), BackgroundRunner, &background_tasks_) != 0) + if (uv_thread_create(t.get(), WorkerThreadMain, + &pending_worker_tasks_) != 0) { break; + } threads_.push_back(std::move(t)); } } -void BackgroundTaskRunner::PostTask(std::unique_ptr task) { - background_tasks_.Push(std::move(task)); -} - -void BackgroundTaskRunner::PostIdleTask(std::unique_ptr task) { - UNREACHABLE(); +void WorkerThreadsTaskRunner::PostTask(std::unique_ptr task) { + pending_worker_tasks_.Push(std::move(task)); } -void BackgroundTaskRunner::PostDelayedTask(std::unique_ptr task, - double delay_in_seconds) { +void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr task, + double delay_in_seconds) { UNREACHABLE(); } -void BackgroundTaskRunner::BlockingDrain() { - background_tasks_.BlockingDrain(); +void WorkerThreadsTaskRunner::BlockingDrain() { + pending_worker_tasks_.BlockingDrain(); } -void BackgroundTaskRunner::Shutdown() { - background_tasks_.Stop(); +void WorkerThreadsTaskRunner::Shutdown() { + pending_worker_tasks_.Stop(); for (size_t i = 0; i < threads_.size(); i++) { CHECK_EQ(0, uv_thread_join(threads_[i].get())); } } -size_t BackgroundTaskRunner::NumberOfAvailableBackgroundThreads() const { +int WorkerThreadsTaskRunner::NumberOfWorkerThreads() const { return threads_.size(); } @@ -131,8 +133,8 @@ NodePlatform::NodePlatform(int thread_pool_size, TracingController* controller = new TracingController(); tracing_controller_.reset(controller); } - background_task_runner_ = - std::make_shared(thread_pool_size); + worker_thread_task_runner_ = + std::make_shared(thread_pool_size); } void NodePlatform::RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) { @@ -160,7 +162,7 @@ void NodePlatform::UnregisterIsolate(IsolateData* isolate_data) { } void NodePlatform::Shutdown() { - background_task_runner_->Shutdown(); + worker_thread_task_runner_->Shutdown(); { Mutex::ScopedLock lock(per_isolate_mutex_); @@ -168,8 +170,8 @@ void NodePlatform::Shutdown() { } } -size_t NodePlatform::NumberOfAvailableBackgroundThreads() { - return background_task_runner_->NumberOfAvailableBackgroundThreads(); +int NodePlatform::NumberOfWorkerThreads() { + return worker_thread_task_runner_->NumberOfWorkerThreads(); } void PerIsolatePlatformData::RunForegroundTask(std::unique_ptr task) { @@ -201,15 +203,12 @@ void PerIsolatePlatformData::CancelPendingDelayedTasks() { scheduled_delayed_tasks_.clear(); } -void NodePlatform::DrainBackgroundTasks(Isolate* isolate) { +void NodePlatform::DrainTasks(Isolate* isolate) { std::shared_ptr per_isolate = ForIsolate(isolate); do { - // Right now, there is no way to drain only background tasks associated - // with a specific isolate, so this sometimes does more work than - // necessary. In the long run, that functionality is probably going to - // be available anyway, though. - background_task_runner_->BlockingDrain(); + // Worker tasks aren't associated with an Isolate. + worker_thread_task_runner_->BlockingDrain(); } while (per_isolate->FlushForegroundTasksInternal()); } @@ -249,11 +248,17 @@ bool PerIsolatePlatformData::FlushForegroundTasksInternal() { return did_work; } -void NodePlatform::CallOnBackgroundThread(Task* task, - ExpectedRuntime expected_runtime) { - background_task_runner_->PostTask(std::unique_ptr(task)); +void NodePlatform::CallOnWorkerThread(std::unique_ptr task) { + worker_thread_task_runner_->PostTask(std::move(task)); } +void NodePlatform::CallDelayedOnWorkerThread(std::unique_ptr task, + double delay_in_seconds) { + worker_thread_task_runner_->PostDelayedTask(std::move(task), + delay_in_seconds); +} + + std::shared_ptr NodePlatform::ForIsolate(Isolate* isolate) { Mutex::ScopedLock lock(per_isolate_mutex_); @@ -283,11 +288,6 @@ void NodePlatform::CancelPendingDelayedTasks(v8::Isolate* isolate) { bool NodePlatform::IdleTasksEnabled(Isolate* isolate) { return false; } -std::shared_ptr -NodePlatform::GetBackgroundTaskRunner(Isolate* isolate) { - return background_task_runner_; -} - std::shared_ptr NodePlatform::GetForegroundTaskRunner(Isolate* isolate) { return ForIsolate(isolate); diff --git a/src/node_platform.h b/src/node_platform.h index 77826e05777..62301a302b2 100644 --- a/src/node_platform.h +++ b/src/node_platform.h @@ -93,23 +93,22 @@ class PerIsolatePlatformData : std::vector scheduled_delayed_tasks_; }; -// This acts as the single background task runner for all Isolates. -class BackgroundTaskRunner : public v8::TaskRunner { +// This acts as the single worker thread task runner for all Isolates. +class WorkerThreadsTaskRunner { public: - explicit BackgroundTaskRunner(int thread_pool_size); + explicit WorkerThreadsTaskRunner(int thread_pool_size); - void PostTask(std::unique_ptr task) override; - void PostIdleTask(std::unique_ptr task) override; + void PostTask(std::unique_ptr task); void PostDelayedTask(std::unique_ptr task, - double delay_in_seconds) override; - bool IdleTasksEnabled() override { return false; }; + double delay_in_seconds); void BlockingDrain(); void Shutdown(); - size_t NumberOfAvailableBackgroundThreads() const; + int NumberOfWorkerThreads() const; + private: - TaskQueue background_tasks_; + TaskQueue pending_worker_tasks_; std::vector> threads_; }; @@ -118,14 +117,15 @@ class NodePlatform : public MultiIsolatePlatform { NodePlatform(int thread_pool_size, v8::TracingController* tracing_controller); virtual ~NodePlatform() {} - void DrainBackgroundTasks(v8::Isolate* isolate) override; + void DrainTasks(v8::Isolate* isolate) override; void CancelPendingDelayedTasks(v8::Isolate* isolate) override; void Shutdown(); // v8::Platform implementation. - size_t NumberOfAvailableBackgroundThreads() override; - void CallOnBackgroundThread(v8::Task* task, - ExpectedRuntime expected_runtime) override; + int NumberOfWorkerThreads() override; + void CallOnWorkerThread(std::unique_ptr task) override; + void CallDelayedOnWorkerThread(std::unique_ptr task, + double delay_in_seconds) override; void CallOnForegroundThread(v8::Isolate* isolate, v8::Task* task) override; void CallDelayedOnForegroundThread(v8::Isolate* isolate, v8::Task* task, double delay_in_seconds) override; @@ -138,8 +138,6 @@ class NodePlatform : public MultiIsolatePlatform { void RegisterIsolate(IsolateData* isolate_data, uv_loop_t* loop) override; void UnregisterIsolate(IsolateData* isolate_data) override; - std::shared_ptr GetBackgroundTaskRunner( - v8::Isolate* isolate) override; std::shared_ptr GetForegroundTaskRunner( v8::Isolate* isolate) override; @@ -151,7 +149,7 @@ class NodePlatform : public MultiIsolatePlatform { std::shared_ptr> per_isolate_; std::unique_ptr tracing_controller_; - std::shared_ptr background_task_runner_; + std::shared_ptr worker_thread_task_runner_; }; } // namespace node diff --git a/src/node_worker.cc b/src/node_worker.cc index 3768d80a9c5..d3bffba51f3 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -173,7 +173,7 @@ void Worker::Run() { uv_run(&loop_, UV_RUN_DEFAULT); if (is_stopped()) break; - platform->DrainBackgroundTasks(isolate_); + platform->DrainTasks(isolate_); more = uv_loop_alive(&loop_); if (more && !is_stopped()) @@ -232,7 +232,7 @@ void Worker::Run() { // This call needs to be made while the `Environment` is still alive // because we assume that it is available for async tracking in the // NodePlatform implementation. - platform->DrainBackgroundTasks(isolate_); + platform->DrainTasks(isolate_); } env_.reset();