summaryrefslogtreecommitdiff
path: root/src/node_worker.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/node_worker.cc')
-rw-r--r--src/node_worker.cc137
1 files changed, 75 insertions, 62 deletions
diff --git a/src/node_worker.cc b/src/node_worker.cc
index 2c8222f7f5..0f1535074c 100644
--- a/src/node_worker.cc
+++ b/src/node_worker.cc
@@ -58,6 +58,46 @@ void WaitForWorkerInspectorToStop(Environment* child) {
} // anonymous namespace
+void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
+ Mutex::ScopedLock lock(mutex_);
+ env_ = env;
+ async_ = new uv_async_t;
+ if (data != nullptr) async_->data = data;
+ CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
+}
+
+void AsyncRequest::Uninstall() {
+ Mutex::ScopedLock lock(mutex_);
+ if (async_ != nullptr)
+ env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
+}
+
+void AsyncRequest::Stop() {
+ Mutex::ScopedLock lock(mutex_);
+ stop_ = true;
+ if (async_ != nullptr) uv_async_send(async_);
+}
+
+void AsyncRequest::SetStopped(bool flag) {
+ Mutex::ScopedLock lock(mutex_);
+ stop_ = flag;
+}
+
+bool AsyncRequest::IsStopped() const {
+ Mutex::ScopedLock lock(mutex_);
+ return stop_;
+}
+
+uv_async_t* AsyncRequest::GetHandle() {
+ Mutex::ScopedLock lock(mutex_);
+ return async_;
+}
+
+void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
+ Mutex::ScopedLock lock(mutex_);
+ if (async_ != nullptr) tracker->TrackField("async_request", *async_);
+}
+
Worker::Worker(Environment* env,
Local<Object> wrap,
const std::string& url,
@@ -98,8 +138,7 @@ Worker::Worker(Environment* env,
}
bool Worker::is_stopped() const {
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
- return stopped_;
+ return thread_stopper_.IsStopped();
}
// This class contains data that is only relevant to the child thread itself,
@@ -207,6 +246,8 @@ void Worker::Run() {
Context::Scope context_scope(env_->context());
if (child_port != nullptr)
child_port->Close();
+ thread_stopper_.Uninstall();
+ thread_stopper_.SetStopped(true);
env_->stop_sub_worker_contexts();
env_->RunCleanup();
RunAtExit(env_.get());
@@ -215,11 +256,6 @@ void Worker::Run() {
WaitForWorkerInspectorToStop(env_.get());
#endif
- {
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
- stopped_ = true;
- }
-
// This call needs to be made while the `Environment` is still alive
// because we assume that it is available for async tracking in the
// NodePlatform implementation.
@@ -227,11 +263,12 @@ void Worker::Run() {
}
});
+ if (thread_stopper_.IsStopped()) return;
{
HandleScope handle_scope(isolate_);
Local<Context> context = NewContext(isolate_);
- if (is_stopped()) return;
+ if (thread_stopper_.IsStopped()) return;
CHECK(!context.IsEmpty());
Context::Scope context_scope(context);
{
@@ -253,6 +290,14 @@ void Worker::Run() {
Debug(this, "Created Environment for worker with id %llu", thread_id_);
if (is_stopped()) return;
+ thread_stopper_.Install(env_.get(), env_.get(), [](uv_async_t* handle) {
+ Environment* env_ = static_cast<Environment*>(handle->data);
+ uv_stop(env_->event_loop());
+ });
+ uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper_.GetHandle()));
+
+ Debug(this, "Created Environment for worker with id %llu", thread_id_);
+ if (thread_stopper_.IsStopped()) return;
{
HandleScope handle_scope(isolate_);
Mutex::ScopedLock lock(mutex_);
@@ -268,7 +313,7 @@ void Worker::Run() {
Debug(this, "Created message port for worker %llu", thread_id_);
}
- if (is_stopped()) return;
+ if (thread_stopper_.IsStopped()) return;
{
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
StartWorkerInspector(env_.get(),
@@ -289,22 +334,21 @@ void Worker::Run() {
Debug(this, "Loaded environment for worker %llu", thread_id_);
}
- if (is_stopped()) return;
+ if (thread_stopper_.IsStopped()) return;
{
SealHandleScope seal(isolate_);
bool more;
env_->performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
do {
- if (is_stopped()) break;
+ if (thread_stopper_.IsStopped()) break;
uv_run(&data.loop_, UV_RUN_DEFAULT);
- if (is_stopped()) break;
+ if (thread_stopper_.IsStopped()) break;
platform_->DrainTasks(isolate_);
more = uv_loop_alive(&data.loop_);
- if (more && !is_stopped())
- continue;
+ if (more && !thread_stopper_.IsStopped()) continue;
EmitBeforeExit(env_.get());
@@ -319,7 +363,7 @@ void Worker::Run() {
{
int exit_code;
- bool stopped = is_stopped();
+ bool stopped = thread_stopper_.IsStopped();
if (!stopped)
exit_code = EmitExit(env_.get());
Mutex::ScopedLock lock(mutex_);
@@ -341,35 +385,12 @@ void Worker::JoinThread() {
thread_joined_ = true;
env()->remove_sub_worker_context(this);
-
- if (thread_exit_async_) {
- env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
- delete async;
- });
-
- if (scheduled_on_thread_stopped_)
- OnThreadStopped();
- }
+ OnThreadStopped();
+ on_thread_finished_.Uninstall();
}
void Worker::OnThreadStopped() {
{
- Mutex::ScopedLock lock(mutex_);
- scheduled_on_thread_stopped_ = false;
-
- Debug(this, "Worker %llu thread stopped", thread_id_);
-
- {
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
- CHECK(stopped_);
- }
-
- parent_port_ = nullptr;
- }
-
- JoinThread();
-
- {
HandleScope handle_scope(env()->isolate());
Context::Scope context_scope(env()->context());
@@ -391,7 +412,7 @@ Worker::~Worker() {
Mutex::ScopedLock lock(mutex_);
JoinThread();
- CHECK(stopped_);
+ CHECK(thread_stopper_.IsStopped());
CHECK(thread_joined_);
// This has most likely already happened within the worker thread -- this
@@ -480,16 +501,15 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
Mutex::ScopedLock lock(w->mutex_);
w->env()->add_sub_worker_context(w);
- w->stopped_ = false;
w->thread_joined_ = false;
+ w->thread_stopper_.SetStopped(false);
- w->thread_exit_async_.reset(new uv_async_t);
- w->thread_exit_async_->data = w;
- CHECK_EQ(uv_async_init(w->env()->event_loop(),
- w->thread_exit_async_.get(),
- [](uv_async_t* handle) {
- static_cast<Worker*>(handle->data)->OnThreadStopped();
- }), 0);
+ w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) {
+ Worker* w_ = static_cast<Worker*>(handle->data);
+ CHECK(w_->thread_stopper_.IsStopped());
+ w_->parent_port_ = nullptr;
+ w_->JoinThread();
+ });
uv_thread_options_t thread_options;
thread_options.flags = UV_THREAD_HAS_STACK_SIZE;
@@ -505,9 +525,7 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
w->Run();
Mutex::ScopedLock lock(w->mutex_);
- CHECK(w->thread_exit_async_);
- w->scheduled_on_thread_stopped_ = true;
- uv_async_send(w->thread_exit_async_.get());
+ w->on_thread_finished_.Stop();
}, static_cast<void*>(w)), 0);
}
@@ -523,28 +541,23 @@ void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
- if (w->thread_exit_async_)
- uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+ uv_ref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
}
void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
Worker* w;
ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
- if (w->thread_exit_async_)
- uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+ uv_unref(reinterpret_cast<uv_handle_t*>(w->on_thread_finished_.GetHandle()));
}
void Worker::Exit(int code) {
Mutex::ScopedLock lock(mutex_);
- Mutex::ScopedLock stopped_lock(stopped_mutex_);
Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);
-
- if (!stopped_) {
- stopped_ = true;
+ if (!thread_stopper_.IsStopped()) {
exit_code_ = code;
- if (child_port_ != nullptr)
- child_port_->StopEventLoop();
+ Debug(this, "Received StopEventLoop request");
+ thread_stopper_.Stop();
if (isolate_ != nullptr)
isolate_->TerminateExecution();
}