summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/node_messaging.cc16
-rw-r--r--src/node_messaging.h4
-rw-r--r--src/node_worker.cc137
-rw-r--r--src/node_worker.h42
-rw-r--r--test/pummel/test-heapdump-worker.js4
5 files changed, 104 insertions, 99 deletions
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index 34977557c5..7ca3ad14d0 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -584,13 +584,6 @@ void MessagePort::OnMessage() {
// Get the head of the message queue.
Mutex::ScopedLock lock(data_->mutex_);
- if (stop_event_loop_) {
- Debug(this, "MessagePort stops loop as requested");
- CHECK(!data_->receiving_messages_);
- uv_stop(env()->event_loop());
- break;
- }
-
Debug(this, "MessagePort has message, receiving = %d",
static_cast<int>(data_->receiving_messages_));
@@ -740,15 +733,6 @@ void MessagePort::Stop() {
data_->receiving_messages_ = false;
}
-void MessagePort::StopEventLoop() {
- Mutex::ScopedLock lock(data_->mutex_);
- data_->receiving_messages_ = false;
- stop_event_loop_ = true;
-
- Debug(this, "Received StopEventLoop request");
- TriggerAsync();
-}
-
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
MessagePort* port;
diff --git a/src/node_messaging.h b/src/node_messaging.h
index cfda69ae7f..9055a8bf96 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -159,9 +159,6 @@ class MessagePort : public HandleWrap {
void Start();
// Stop processing messages on this port as a receiving end.
void Stop();
- // Stop processing messages on this port as a receiving end,
- // and stop the event loop that this port is associated with.
- void StopEventLoop();
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
@@ -206,7 +203,6 @@ class MessagePort : public HandleWrap {
inline uv_async_t* async();
std::unique_ptr<MessagePortData> data_ = nullptr;
- bool stop_event_loop_ = false;
friend class MessagePortData;
};
diff --git a/src/node_worker.cc b/src/node_worker.cc
index 2c8222f7f5..0f1535074c 100644
--- a/src/node_worker.cc
+++ b/src/node_worker.cc
@@ -58,6 +58,46 @@ void WaitForWorkerInspectorToStop(Environment* child) {
} // anonymous namespace
+void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
+ Mutex::ScopedLock lock(mutex_);
+ env_ = env;
+ async_ = new uv_async_t;
+ if (data != nullptr) async_->data = data;
+ CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
+}
+
+void AsyncRequest::Uninstall() {
+ Mutex::ScopedLock lock(mutex_);
+ if (async_ != nullptr)
+ env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
+}
+
+void AsyncRequest::Stop() {
+ Mutex::ScopedLock lock(mutex_);
+ stop_ = true;
+ if (async_ != nullptr) uv_async_send(async_);
+}
+
+void AsyncRequest::SetStopped(bool flag) {
+ Mutex::ScopedLock lock(mutex_);
+ stop_ = flag;
+}
+
+bool AsyncRequest::IsStopped() const {
+ Mutex::ScopedLock lock(mutex_);
+ return stop_;
+}
+
+uv_async_t* AsyncRequest::GetHandle() {
+ Mutex::ScopedLock lock(mutex_);
+ return async_;
+}
+
+void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
+ Mutex::ScopedLock lock(mutex_);
+ if (async_ != nullptr) tracker->TrackField("async_request", *async_);
+}
+
Worker::Worker(Environment* env,
Local<Object> wrap,
const std::string& url,
@@ -98,8 +138,7 @@ Worker::Worker(Environment* env,
}
bool Worker::is_stopped() const {
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
- return stopped_;
+ return thread_stopper_.IsStopped();
}
// This class contains data that is only relevant to the child thread itself,
@@ -207,6 +246,8 @@ void Worker::Run() {
Context::Scope context_scope(env_->context());
if (child_port != nullptr)
child_port->Close();
+ thread_stopper_.Uninstall();
+ thread_stopper_.SetStopped(true);
env_->stop_sub_worker_contexts();
env_->RunCleanup();
RunAtExit(env_.get());
@@ -215,11 +256,6 @@ void Worker::Run() {
WaitForWorkerInspectorToStop(env_.get());
#endif
- {
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
- stopped_ = true;
- }
-
// This call needs to be made while the `Environment` is still alive
// because we assume that it is available for async tracking in the
// NodePlatform implementation.
@@ -227,11 +263,12 @@ void Worker::Run() {
}
});
+ if (thread_stopper_.IsStopped()) return;
{
HandleScope handle_scope(isolate_);
Local<Context> context = NewContext(isolate_);
- if (is_stopped()) return;
+ if (thread_stopper_.IsStopped()) return;
CHECK(!context.IsEmpty());
Context::Scope context_scope(context);
{
@@ -253,6 +290,14 @@ void Worker::Run() {
Debug(this, "Created Environment for worker with id %llu", thread_id_);
if (is_stopped()) return;
+ thread_stopper_.Install(env_.get(), env_.get(), [](uv_async_t* handle) {
+ Environment* env_ = static_cast<Environment*>(handle->data);
+ uv_stop(env_->event_loop());
+ });
+ uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper_.GetHandle()));
+
+ Debug(this, "Created Environment for worker with id %llu", thread_id_);
+ if (thread_stopper_.IsStopped()) return;
{
HandleScope handle_scope(isolate_);
Mutex::ScopedLock lock(mutex_);
@@ -268,7 +313,7 @@ void Worker::Run() {
Debug(this, "Created message port for worker %llu", thread_id_);
}
- if (is_stopped()) return;
+ if (thread_stopper_.IsStopped()) return;
{
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
StartWorkerInspector(env_.get(),
@@ -289,22 +334,21 @@ void Worker::Run() {
Debug(this, "Loaded environment for worker %llu", thread_id_);
}
- if (is_stopped()) return;
+ if (thread_stopper_.IsStopped()) return;
{
SealHandleScope seal(isolate_);
bool more;
env_->performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
do {
- if (is_stopped()) break;
+ if (thread_stopper_.IsStopped()) break;
uv_run(&data.loop_, UV_RUN_DEFAULT);
- if (is_stopped()) break;
+ if (thread_stopper_.IsStopped()) break;
platform_->DrainTasks(isolate_);
more = uv_loop_alive(&data.loop_);
- if (more && !is_stopped())
- continue;
+ if (more && !thread_stopper_.IsStopped()) continue;
EmitBeforeExit(env_.get());
@@ -319,7 +363,7 @@ void Worker::Run() {
{
int exit_code;
- bool stopped = is_stopped();
+ bool stopped = thread_stopper_.IsStopped();
if (!stopped)
exit_code = EmitExit(env_.get());
Mutex::ScopedLock lock(mutex_);
@@ -341,35 +385,12 @@ void Worker::JoinThread() {
thread_joined_ = true;
env()->remove_sub_worker_context(this);
-
- if (thread_exit_async_) {
- env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
- delete async;
- });
-
- if (scheduled_on_thread_stopped_)
- OnThreadStopped();
- }
+ OnThreadStopped();
+ on_thread_finished_.Uninstall();
}
void Worker::OnThreadStopped() {
{
- Mutex::ScopedLock lock(mutex_);
- scheduled_on_thread_stopped_ = false;
-
- Debug(this, "Worker %llu thread stopped", thread_id_);
-
- {
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
- CHECK(stopped_);
- }
-
- parent_port_ = nullptr;
- }
-
- JoinThread();
-
- {
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
@@ -391,7 +412,7 @@ Worker::~Worker() {
Mutex::ScopedLock lock(mutex_);
JoinThread();
- CHECK(stopped_);
+ CHECK(thread_stopper_.IsStopped());
CHECK(thread_joined_);
// This has most likely already happened within the worker thread -- this
@@ -480,16 +501,15 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
Mutex::ScopedLock lock(w->mutex_);
w->env()->add_sub_worker_context(w);
- w->stopped_ = false;
w->thread_joined_ = false;
+ w->thread_stopper_.SetStopped(false);
- w->thread_exit_async_.reset(new uv_async_t);
- w->thread_exit_async_->data = w;
- CHECK_EQ(uv_async_init(w->env()->event_loop(),
- w->thread_exit_async_.get(),
- [](uv_async_t* handle) {
- static_cast<Worker*>(handle->data)->OnThreadStopped();
- }), 0);
+ w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) {
+ Worker* w_ = static_cast<Worker*>(handle->data);
+ CHECK(w_->thread_stopper_.IsStopped());
+ w_->parent_port_ = nullptr;
+ w_->JoinThread();
+ });
uv_thread_options_t thread_options;
thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
@@ -505,9 +525,7 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
w->Run();
Mutex::ScopedLock lock(w->mutex_);
- CHECK(w->thread_exit_async_);
- w->scheduled_on_thread_stopped_ = true;
- uv_async_send(w->thread_exit_async_.get());
+ w->on_thread_finished_.Stop();
}, static_cast<void*>(w)), 0);
}
@@ -523,28 +541,23 @@ void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
- if (w->thread_exit_async_)
- uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+ uv_ref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
}
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
- if (w->thread_exit_async_)
- uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+ uv_unref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
}
void Worker::Exit(int code) {
Mutex::ScopedLock lock(mutex_);
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);
-
- if (!stopped_) {
- stopped_ = true;
+ if (!thread_stopper_.IsStopped()) {
exit_code_ = code;
- if (child_port_ != nullptr)
- child_port_->StopEventLoop();
+ Debug(this, "Received StopEventLoop request");
+ thread_stopper_.Stop();
if (isolate_ != nullptr)
isolate_->TerminateExecution();
}
diff --git a/src/node_worker.h b/src/node_worker.h
index dad0713fd9..fb94bdc307 100644
--- a/src/node_worker.h
+++ b/src/node_worker.h
@@ -3,14 +3,35 @@
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
-#include "node_messaging.h"
#include <unordered_map>
+#include "node_messaging.h"
+#include "uv.h"
namespace node {
namespace worker {
class WorkerThreadData;
+class AsyncRequest : public MemoryRetainer {
+ public:
+ AsyncRequest() {}
+ void Install(Environment* env, void* data, uv_async_cb target);
+ void Uninstall();
+ void Stop();
+ void SetStopped(bool flag);
+ bool IsStopped() const;
+ uv_async_t* GetHandle();
+ void MemoryInfo(MemoryTracker* tracker) const override;
+ SET_MEMORY_INFO_NAME(AsyncRequest)
+ SET_SELF_SIZE(AsyncRequest)
+
+ private:
+ Environment* env_;
+ uv_async_t* async_ = nullptr;
+ mutable Mutex mutex_;
+ bool stop_ = true;
+};
+
// A worker thread, as represented in its parent thread.
class Worker : public AsyncWrap {
public:
@@ -31,11 +52,9 @@ class Worker : public AsyncWrap {
void JoinThread();
void MemoryInfo(MemoryTracker* tracker) const override {
- tracker->TrackFieldWithSize(
- "isolate_data", sizeof(IsolateData), "IsolateData");
- tracker->TrackFieldWithSize("env", sizeof(Environment), "Environment");
- tracker->TrackField("thread_exit_async", *thread_exit_async_);
tracker->TrackField("parent_port", parent_port_);
+ tracker->TrackInlineField(&thread_stopper_, "thread_stopper_");
+ tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_");
}
SET_MEMORY_INFO_NAME(Worker)
@@ -67,16 +86,6 @@ class Worker : public AsyncWrap {
// This mutex protects access to all variables listed below it.
mutable Mutex mutex_;
- // Currently only used for telling the parent thread that the child
- // thread exited.
- std::unique_ptr<uv_async_t> thread_exit_async_;
- bool scheduled_on_thread_stopped_ = false;
-
- // This mutex only protects stopped_. If both locks are acquired, this needs
- // to be the latter one.
- mutable Mutex stopped_mutex_;
- bool stopped_ = true;
-
bool thread_joined_ = true;
int exit_code_ = 0;
uint64_t thread_id_ = -1;
@@ -96,6 +105,9 @@ class Worker : public AsyncWrap {
// instance refers to it via its [kPort] property.
MessagePort* parent_port_ = nullptr;
+ AsyncRequest thread_stopper_;
+ AsyncRequest on_thread_finished_;
+
friend class WorkerThreadData;
};
diff --git a/test/pummel/test-heapdump-worker.js b/test/pummel/test-heapdump-worker.js
index 06679964a2..2a3c93a7ad 100644
--- a/test/pummel/test-heapdump-worker.js
+++ b/test/pummel/test-heapdump-worker.js
@@ -9,8 +9,8 @@ const worker = new Worker('setInterval(() => {}, 100);', { eval: true });
validateSnapshotNodes('Node / Worker', [
{
children: [
- { node_name: 'Node / uv_async_t', edge_name: 'thread_exit_async' },
- { node_name: 'Node / Environment', edge_name: 'env' },
+ { node_name: 'Node / AsyncRequest', edge_name: 'thread_stopper_' },
+ { node_name: 'Node / AsyncRequest', edge_name: 'on_thread_finished_' },
{ node_name: 'Node / MessagePort', edge_name: 'parent_port' },
{ node_name: 'Worker', edge_name: 'wrapped' }
]