diff options
Diffstat (limited to 'src/node_worker.cc')
-rw-r--r-- | src/node_worker.cc | 137 |
1 files changed, 75 insertions, 62 deletions
diff --git a/src/node_worker.cc b/src/node_worker.cc index 2c8222f7f5..0f1535074c 100644 --- a/src/node_worker.cc +++ b/src/node_worker.cc @@ -58,6 +58,46 @@ void WaitForWorkerInspectorToStop(Environment* child) { } // anonymous namespace +void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) { + Mutex::ScopedLock lock(mutex_); + env_ = env; + async_ = new uv_async_t; + if (data != nullptr) async_->data = data; + CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0); +} + +void AsyncRequest::Uninstall() { + Mutex::ScopedLock lock(mutex_); + if (async_ != nullptr) + env_->CloseHandle(async_, [](uv_async_t* async) { delete async; }); +} + +void AsyncRequest::Stop() { + Mutex::ScopedLock lock(mutex_); + stop_ = true; + if (async_ != nullptr) uv_async_send(async_); +} + +void AsyncRequest::SetStopped(bool flag) { + Mutex::ScopedLock lock(mutex_); + stop_ = flag; +} + +bool AsyncRequest::IsStopped() const { + Mutex::ScopedLock lock(mutex_); + return stop_; +} + +uv_async_t* AsyncRequest::GetHandle() { + Mutex::ScopedLock lock(mutex_); + return async_; +} + +void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const { + Mutex::ScopedLock lock(mutex_); + if (async_ != nullptr) tracker->TrackField("async_request", *async_); +} + Worker::Worker(Environment* env, Local<Object> wrap, const std::string& url, @@ -98,8 +138,7 @@ Worker::Worker(Environment* env, } bool Worker::is_stopped() const { - Mutex::ScopedLock stopped_lock(stopped_mutex_); - return stopped_; + return thread_stopper_.IsStopped(); } // This class contains data that is only relevant to the child thread itself, @@ -207,6 +246,8 @@ void Worker::Run() { Context::Scope context_scope(env_->context()); if (child_port != nullptr) child_port->Close(); + thread_stopper_.Uninstall(); + thread_stopper_.SetStopped(true); env_->stop_sub_worker_contexts(); env_->RunCleanup(); RunAtExit(env_.get()); @@ -215,11 +256,6 @@ void Worker::Run() { WaitForWorkerInspectorToStop(env_.get()); #endif - { - Mutex::ScopedLock stopped_lock(stopped_mutex_); - stopped_ = true; - } - // This call needs to be made while the `Environment` is still alive // because we assume that it is available for async tracking in the // NodePlatform implementation. @@ -227,11 +263,12 @@ void Worker::Run() { } }); + if (thread_stopper_.IsStopped()) return; { HandleScope handle_scope(isolate_); Local<Context> context = NewContext(isolate_); - if (is_stopped()) return; + if (thread_stopper_.IsStopped()) return; CHECK(!context.IsEmpty()); Context::Scope context_scope(context); { @@ -253,6 +290,14 @@ void Worker::Run() { Debug(this, "Created Environment for worker with id %llu", thread_id_); if (is_stopped()) return; + thread_stopper_.Install(env_.get(), env_.get(), [](uv_async_t* handle) { + Environment* env_ = static_cast<Environment*>(handle->data); + uv_stop(env_->event_loop()); + }); + uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper_.GetHandle())); + + Debug(this, "Created Environment for worker with id %llu", thread_id_); + if (thread_stopper_.IsStopped()) return; { HandleScope handle_scope(isolate_); Mutex::ScopedLock lock(mutex_); @@ -268,7 +313,7 @@ void Worker::Run() { Debug(this, "Created message port for worker %llu", thread_id_); } - if (is_stopped()) return; + if (thread_stopper_.IsStopped()) return; { #if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR StartWorkerInspector(env_.get(), @@ -289,22 +334,21 @@ void Worker::Run() { Debug(this, "Loaded environment for worker %llu", thread_id_); } - if (is_stopped()) return; + if (thread_stopper_.IsStopped()) return; { SealHandleScope seal(isolate_); bool more; env_->performance_state()->Mark( node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START); do { - if (is_stopped()) break; + if (thread_stopper_.IsStopped()) break; uv_run(&data.loop_, UV_RUN_DEFAULT); - if (is_stopped()) break; + if (thread_stopper_.IsStopped()) break; platform_->DrainTasks(isolate_); more = uv_loop_alive(&data.loop_); - if (more && !is_stopped()) - continue; + if (more && !thread_stopper_.IsStopped()) continue; EmitBeforeExit(env_.get()); @@ -319,7 +363,7 @@ void Worker::Run() { { int exit_code; - bool stopped = is_stopped(); + bool stopped = thread_stopper_.IsStopped(); if (!stopped) exit_code = EmitExit(env_.get()); Mutex::ScopedLock lock(mutex_); @@ -341,35 +385,12 @@ void Worker::JoinThread() { thread_joined_ = true; env()->remove_sub_worker_context(this); - - if (thread_exit_async_) { - env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) { - delete async; - }); - - if (scheduled_on_thread_stopped_) - OnThreadStopped(); - } + OnThreadStopped(); + on_thread_finished_.Uninstall(); } void Worker::OnThreadStopped() { { - Mutex::ScopedLock lock(mutex_); - scheduled_on_thread_stopped_ = false; - - Debug(this, "Worker %llu thread stopped", thread_id_); - - { - Mutex::ScopedLock stopped_lock(stopped_mutex_); - CHECK(stopped_); - } - - parent_port_ = nullptr; - } - - JoinThread(); - - { HandleScope handle_scope(env()->isolate()); Context::Scope context_scope(env()->context()); @@ -391,7 +412,7 @@ Worker::~Worker() { Mutex::ScopedLock lock(mutex_); JoinThread(); - CHECK(stopped_); + CHECK(thread_stopper_.IsStopped()); CHECK(thread_joined_); // This has most likely already happened within the worker thread -- this @@ -480,16 +501,15 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) { Mutex::ScopedLock lock(w->mutex_); w->env()->add_sub_worker_context(w); - w->stopped_ = false; w->thread_joined_ = false; + w->thread_stopper_.SetStopped(false); - w->thread_exit_async_.reset(new uv_async_t); - w->thread_exit_async_->data = w; - CHECK_EQ(uv_async_init(w->env()->event_loop(), - w->thread_exit_async_.get(), - [](uv_async_t* handle) { - static_cast<Worker*>(handle->data)->OnThreadStopped(); - }), 0); + w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) { + Worker* w_ = static_cast<Worker*>(handle->data); + CHECK(w_->thread_stopper_.IsStopped()); + w_->parent_port_ = nullptr; + w_->JoinThread(); + }); uv_thread_options_t thread_options; thread_options.flags = UV_THREAD_HAS_STACK_SIZE; @@ -505,9 +525,7 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) { w->Run(); Mutex::ScopedLock lock(w->mutex_); - CHECK(w->thread_exit_async_); - w->scheduled_on_thread_stopped_ = true; - uv_async_send(w->thread_exit_async_.get()); + w->on_thread_finished_.Stop(); }, static_cast<void*>(w)), 0); } @@ -523,28 +541,23 @@ void Worker::StopThread(const FunctionCallbackInfo<Value>& args) { void Worker::Ref(const FunctionCallbackInfo<Value>& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); - if (w->thread_exit_async_) - uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get())); + uv_ref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle())); } void Worker::Unref(const FunctionCallbackInfo<Value>& args) { Worker* w; ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); - if (w->thread_exit_async_) - uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get())); + uv_unref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle())); } void Worker::Exit(int code) { Mutex::ScopedLock lock(mutex_); - Mutex::ScopedLock stopped_lock(stopped_mutex_); Debug(this, "Worker %llu called Exit(%d)", thread_id_, code); - - if (!stopped_) { - stopped_ = true; + if (!thread_stopper_.IsStopped()) { exit_code_ = code; - if (child_port_ != nullptr) - child_port_->StopEventLoop(); + Debug(this, "Received StopEventLoop request"); + thread_stopper_.Stop(); if (isolate_ != nullptr) isolate_->TerminateExecution(); } |