summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGireesh Punathil <gpunathi@in.ibm.com>2019-02-17 05:24:15 -0500
committerAnna Henningsen <anna@addaleax.net>2019-03-01 10:14:55 +0100
commitd14cba401a425dc184f929b38442b1d996cdd5f6 (patch)
treee93f00fee2d16af2bf873a793309cd4c2a64016b
parent584305841d0fabee5d96ae43badfa271da99a19f (diff)
downloadandroid-node-v8-d14cba401a425dc184f929b38442b1d996cdd5f6.tar.gz
android-node-v8-d14cba401a425dc184f929b38442b1d996cdd5f6.tar.bz2
android-node-v8-d14cba401a425dc184f929b38442b1d996cdd5f6.zip
worker: refactor thread life cycle management
The current mechanism of uses two async handles, one owned by the creator of the worker thread to terminate a running worker, and another one employed by the worker to interrupt its creator on its natural termination. The force termination piggybacks on the message- passing mechanism to inform the worker to quiesce. Also there are few flags that represent the other thread's state / request state because certain code path is shared by multiple control flows, and there are certain code path where the async handles may not have come to life. Refactor into an AsyncRequest abstraction that exposes routines to install a handle as well as to save a state. PR-URL: https://github.com/nodejs/node/pull/26099 Refs: https://github.com/nodejs/node/pull/21283 Reviewed-By: Anna Henningsen <anna@addaleax.net>
-rw-r--r--src/node_messaging.cc16
-rw-r--r--src/node_messaging.h4
-rw-r--r--src/node_worker.cc137
-rw-r--r--src/node_worker.h42
-rw-r--r--test/pummel/test-heapdump-worker.js4
5 files changed, 104 insertions, 99 deletions
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index 34977557c5..7ca3ad14d0 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -584,13 +584,6 @@ void MessagePort::OnMessage() {
// Get the head of the message queue.
Mutex::ScopedLock lock(data_->mutex_);
- if (stop_event_loop_) {
- Debug(this, "MessagePort stops loop as requested");
- CHECK(!data_->receiving_messages_);
- uv_stop(env()->event_loop());
- break;
- }
-
Debug(this, "MessagePort has message, receiving = %d",
static_cast<int>(data_->receiving_messages_));
@@ -740,15 +733,6 @@ void MessagePort::Stop() {
data_->receiving_messages_ = false;
}
-void MessagePort::StopEventLoop() {
- Mutex::ScopedLock lock(data_->mutex_);
- data_->receiving_messages_ = false;
- stop_event_loop_ = true;
-
- Debug(this, "Received StopEventLoop request");
- TriggerAsync();
-}
-
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
MessagePort* port;
diff --git a/src/node_messaging.h b/src/node_messaging.h
index cfda69ae7f..9055a8bf96 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -159,9 +159,6 @@ class MessagePort : public HandleWrap {
void Start();
// Stop processing messages on this port as a receiving end.
void Stop();
- // Stop processing messages on this port as a receiving end,
- // and stop the event loop that this port is associated with.
- void StopEventLoop();
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
@@ -206,7 +203,6 @@ class MessagePort : public HandleWrap {
inline uv_async_t* async();
std::unique_ptr<MessagePortData> data_ = nullptr;
- bool stop_event_loop_ = false;
friend class MessagePortData;
};
diff --git a/src/node_worker.cc b/src/node_worker.cc
index 2c8222f7f5..0f1535074c 100644
--- a/src/node_worker.cc
+++ b/src/node_worker.cc
@@ -58,6 +58,46 @@ void WaitForWorkerInspectorToStop(Environment* child) {
} // anonymous namespace
+void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
+ Mutex::ScopedLock lock(mutex_);
+ env_ = env;
+ async_ = new uv_async_t;
+ if (data != nullptr) async_->data = data;
+ CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
+}
+
+void AsyncRequest::Uninstall() {
+ Mutex::ScopedLock lock(mutex_);
+ if (async_ != nullptr)
+ env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
+}
+
+void AsyncRequest::Stop() {
+ Mutex::ScopedLock lock(mutex_);
+ stop_ = true;
+ if (async_ != nullptr) uv_async_send(async_);
+}
+
+void AsyncRequest::SetStopped(bool flag) {
+ Mutex::ScopedLock lock(mutex_);
+ stop_ = flag;
+}
+
+bool AsyncRequest::IsStopped() const {
+ Mutex::ScopedLock lock(mutex_);
+ return stop_;
+}
+
+uv_async_t* AsyncRequest::GetHandle() {
+ Mutex::ScopedLock lock(mutex_);
+ return async_;
+}
+
+void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
+ Mutex::ScopedLock lock(mutex_);
+ if (async_ != nullptr) tracker->TrackField("async_request", *async_);
+}
+
Worker::Worker(Environment* env,
Local<Object> wrap,
const std::string& url,
@@ -98,8 +138,7 @@ Worker::Worker(Environment* env,
}
bool Worker::is_stopped() const {
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
- return stopped_;
+ return thread_stopper_.IsStopped();
}
// This class contains data that is only relevant to the child thread itself,
@@ -207,6 +246,8 @@ void Worker::Run() {
Context::Scope context_scope(env_->context());
if (child_port != nullptr)
child_port->Close();
+ thread_stopper_.Uninstall();
+ thread_stopper_.SetStopped(true);
env_->stop_sub_worker_contexts();
env_->RunCleanup();
RunAtExit(env_.get());
@@ -215,11 +256,6 @@ void Worker::Run() {
WaitForWorkerInspectorToStop(env_.get());
#endif
- {
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
- stopped_ = true;
- }
-
// This call needs to be made while the `Environment` is still alive
// because we assume that it is available for async tracking in the
// NodePlatform implementation.
@@ -227,11 +263,12 @@ void Worker::Run() {
}
});
+ if (thread_stopper_.IsStopped()) return;
{
HandleScope handle_scope(isolate_);
Local<Context> context = NewContext(isolate_);
- if (is_stopped()) return;
+ if (thread_stopper_.IsStopped()) return;
CHECK(!context.IsEmpty());
Context::Scope context_scope(context);
{
@@ -253,6 +290,14 @@ void Worker::Run() {
Debug(this, "Created Environment for worker with id %llu", thread_id_);
if (is_stopped()) return;
+ thread_stopper_.Install(env_.get(), env_.get(), [](uv_async_t* handle) {
+ Environment* env_ = static_cast<Environment*>(handle->data);
+ uv_stop(env_->event_loop());
+ });
+ uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper_.GetHandle()));
+
+ Debug(this, "Created Environment for worker with id %llu", thread_id_);
+ if (thread_stopper_.IsStopped()) return;
{
HandleScope handle_scope(isolate_);
Mutex::ScopedLock lock(mutex_);
@@ -268,7 +313,7 @@ void Worker::Run() {
Debug(this, "Created message port for worker %llu", thread_id_);
}
- if (is_stopped()) return;
+ if (thread_stopper_.IsStopped()) return;
{
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
StartWorkerInspector(env_.get(),
@@ -289,22 +334,21 @@ void Worker::Run() {
Debug(this, "Loaded environment for worker %llu", thread_id_);
}
- if (is_stopped()) return;
+ if (thread_stopper_.IsStopped()) return;
{
SealHandleScope seal(isolate_);
bool more;
env_->performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
do {
- if (is_stopped()) break;
+ if (thread_stopper_.IsStopped()) break;
uv_run(&data.loop_, UV_RUN_DEFAULT);
- if (is_stopped()) break;
+ if (thread_stopper_.IsStopped()) break;
platform_->DrainTasks(isolate_);
more = uv_loop_alive(&data.loop_);
- if (more && !is_stopped())
- continue;
+ if (more && !thread_stopper_.IsStopped()) continue;
EmitBeforeExit(env_.get());
@@ -319,7 +363,7 @@ void Worker::Run() {
{
int exit_code;
- bool stopped = is_stopped();
+ bool stopped = thread_stopper_.IsStopped();
if (!stopped)
exit_code = EmitExit(env_.get());
Mutex::ScopedLock lock(mutex_);
@@ -341,35 +385,12 @@ void Worker::JoinThread() {
thread_joined_ = true;
env()->remove_sub_worker_context(this);
-
- if (thread_exit_async_) {
- env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
- delete async;
- });
-
- if (scheduled_on_thread_stopped_)
- OnThreadStopped();
- }
+ OnThreadStopped();
+ on_thread_finished_.Uninstall();
}
void Worker::OnThreadStopped() {
{
- Mutex::ScopedLock lock(mutex_);
- scheduled_on_thread_stopped_ = false;
-
- Debug(this, "Worker %llu thread stopped", thread_id_);
-
- {
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
- CHECK(stopped_);
- }
-
- parent_port_ = nullptr;
- }
-
- JoinThread();
-
- {
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
@@ -391,7 +412,7 @@ Worker::~Worker() {
Mutex::ScopedLock lock(mutex_);
JoinThread();
- CHECK(stopped_);
+ CHECK(thread_stopper_.IsStopped());
CHECK(thread_joined_);
// This has most likely already happened within the worker thread -- this
@@ -480,16 +501,15 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
Mutex::ScopedLock lock(w->mutex_);
w->env()->add_sub_worker_context(w);
- w->stopped_ = false;
w->thread_joined_ = false;
+ w->thread_stopper_.SetStopped(false);
- w->thread_exit_async_.reset(new uv_async_t);
- w->thread_exit_async_->data = w;
- CHECK_EQ(uv_async_init(w->env()->event_loop(),
- w->thread_exit_async_.get(),
- [](uv_async_t* handle) {
- static_cast<Worker*>(handle->data)->OnThreadStopped();
- }), 0);
+ w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) {
+ Worker* w_ = static_cast<Worker*>(handle->data);
+ CHECK(w_->thread_stopper_.IsStopped());
+ w_->parent_port_ = nullptr;
+ w_->JoinThread();
+ });
uv_thread_options_t thread_options;
thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
@@ -505,9 +525,7 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
w->Run();
Mutex::ScopedLock lock(w->mutex_);
- CHECK(w->thread_exit_async_);
- w->scheduled_on_thread_stopped_ = true;
- uv_async_send(w->thread_exit_async_.get());
+ w->on_thread_finished_.Stop();
}, static_cast<void*>(w)), 0);
}
@@ -523,28 +541,23 @@ void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
- if (w->thread_exit_async_)
- uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+ uv_ref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
}
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
- if (w->thread_exit_async_)
- uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+ uv_unref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
}
void Worker::Exit(int code) {
Mutex::ScopedLock lock(mutex_);
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);
-
- if (!stopped_) {
- stopped_ = true;
+ if (!thread_stopper_.IsStopped()) {
exit_code_ = code;
- if (child_port_ != nullptr)
- child_port_->StopEventLoop();
+ Debug(this, "Received StopEventLoop request");
+ thread_stopper_.Stop();
if (isolate_ != nullptr)
isolate_->TerminateExecution();
}
diff --git a/src/node_worker.h b/src/node_worker.h
index dad0713fd9..fb94bdc307 100644
--- a/src/node_worker.h
+++ b/src/node_worker.h
@@ -3,14 +3,35 @@
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
-#include "node_messaging.h"
#include <unordered_map>
+#include "node_messaging.h"
+#include "uv.h"
namespace node {
namespace worker {
class WorkerThreadData;
+class AsyncRequest : public MemoryRetainer {
+ public:
+ AsyncRequest() {}
+ void Install(Environment* env, void* data, uv_async_cb target);
+ void Uninstall();
+ void Stop();
+ void SetStopped(bool flag);
+ bool IsStopped() const;
+ uv_async_t* GetHandle();
+ void MemoryInfo(MemoryTracker* tracker) const override;
+ SET_MEMORY_INFO_NAME(AsyncRequest)
+ SET_SELF_SIZE(AsyncRequest)
+
+ private:
+ Environment* env_;
+ uv_async_t* async_ = nullptr;
+ mutable Mutex mutex_;
+ bool stop_ = true;
+};
+
// A worker thread, as represented in its parent thread.
class Worker : public AsyncWrap {
public:
@@ -31,11 +52,9 @@ class Worker : public AsyncWrap {
void JoinThread();
void MemoryInfo(MemoryTracker* tracker) const override {
- tracker->TrackFieldWithSize(
- "isolate_data", sizeof(IsolateData), "IsolateData");
- tracker->TrackFieldWithSize("env", sizeof(Environment), "Environment");
- tracker->TrackField("thread_exit_async", *thread_exit_async_);
tracker->TrackField("parent_port", parent_port_);
+ tracker->TrackInlineField(&thread_stopper_, "thread_stopper_");
+ tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_");
}
SET_MEMORY_INFO_NAME(Worker)
@@ -67,16 +86,6 @@ class Worker : public AsyncWrap {
// This mutex protects access to all variables listed below it.
mutable Mutex mutex_;
- // Currently only used for telling the parent thread that the child
- // thread exited.
- std::unique_ptr<uv_async_t> thread_exit_async_;
- bool scheduled_on_thread_stopped_ = false;
-
- // This mutex only protects stopped_. If both locks are acquired, this needs
- // to be the latter one.
- mutable Mutex stopped_mutex_;
- bool stopped_ = true;
-
bool thread_joined_ = true;
int exit_code_ = 0;
uint64_t thread_id_ = -1;
@@ -96,6 +105,9 @@ class Worker : public AsyncWrap {
// instance refers to it via its [kPort] property.
MessagePort* parent_port_ = nullptr;
+ AsyncRequest thread_stopper_;
+ AsyncRequest on_thread_finished_;
+
friend class WorkerThreadData;
};
diff --git a/test/pummel/test-heapdump-worker.js b/test/pummel/test-heapdump-worker.js
index 06679964a2..2a3c93a7ad 100644
--- a/test/pummel/test-heapdump-worker.js
+++ b/test/pummel/test-heapdump-worker.js
@@ -9,8 +9,8 @@ const worker = new Worker('setInterval(() => {}, 100);', { eval: true });
validateSnapshotNodes('Node / Worker', [
{
children: [
- { node_name: 'Node / uv_async_t', edge_name: 'thread_exit_async' },
- { node_name: 'Node / Environment', edge_name: 'env' },
+ { node_name: 'Node / AsyncRequest', edge_name: 'thread_stopper_' },
+ { node_name: 'Node / AsyncRequest', edge_name: 'on_thread_finished_' },
{ node_name: 'Node / MessagePort', edge_name: 'parent_port' },
{ node_name: 'Worker', edge_name: 'wrapped' }
]