path: root/src/node_worker.cc
author    Gireesh Punathil <gpunathi@in.ibm.com>    2019-02-17 05:24:15 -0500
committer Anna Henningsen <anna@addaleax.net>       2019-03-01 10:14:55 +0100
commit    d14cba401a425dc184f929b38442b1d996cdd5f6 (patch)
tree      e93f00fee2d16af2bf873a793309cd4c2a64016b /src/node_worker.cc
parent    584305841d0fabee5d96ae43badfa271da99a19f (diff)
worker: refactor thread life cycle management
The current mechanism uses two async handles: one owned by the creator of the worker thread to terminate a running worker, and another one employed by the worker to interrupt its creator on its natural termination. Forced termination piggybacks on the message-passing mechanism to tell the worker to quiesce. There are also a few flags that represent the other thread's state / request state, because certain code paths are shared by multiple control flows, and there are code paths where the async handles may not have come to life yet.

Refactor this into an AsyncRequest abstraction that exposes routines to install a handle as well as to save a state.

PR-URL: https://github.com/nodejs/node/pull/26099
Refs: https://github.com/nodejs/node/pull/21283
Reviewed-By: Anna Henningsen <anna@addaleax.net>
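For orientation, here is a minimal sketch of the AsyncRequest interface this commit introduces, reconstructed from the method definitions in the diff below. The actual declaration lives in src/node_worker.h and may differ in details; in particular, the MemoryRetainer base and the default value of stop_ are assumptions.

class AsyncRequest : public MemoryRetainer {
 public:
  // Creates the uv_async_t on env's event loop; `target` runs on that loop
  // whenever the other thread calls Stop().
  void Install(Environment* env, void* data, uv_async_cb target);
  // Closes and frees the handle, if one was installed.
  void Uninstall();
  // Records the stop request and wakes the owning loop if the handle exists.
  void Stop();
  // Sets or queries the stop flag without touching the handle.
  void SetStopped(bool flag);
  bool IsStopped() const;
  uv_async_t* GetHandle();
  void MemoryInfo(MemoryTracker* tracker) const override;

 private:
  Environment* env_ = nullptr;
  uv_async_t* async_ = nullptr;
  bool stop_ = true;  // assumed default; StartThread() resets it to false
  mutable Mutex mutex_;
};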
Diffstat (limited to 'src/node_worker.cc')
-rw-r--r--  src/node_worker.cc | 137
1 file changed, 75 insertions(+), 62 deletions(-)
diff --git a/src/node_worker.cc b/src/node_worker.cc
index 2c8222f7f5..0f1535074c 100644
--- a/src/node_worker.cc
+++ b/src/node_worker.cc
@@ -58,6 +58,46 @@ void WaitForWorkerInspectorToStop(Environment* child) {
} // anonymous namespace
+void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
+ Mutex::ScopedLock lock(mutex_);
+ env_ = env;
+ async_ = new uv_async_t;
+ if (data != nullptr) async_->data = data;
+ CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
+}
+
+void AsyncRequest::Uninstall() {
+ Mutex::ScopedLock lock(mutex_);
+ if (async_ != nullptr)
+ env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
+}
+
+void AsyncRequest::Stop() {
+ Mutex::ScopedLock lock(mutex_);
+ stop_ = true;
+ if (async_ != nullptr) uv_async_send(async_);
+}
+
+void AsyncRequest::SetStopped(bool flag) {
+ Mutex::ScopedLock lock(mutex_);
+ stop_ = flag;
+}
+
+bool AsyncRequest::IsStopped() const {
+ Mutex::ScopedLock lock(mutex_);
+ return stop_;
+}
+
+uv_async_t* AsyncRequest::GetHandle() {
+ Mutex::ScopedLock lock(mutex_);
+ return async_;
+}
+
+void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
+ Mutex::ScopedLock lock(mutex_);
+ if (async_ != nullptr) tracker->TrackField("async_request", *async_);
+}
+
Worker::Worker(Environment* env,
Local<Object> wrap,
const std::string& url,
@@ -98,8 +138,7 @@ Worker::Worker(Environment* env,
}
bool Worker::is_stopped() const {
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
- return stopped_;
+ return thread_stopper_.IsStopped();
}
// This class contains data that is only relevant to the child thread itself,
@@ -207,6 +246,8 @@ void Worker::Run() {
Context::Scope context_scope(env_->context());
if (child_port != nullptr)
child_port->Close();
+ thread_stopper_.Uninstall();
+ thread_stopper_.SetStopped(true);
env_->stop_sub_worker_contexts();
env_->RunCleanup();
RunAtExit(env_.get());
@@ -215,11 +256,6 @@ void Worker::Run() {
WaitForWorkerInspectorToStop(env_.get());
#endif
- {
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
- stopped_ = true;
- }
-
// This call needs to be made while the `Environment` is still alive
// because we assume that it is available for async tracking in the
// NodePlatform implementation.
@@ -227,11 +263,12 @@ void Worker::Run() {
}
});
+ if (thread_stopper_.IsStopped()) return;
{
HandleScope handle_scope(isolate_);
Local<Context> context = NewContext(isolate_);
- if (is_stopped()) return;
+ if (thread_stopper_.IsStopped()) return;
CHECK(!context.IsEmpty());
Context::Scope context_scope(context);
{
@@ -253,6 +290,14 @@ void Worker::Run() {
Debug(this, "Created Environment for worker with id %llu", thread_id_);
if (is_stopped()) return;
+ thread_stopper_.Install(env_.get(), env_.get(), [](uv_async_t* handle) {
+ Environment* env_ = static_cast<Environment*>(handle->data);
+ uv_stop(env_->event_loop());
+ });
+ uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper_.GetHandle()));
+
+ Debug(this, "Created Environment for worker with id %llu", thread_id_);
+ if (thread_stopper_.IsStopped()) return;
{
HandleScope handle_scope(isolate_);
Mutex::ScopedLock lock(mutex_);
@@ -268,7 +313,7 @@ void Worker::Run() {
Debug(this, "Created message port for worker %llu", thread_id_);
}
- if (is_stopped()) return;
+ if (thread_stopper_.IsStopped()) return;
{
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
StartWorkerInspector(env_.get(),
@@ -289,22 +334,21 @@ void Worker::Run() {
Debug(this, "Loaded environment for worker %llu", thread_id_);
}
- if (is_stopped()) return;
+ if (thread_stopper_.IsStopped()) return;
{
SealHandleScope seal(isolate_);
bool more;
env_->performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
do {
- if (is_stopped()) break;
+ if (thread_stopper_.IsStopped()) break;
uv_run(&data.loop_, UV_RUN_DEFAULT);
- if (is_stopped()) break;
+ if (thread_stopper_.IsStopped()) break;
platform_->DrainTasks(isolate_);
more = uv_loop_alive(&data.loop_);
- if (more && !is_stopped())
- continue;
+ if (more && !thread_stopper_.IsStopped()) continue;
EmitBeforeExit(env_.get());
@@ -319,7 +363,7 @@ void Worker::Run() {
{
int exit_code;
- bool stopped = is_stopped();
+ bool stopped = thread_stopper_.IsStopped();
if (!stopped)
exit_code = EmitExit(env_.get());
Mutex::ScopedLock lock(mutex_);
@@ -341,35 +385,12 @@ void Worker::JoinThread() {
thread_joined_ = true;
env()->remove_sub_worker_context(this);
-
- if (thread_exit_async_) {
- env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
- delete async;
- });
-
- if (scheduled_on_thread_stopped_)
- OnThreadStopped();
- }
+ OnThreadStopped();
+ on_thread_finished_.Uninstall();
}
void Worker::OnThreadStopped() {
{
- Mutex::ScopedLock lock(mutex_);
- scheduled_on_thread_stopped_ = false;
-
- Debug(this, "Worker %llu thread stopped", thread_id_);
-
- {
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
- CHECK(stopped_);
- }
-
- parent_port_ = nullptr;
- }
-
- JoinThread();
-
- {
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
@@ -391,7 +412,7 @@ Worker::~Worker() {
Mutex::ScopedLock lock(mutex_);
JoinThread();
- CHECK(stopped_);
+ CHECK(thread_stopper_.IsStopped());
CHECK(thread_joined_);
// This has most likely already happened within the worker thread -- this
@@ -480,16 +501,15 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
Mutex::ScopedLock lock(w->mutex_);
w->env()->add_sub_worker_context(w);
- w->stopped_ = false;
w->thread_joined_ = false;
+ w->thread_stopper_.SetStopped(false);
- w->thread_exit_async_.reset(new uv_async_t);
- w->thread_exit_async_->data = w;
- CHECK_EQ(uv_async_init(w->env()->event_loop(),
- w->thread_exit_async_.get(),
- [](uv_async_t* handle) {
- static_cast<Worker*>(handle->data)->OnThreadStopped();
- }), 0);
+ w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) {
+ Worker* w_ = static_cast<Worker*>(handle->data);
+ CHECK(w_->thread_stopper_.IsStopped());
+ w_->parent_port_ = nullptr;
+ w_->JoinThread();
+ });
uv_thread_options_t thread_options;
thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
@@ -505,9 +525,7 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
w->Run();
Mutex::ScopedLock lock(w->mutex_);
- CHECK(w->thread_exit_async_);
- w->scheduled_on_thread_stopped_ = true;
- uv_async_send(w->thread_exit_async_.get());
+ w->on_thread_finished_.Stop();
}, static_cast<void*>(w)), 0);
}
@@ -523,28 +541,23 @@ void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
- if (w->thread_exit_async_)
- uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+ uv_ref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
}
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
- if (w->thread_exit_async_)
- uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+ uv_unref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
}
void Worker::Exit(int code) {
Mutex::ScopedLock lock(mutex_);
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);
-
- if (!stopped_) {
- stopped_ = true;
+ if (!thread_stopper_.IsStopped()) {
exit_code_ = code;
- if (child_port_ != nullptr)
- child_port_->StopEventLoop();
+ Debug(this, "Received StopEventLoop request");
+ thread_stopper_.Stop();
if (isolate_ != nullptr)
isolate_->TerminateExecution();
}
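The corresponding header change is outside this diff (which is limited to src/node_worker.cc), but the code above implies roughly the following two members on Worker. This is an inferred sketch, not the literal declaration in src/node_worker.h.

class Worker : public AsyncWrap {
  // ...
 private:
  // Installed on the worker's event loop; the parent signals it from Exit()
  // and the callback calls uv_stop() to quiesce the worker.
  AsyncRequest thread_stopper_;
  // Installed on the parent's event loop; the worker signals it when Run()
  // finishes and the callback clears parent_port_ and calls JoinThread().
  AsyncRequest on_thread_finished_;
};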