summaryrefslogtreecommitdiff
path: root/src/node_platform.cc
diff options
context:
space:
mode:
authorAlexey Kozyatinskiy <kozyatinskiy@chromium.org>2018-08-17 18:47:05 -0700
committerAlexey Kozyatinskiy <kozyatinskiy@chromium.org>2018-08-20 18:10:00 -0700
commitb1e26128f317a6f5a5808a0a727e98f80f088b84 (patch)
treeb75efbdab9c37fd573e75d325b848ea84b1ee819 /src/node_platform.cc
parentf1d3f97c3bc813528e85b9c3e6506fc75b931d92 (diff)
downloadandroid-node-v8-b1e26128f317a6f5a5808a0a727e98f80f088b84.tar.gz
android-node-v8-b1e26128f317a6f5a5808a0a727e98f80f088b84.tar.bz2
android-node-v8-b1e26128f317a6f5a5808a0a727e98f80f088b84.zip
src: implement v8::Platform::CallDelayedOnWorkerThread
This method is crucial for Runtime.evaluate protocol command with timeout flag. At least Chrome DevTools frontend uses this method for every execution in console. PR-URL: https://github.com/nodejs/node/pull/22383 Fixes: https://github.com/nodejs/node/issues/22157 Reviewed-By: Gus Caplan <me@gus.host> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'src/node_platform.cc')
-rw-r--r--src/node_platform.cc124
1 files changed, 123 insertions, 1 deletions
diff --git a/src/node_platform.cc b/src/node_platform.cc
index 6a3ae2e5dc..92e9b371c5 100644
--- a/src/node_platform.cc
+++ b/src/node_platform.cc
@@ -2,6 +2,7 @@
#include "node_internals.h"
#include "env-inl.h"
+#include "debug_utils.h"
#include "util.h"
#include <algorithm>
@@ -29,7 +30,127 @@ static void PlatformWorkerThread(void* data) {
} // namespace
+class WorkerThreadsTaskRunner::DelayedTaskScheduler {
+ public:
+ explicit DelayedTaskScheduler(TaskQueue<Task>* tasks)
+ : pending_worker_tasks_(tasks) {}
+
+ std::unique_ptr<uv_thread_t> Start() {
+ auto start_thread = [](void* data) {
+ static_cast<DelayedTaskScheduler*>(data)->Run();
+ };
+ std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
+ uv_sem_init(&ready_, 0);
+ CHECK_EQ(0, uv_thread_create(t.get(), start_thread, this));
+ uv_sem_wait(&ready_);
+ uv_sem_destroy(&ready_);
+ return t;
+ }
+
+ void PostDelayedTask(std::unique_ptr<Task> task, double delay_in_seconds) {
+ tasks_.Push(std::unique_ptr<Task>(new ScheduleTask(this, std::move(task),
+ delay_in_seconds)));
+ uv_async_send(&flush_tasks_);
+ }
+
+ void Stop() {
+ tasks_.Push(std::unique_ptr<Task>(new StopTask(this)));
+ uv_async_send(&flush_tasks_);
+ }
+
+ private:
+ void Run() {
+ TRACE_EVENT_METADATA1("__metadata", "thread_name", "name",
+ "WorkerThreadsTaskRunner::DelayedTaskScheduler");
+ loop_.data = this;
+ CHECK_EQ(0, uv_loop_init(&loop_));
+ flush_tasks_.data = this;
+ CHECK_EQ(0, uv_async_init(&loop_, &flush_tasks_, FlushTasks));
+ uv_sem_post(&ready_);
+
+ uv_run(&loop_, UV_RUN_DEFAULT);
+ CheckedUvLoopClose(&loop_);
+ }
+
+ static void FlushTasks(uv_async_t* flush_tasks) {
+ DelayedTaskScheduler* scheduler =
+ ContainerOf(&DelayedTaskScheduler::loop_, flush_tasks->loop);
+ while (std::unique_ptr<Task> task = scheduler->tasks_.Pop())
+ task->Run();
+ }
+
+ class StopTask : public Task {
+ public:
+ explicit StopTask(DelayedTaskScheduler* scheduler): scheduler_(scheduler) {}
+
+ void Run() override {
+ std::vector<uv_timer_t*> timers;
+ for (uv_timer_t* timer : scheduler_->timers_)
+ timers.push_back(timer);
+ for (uv_timer_t* timer : timers)
+ scheduler_->TakeTimerTask(timer);
+ uv_close(reinterpret_cast<uv_handle_t*>(&scheduler_->flush_tasks_),
+ [](uv_handle_t* handle) {});
+ }
+
+ private:
+ DelayedTaskScheduler* scheduler_;
+ };
+
+ class ScheduleTask : public Task {
+ public:
+ ScheduleTask(DelayedTaskScheduler* scheduler,
+ std::unique_ptr<Task> task,
+ double delay_in_seconds)
+ : scheduler_(scheduler),
+ task_(std::move(task)),
+ delay_in_seconds_(delay_in_seconds) {}
+
+ void Run() override {
+ uint64_t delay_millis =
+ static_cast<uint64_t>(delay_in_seconds_ + 0.5) * 1000;
+ std::unique_ptr<uv_timer_t> timer(new uv_timer_t());
+ CHECK_EQ(0, uv_timer_init(&scheduler_->loop_, timer.get()));
+ timer->data = task_.release();
+ CHECK_EQ(0, uv_timer_start(timer.get(), RunTask, delay_millis, 0));
+ scheduler_->timers_.insert(timer.release());
+ }
+
+ private:
+ DelayedTaskScheduler* scheduler_;
+ std::unique_ptr<Task> task_;
+ double delay_in_seconds_;
+ };
+
+ static void RunTask(uv_timer_t* timer) {
+ DelayedTaskScheduler* scheduler =
+ ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
+ scheduler->pending_worker_tasks_->Push(scheduler->TakeTimerTask(timer));
+ }
+
+ std::unique_ptr<Task> TakeTimerTask(uv_timer_t* timer) {
+ std::unique_ptr<Task> task(static_cast<Task*>(timer->data));
+ uv_timer_stop(timer);
+ uv_close(reinterpret_cast<uv_handle_t*>(timer), [](uv_handle_t* handle) {
+ delete reinterpret_cast<uv_timer_t*>(handle);
+ });
+ timers_.erase(timer);
+ return task;
+ }
+
+ uv_sem_t ready_;
+ TaskQueue<v8::Task>* pending_worker_tasks_;
+
+ TaskQueue<v8::Task> tasks_;
+ uv_loop_t loop_;
+ uv_async_t flush_tasks_;
+ std::unordered_set<uv_timer_t*> timers_;
+};
+
WorkerThreadsTaskRunner::WorkerThreadsTaskRunner(int thread_pool_size) {
+ delayed_task_scheduler_.reset(
+ new DelayedTaskScheduler(&pending_worker_tasks_));
+ threads_.push_back(delayed_task_scheduler_->Start());
for (int i = 0; i < thread_pool_size; i++) {
std::unique_ptr<uv_thread_t> t { new uv_thread_t() };
if (uv_thread_create(t.get(), PlatformWorkerThread,
@@ -46,7 +167,7 @@ void WorkerThreadsTaskRunner::PostTask(std::unique_ptr<Task> task) {
void WorkerThreadsTaskRunner::PostDelayedTask(std::unique_ptr<v8::Task> task,
double delay_in_seconds) {
- UNREACHABLE();
+ delayed_task_scheduler_->PostDelayedTask(std::move(task), delay_in_seconds);
}
void WorkerThreadsTaskRunner::BlockingDrain() {
@@ -55,6 +176,7 @@ void WorkerThreadsTaskRunner::BlockingDrain() {
void WorkerThreadsTaskRunner::Shutdown() {
pending_worker_tasks_.Stop();
+ delayed_task_scheduler_->Stop();
for (size_t i = 0; i < threads_.size(); i++) {
CHECK_EQ(0, uv_thread_join(threads_[i].get()));
}