summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2017-09-01 17:03:41 +0200
committerAnna Henningsen <anna@addaleax.net>2018-06-06 19:43:52 +0200
commit0df031acadcc6490379d72676203a980c8d60592 (patch)
tree3f49864e72b0193ea9af937874f62c6316877ec4 /src
parent8939f36630bd718fc0b0b8557cf7f2ed9ecab312 (diff)
downloadandroid-node-v8-0df031acadcc6490379d72676203a980c8d60592.tar.gz
android-node-v8-0df031acadcc6490379d72676203a980c8d60592.tar.bz2
android-node-v8-0df031acadcc6490379d72676203a980c8d60592.zip
worker: initial implementation
Implement multi-threading support for most of the API. Thanks to Stephen Belanger for reviewing this change in its original form, to Olivia Hugger for reviewing the documentation and some of the tests coming along with it, and to Alexey Orlenko and Timothy Gu for reviewing other parts of the tests. Refs: https://github.com/ayojs/ayo/pull/110 Refs: https://github.com/ayojs/ayo/pull/114 Refs: https://github.com/ayojs/ayo/pull/117 PR-URL: https://github.com/nodejs/node/pull/20876 Reviewed-By: Gireesh Punathil <gpunathi@in.ibm.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Shingo Inoue <leko.noor@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com> Reviewed-By: John-David Dalton <john.david.dalton@gmail.com> Reviewed-By: Gus Caplan <me@gus.host>
Diffstat (limited to 'src')
-rw-r--r--src/async_wrap.h1
-rw-r--r--src/base_object-inl.h8
-rw-r--r--src/base_object.h4
-rw-r--r--src/bootstrapper.cc14
-rw-r--r--src/callback_scope.cc5
-rw-r--r--src/env-inl.h31
-rw-r--r--src/env.cc27
-rw-r--r--src/env.h29
-rw-r--r--src/js_stream.cc15
-rw-r--r--src/node.cc116
-rw-r--r--src/node_errors.h4
-rw-r--r--src/node_internals.h5
-rw-r--r--src/node_messaging.cc29
-rw-r--r--src/node_messaging.h5
-rw-r--r--src/node_worker.cc428
-rw-r--r--src/node_worker.h83
16 files changed, 746 insertions, 58 deletions
diff --git a/src/async_wrap.h b/src/async_wrap.h
index cf269a4c1f..b2f96477b4 100644
--- a/src/async_wrap.h
+++ b/src/async_wrap.h
@@ -67,6 +67,7 @@ namespace node {
V(TTYWRAP) \
V(UDPSENDWRAP) \
V(UDPWRAP) \
+ V(WORKER) \
V(WRITEWRAP) \
V(ZLIB)
diff --git a/src/base_object-inl.h b/src/base_object-inl.h
index 3bd854639b..06a2922397 100644
--- a/src/base_object-inl.h
+++ b/src/base_object-inl.h
@@ -65,6 +65,14 @@ v8::Local<v8::Object> BaseObject::object() {
return PersistentToLocal(env_->isolate(), persistent_handle_);
}
+v8::Local<v8::Object> BaseObject::object(v8::Isolate* isolate) {
+ v8::Local<v8::Object> handle = object();
+#ifdef DEBUG
+ CHECK_EQ(handle->CreationContext()->GetIsolate(), isolate);
+ CHECK_EQ(env_->isolate(), isolate);
+#endif
+ return handle;
+}
Environment* BaseObject::env() const {
return env_;
diff --git a/src/base_object.h b/src/base_object.h
index e0b6084340..38291d598f 100644
--- a/src/base_object.h
+++ b/src/base_object.h
@@ -43,6 +43,10 @@ class BaseObject {
// persistent.IsEmpty() is true.
inline v8::Local<v8::Object> object();
+ // Same as the above, except it additionally verifies that this object
+ // is associated with the passed Isolate in debug mode.
+ inline v8::Local<v8::Object> object(v8::Isolate* isolate);
+
inline Persistent<v8::Object>& persistent();
inline Environment* env() const;
diff --git a/src/bootstrapper.cc b/src/bootstrapper.cc
index 35c7c4dc69..f9db02562d 100644
--- a/src/bootstrapper.cc
+++ b/src/bootstrapper.cc
@@ -114,12 +114,14 @@ void SetupBootstrapObject(Environment* env,
BOOTSTRAP_METHOD(_umask, Umask);
#if defined(__POSIX__) && !defined(__ANDROID__) && !defined(__CloudABI__)
- BOOTSTRAP_METHOD(_initgroups, InitGroups);
- BOOTSTRAP_METHOD(_setegid, SetEGid);
- BOOTSTRAP_METHOD(_seteuid, SetEUid);
- BOOTSTRAP_METHOD(_setgid, SetGid);
- BOOTSTRAP_METHOD(_setuid, SetUid);
- BOOTSTRAP_METHOD(_setgroups, SetGroups);
+ if (env->is_main_thread()) {
+ BOOTSTRAP_METHOD(_initgroups, InitGroups);
+ BOOTSTRAP_METHOD(_setegid, SetEGid);
+ BOOTSTRAP_METHOD(_seteuid, SetEUid);
+ BOOTSTRAP_METHOD(_setgid, SetGid);
+ BOOTSTRAP_METHOD(_setuid, SetUid);
+ BOOTSTRAP_METHOD(_setgroups, SetGroups);
+ }
#endif // __POSIX__ && !defined(__ANDROID__) && !defined(__CloudABI__)
Local<String> should_abort_on_uncaught_toggle =
diff --git a/src/callback_scope.cc b/src/callback_scope.cc
index 9eac7beb03..23e6d5b063 100644
--- a/src/callback_scope.cc
+++ b/src/callback_scope.cc
@@ -79,6 +79,11 @@ void InternalCallbackScope::Close() {
closed_ = true;
HandleScope handle_scope(env_->isolate());
+ if (!env_->can_call_into_js()) return;
+ if (failed_ && !env_->is_main_thread() && env_->is_stopping_worker()) {
+ env_->async_hooks()->clear_async_id_stack();
+ }
+
if (pushed_ids_)
env_->async_hooks()->pop_async_id(async_context_.async_id);
diff --git a/src/env-inl.h b/src/env-inl.h
index 50328bd77c..eeb419b4a0 100644
--- a/src/env-inl.h
+++ b/src/env-inl.h
@@ -582,13 +582,42 @@ void Environment::SetUnrefImmediate(native_immediate_callback cb,
}
inline bool Environment::can_call_into_js() const {
- return can_call_into_js_;
+ return can_call_into_js_ && (is_main_thread() || !is_stopping_worker());
}
inline void Environment::set_can_call_into_js(bool can_call_into_js) {
can_call_into_js_ = can_call_into_js;
}
+inline bool Environment::is_main_thread() const {
+ return thread_id_ == 0;
+}
+
+inline double Environment::thread_id() const {
+ return thread_id_;
+}
+
+inline void Environment::set_thread_id(double id) {
+ thread_id_ = id;
+}
+
+inline worker::Worker* Environment::worker_context() const {
+ return worker_context_;
+}
+
+inline void Environment::set_worker_context(worker::Worker* context) {
+ CHECK_EQ(worker_context_, nullptr); // Should be set only once.
+ worker_context_ = context;
+}
+
+inline void Environment::add_sub_worker_context(worker::Worker* context) {
+ sub_worker_contexts_.insert(context);
+}
+
+inline void Environment::remove_sub_worker_context(worker::Worker* context) {
+ sub_worker_contexts_.erase(context);
+}
+
inline performance::performance_state* Environment::performance_state() {
return performance_state_.get();
}
diff --git a/src/env.cc b/src/env.cc
index 090b43968b..8df59d1546 100644
--- a/src/env.cc
+++ b/src/env.cc
@@ -4,6 +4,7 @@
#include "node_buffer.h"
#include "node_platform.h"
#include "node_file.h"
+#include "node_worker.h"
#include "tracing/agent.h"
#include <stdio.h>
@@ -25,6 +26,7 @@ using v8::StackTrace;
using v8::String;
using v8::Symbol;
using v8::Value;
+using worker::Worker;
IsolateData::IsolateData(Isolate* isolate,
uv_loop_t* event_loop,
@@ -444,7 +446,9 @@ void Environment::RunAndClearNativeImmediates() {
if (it->refed_)
ref_count++;
if (UNLIKELY(try_catch.HasCaught())) {
- FatalException(isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(isolate(), try_catch);
+
// Bail out, remove the already executed callbacks from list
// and set up a new TryCatch for the other pending callbacks.
std::move_backward(it, list.end(), list.begin() + (list.end() - it));
@@ -632,4 +636,25 @@ void Environment::AsyncHooks::grow_async_ids_stack() {
uv_key_t Environment::thread_local_env = {};
+void Environment::Exit(int exit_code) {
+ if (is_main_thread())
+ exit(exit_code);
+ else
+ worker_context_->Exit(exit_code);
+}
+
+void Environment::stop_sub_worker_contexts() {
+ while (!sub_worker_contexts_.empty()) {
+ Worker* w = *sub_worker_contexts_.begin();
+ remove_sub_worker_context(w);
+ w->Exit(1);
+ w->JoinThread();
+ }
+}
+
+bool Environment::is_stopping_worker() const {
+ CHECK(!is_main_thread());
+ return worker_context_->is_stopped();
+}
+
} // namespace node
diff --git a/src/env.h b/src/env.h
index cdb592732a..cf6873e5fe 100644
--- a/src/env.h
+++ b/src/env.h
@@ -55,6 +55,10 @@ namespace performance {
class performance_state;
}
+namespace worker {
+class Worker;
+}
+
namespace loader {
class ModuleWrap;
@@ -193,7 +197,10 @@ struct PackageConfig {
V(mac_string, "mac") \
V(main_string, "main") \
V(max_buffer_string, "maxBuffer") \
+ V(max_semi_space_size_string, "maxSemiSpaceSize") \
+ V(max_old_space_size_string, "maxOldSpaceSize") \
V(message_string, "message") \
+ V(message_port_string, "messagePort") \
V(message_port_constructor_string, "MessagePort") \
V(minttl_string, "minttl") \
V(modulus_string, "modulus") \
@@ -280,6 +287,7 @@ struct PackageConfig {
V(subject_string, "subject") \
V(subjectaltname_string, "subjectaltname") \
V(syscall_string, "syscall") \
+ V(thread_id_string, "threadId") \
V(ticketkeycallback_string, "onticketkeycallback") \
V(timeout_string, "timeout") \
V(tls_ticket_string, "tlsTicket") \
@@ -328,6 +336,7 @@ struct PackageConfig {
V(http2stream_constructor_template, v8::ObjectTemplate) \
V(immediate_callback_function, v8::Function) \
V(inspector_console_api_object, v8::Object) \
+ V(message_port, v8::Object) \
V(message_port_constructor_template, v8::FunctionTemplate) \
V(pbkdf2_constructor_template, v8::ObjectTemplate) \
V(pipe_constructor_template, v8::FunctionTemplate) \
@@ -601,6 +610,7 @@ class Environment {
void RegisterHandleCleanups();
void CleanupHandles();
+ void Exit(int code);
// Register clean-up cb to be called on environment destruction.
inline void RegisterHandleCleanup(uv_handle_t* handle,
@@ -714,6 +724,18 @@ class Environment {
inline bool can_call_into_js() const;
inline void set_can_call_into_js(bool can_call_into_js);
+ // TODO(addaleax): This should be inline.
+ bool is_stopping_worker() const;
+
+ inline bool is_main_thread() const;
+ inline double thread_id() const;
+ inline void set_thread_id(double id);
+ inline worker::Worker* worker_context() const;
+ inline void set_worker_context(worker::Worker* context);
+ inline void add_sub_worker_context(worker::Worker* context);
+ inline void remove_sub_worker_context(worker::Worker* context);
+ void stop_sub_worker_contexts();
+
inline void ThrowError(const char* errmsg);
inline void ThrowTypeError(const char* errmsg);
inline void ThrowRangeError(const char* errmsg);
@@ -855,12 +877,15 @@ class Environment {
std::vector<double> destroy_async_id_list_;
AliasedBuffer<uint32_t, v8::Uint32Array> should_abort_on_uncaught_toggle_;
-
int should_not_abort_scope_counter_ = 0;
std::unique_ptr<performance::performance_state> performance_state_;
std::unordered_map<std::string, uint64_t> performance_marks_;
+
bool can_call_into_js_ = true;
+ double thread_id_ = 0;
+ std::unordered_set<worker::Worker*> sub_worker_contexts_;
+
#if HAVE_INSPECTOR
std::unique_ptr<inspector::Agent> inspector_agent_;
@@ -893,6 +918,8 @@ class Environment {
std::vector<std::unique_ptr<fs::FileHandleReadWrap>>
file_handle_read_wrap_freelist_;
+ worker::Worker* worker_context_ = nullptr;
+
struct ExitCallback {
void (*cb_)(void* arg);
void* arg_;
diff --git a/src/js_stream.cc b/src/js_stream.cc
index c766c322e3..e562a62f3d 100644
--- a/src/js_stream.cc
+++ b/src/js_stream.cc
@@ -44,7 +44,8 @@ bool JSStream::IsClosing() {
TryCatch try_catch(env()->isolate());
Local<Value> value;
if (!MakeCallback(env()->isclosing_string(), 0, nullptr).ToLocal(&value)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
return true;
}
return value->IsTrue();
@@ -59,7 +60,8 @@ int JSStream::ReadStart() {
int value_int = UV_EPROTO;
if (!MakeCallback(env()->onreadstart_string(), 0, nullptr).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
@@ -73,7 +75,8 @@ int JSStream::ReadStop() {
int value_int = UV_EPROTO;
if (!MakeCallback(env()->onreadstop_string(), 0, nullptr).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
@@ -94,7 +97,8 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) {
arraysize(argv),
argv).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
@@ -128,7 +132,8 @@ int JSStream::DoWrite(WriteWrap* w,
arraysize(argv),
argv).ToLocal(&value) ||
!value->Int32Value(env()->context()).To(&value_int)) {
- FatalException(env()->isolate(), try_catch);
+ if (!try_catch.HasTerminated())
+ FatalException(env()->isolate(), try_catch);
}
return value_int;
}
diff --git a/src/node.cc b/src/node.cc
index baa97281b0..663e4a222e 100644
--- a/src/node.cc
+++ b/src/node.cc
@@ -1021,9 +1021,9 @@ void AppendExceptionLine(Environment* env,
}
-static void ReportException(Environment* env,
- Local<Value> er,
- Local<Message> message) {
+void ReportException(Environment* env,
+ Local<Value> er,
+ Local<Message> message) {
CHECK(!er.IsEmpty());
HandleScope scope(env->isolate());
@@ -1110,9 +1110,9 @@ static void ReportException(Environment* env, const TryCatch& try_catch) {
// Executes a str within the current v8 context.
-static Local<Value> ExecuteString(Environment* env,
- Local<String> source,
- Local<String> filename) {
+static MaybeLocal<Value> ExecuteString(Environment* env,
+ Local<String> source,
+ Local<String> filename) {
EscapableHandleScope scope(env->isolate());
TryCatch try_catch(env->isolate());
@@ -1125,13 +1125,19 @@ static Local<Value> ExecuteString(Environment* env,
v8::Script::Compile(env->context(), source, &origin);
if (script.IsEmpty()) {
ReportException(env, try_catch);
- exit(3);
+ env->Exit(3);
+ return MaybeLocal<Value>();
}
MaybeLocal<Value> result = script.ToLocalChecked()->Run(env->context());
if (result.IsEmpty()) {
+ if (try_catch.HasTerminated()) {
+ env->isolate()->CancelTerminateExecution();
+ return MaybeLocal<Value>();
+ }
ReportException(env, try_catch);
- exit(4);
+ env->Exit(4);
+ return MaybeLocal<Value>();
}
return scope.Escape(result.ToLocalChecked());
@@ -1230,6 +1236,7 @@ static void Abort(const FunctionCallbackInfo<Value>& args) {
void Chdir(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsString());
@@ -1411,6 +1418,7 @@ static void GetEGid(const FunctionCallbackInfo<Value>& args) {
void SetGid(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsUint32() || args[0]->IsString());
@@ -1430,6 +1438,7 @@ void SetGid(const FunctionCallbackInfo<Value>& args) {
void SetEGid(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsUint32() || args[0]->IsString());
@@ -1449,6 +1458,7 @@ void SetEGid(const FunctionCallbackInfo<Value>& args) {
void SetUid(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsUint32() || args[0]->IsString());
@@ -1468,6 +1478,7 @@ void SetUid(const FunctionCallbackInfo<Value>& args) {
void SetEUid(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ CHECK(env->is_main_thread());
CHECK_EQ(args.Length(), 1);
CHECK(args[0]->IsUint32() || args[0]->IsString());
@@ -1629,9 +1640,10 @@ static void WaitForInspectorDisconnect(Environment* env) {
static void Exit(const FunctionCallbackInfo<Value>& args) {
- WaitForInspectorDisconnect(Environment::GetCurrent(args));
+ Environment* env = Environment::GetCurrent(args);
+ WaitForInspectorDisconnect(env);
v8_platform.StopTracingAgent();
- exit(args[0]->Int32Value());
+ env->Exit(args[0]->Int32Value());
}
@@ -2040,6 +2052,9 @@ void FatalException(Isolate* isolate,
Local<Value> caught =
fatal_exception_function->Call(process_object, 1, &error);
+ if (fatal_try_catch.HasTerminated())
+ return;
+
if (fatal_try_catch.HasCaught()) {
// The fatal exception function threw, so we must exit
ReportException(env, fatal_try_catch);
@@ -2053,6 +2068,12 @@ void FatalException(Isolate* isolate,
void FatalException(Isolate* isolate, const TryCatch& try_catch) {
+ // If we try to print out a termination exception, we'd just get 'null',
+ // so just crashing here with that information seems like a better idea,
+ // and in particular it seems like we should handle terminations at the call
+ // site for this function rather than by printing them out somewhere.
+ CHECK(!try_catch.HasTerminated());
+
HandleScope scope(isolate);
if (!try_catch.IsVerbose()) {
FatalException(isolate, try_catch.Exception(), try_catch.Message());
@@ -2574,11 +2595,12 @@ void SetupProcessObject(Environment* env,
Local<Object> process = env->process_object();
auto title_string = FIXED_ONE_BYTE_STRING(env->isolate(), "title");
- CHECK(process->SetAccessor(env->context(),
- title_string,
- ProcessTitleGetter,
- ProcessTitleSetter,
- env->as_external()).FromJust());
+ CHECK(process->SetAccessor(
+ env->context(),
+ title_string,
+ ProcessTitleGetter,
+ env->is_main_thread() ? ProcessTitleSetter : nullptr,
+ env->as_external()).FromJust());
// process.version
READONLY_PROPERTY(process,
@@ -2862,25 +2884,27 @@ void SetupProcessObject(Environment* env,
CHECK(process->SetAccessor(env->context(),
debug_port_string,
DebugPortGetter,
- DebugPortSetter,
+ env->is_main_thread() ? DebugPortSetter : nullptr,
env->as_external()).FromJust());
// define various internal methods
- env->SetMethod(process,
- "_startProfilerIdleNotifier",
- StartProfilerIdleNotifier);
- env->SetMethod(process,
- "_stopProfilerIdleNotifier",
- StopProfilerIdleNotifier);
+ if (env->is_main_thread()) {
+ env->SetMethod(process,
+ "_startProfilerIdleNotifier",
+ StartProfilerIdleNotifier);
+ env->SetMethod(process,
+ "_stopProfilerIdleNotifier",
+ StopProfilerIdleNotifier);
+ env->SetMethod(process, "abort", Abort);
+ env->SetMethod(process, "chdir", Chdir);
+ env->SetMethod(process, "umask", Umask);
+ }
+
env->SetMethod(process, "_getActiveRequests", GetActiveRequests);
env->SetMethod(process, "_getActiveHandles", GetActiveHandles);
env->SetMethod(process, "reallyExit", Exit);
- env->SetMethod(process, "abort", Abort);
- env->SetMethod(process, "chdir", Chdir);
env->SetMethod(process, "cwd", Cwd);
- env->SetMethod(process, "umask", Umask);
-
#if defined(__POSIX__) && !defined(__ANDROID__) && !defined(__CloudABI__)
env->SetMethod(process, "getuid", GetUid);
env->SetMethod(process, "geteuid", GetEUid);
@@ -2890,16 +2914,17 @@ void SetupProcessObject(Environment* env,
#endif // __POSIX__ && !defined(__ANDROID__) && !defined(__CloudABI__)
env->SetMethod(process, "_kill", Kill);
+ env->SetMethod(process, "dlopen", DLOpen);
- env->SetMethod(process, "_debugProcess", DebugProcess);
- env->SetMethod(process, "_debugEnd", DebugEnd);
+ if (env->is_main_thread()) {
+ env->SetMethod(process, "_debugProcess", DebugProcess);
+ env->SetMethod(process, "_debugEnd", DebugEnd);
+ }
env->SetMethod(process, "hrtime", Hrtime);
env->SetMethod(process, "cpuUsage", CPUUsage);
- env->SetMethod(process, "dlopen", DLOpen);
-
env->SetMethod(process, "uptime", Uptime);
env->SetMethod(process, "memoryUsage", MemoryUsage);
}
@@ -2935,8 +2960,10 @@ void RawDebug(const FunctionCallbackInfo<Value>& args) {
}
-static Local<Function> GetBootstrapper(Environment* env, Local<String> source,
- Local<String> script_name) {
+static MaybeLocal<Function> GetBootstrapper(
+ Environment* env,
+ Local<String> source,
+ Local<String> script_name) {
EscapableHandleScope scope(env->isolate());
TryCatch try_catch(env->isolate());
@@ -2947,16 +2974,17 @@ static Local<Function> GetBootstrapper(Environment* env, Local<String> source,
try_catch.SetVerbose(false);
// Execute the bootstrapper javascript file
- Local<Value> bootstrapper_v = ExecuteString(env, source, script_name);
+ MaybeLocal<Value> bootstrapper_v = ExecuteString(env, source, script_name);
+ if (bootstrapper_v.IsEmpty()) // This happens when execution was interrupted.
+ return MaybeLocal<Function>();
+
if (try_catch.HasCaught()) {
ReportException(env, try_catch);
exit(10);
}
- CHECK(bootstrapper_v->IsFunction());
- Local<Function> bootstrapper = Local<Function>::Cast(bootstrapper_v);
-
- return scope.Escape(bootstrapper);
+ CHECK(bootstrapper_v.ToLocalChecked()->IsFunction());
+ return scope.Escape(bootstrapper_v.ToLocalChecked().As<Function>());
}
static bool ExecuteBootstrapper(Environment* env, Local<Function> bootstrapper,
@@ -2995,13 +3023,18 @@ void LoadEnvironment(Environment* env) {
// node_js2c.
Local<String> loaders_name =
FIXED_ONE_BYTE_STRING(env->isolate(), "internal/bootstrap/loaders.js");
- Local<Function> loaders_bootstrapper =
+ MaybeLocal<Function> loaders_bootstrapper =
GetBootstrapper(env, LoadersBootstrapperSource(env), loaders_name);
Local<String> node_name =
FIXED_ONE_BYTE_STRING(env->isolate(), "internal/bootstrap/node.js");
- Local<Function> node_bootstrapper =
+ MaybeLocal<Function> node_bootstrapper =
GetBootstrapper(env, NodeBootstrapperSource(env), node_name);
+ if (loaders_bootstrapper.IsEmpty() || node_bootstrapper.IsEmpty()) {
+ // Execution was interrupted.
+ return;
+ }
+
// Add a reference to the global object
Local<Object> global = env->context()->Global();
@@ -3049,7 +3082,7 @@ void LoadEnvironment(Environment* env) {
// Bootstrap internal loaders
Local<Value> bootstrapped_loaders;
- if (!ExecuteBootstrapper(env, loaders_bootstrapper,
+ if (!ExecuteBootstrapper(env, loaders_bootstrapper.ToLocalChecked(),
arraysize(loaders_bootstrapper_args),
loaders_bootstrapper_args,
&bootstrapped_loaders)) {
@@ -3065,7 +3098,7 @@ void LoadEnvironment(Environment* env) {
bootstrapper,
bootstrapped_loaders
};
- if (!ExecuteBootstrapper(env, node_bootstrapper,
+ if (!ExecuteBootstrapper(env, node_bootstrapper.ToLocalChecked(),
arraysize(node_bootstrapper_args),
node_bootstrapper_args,
&bootstrapped_node)) {
@@ -4279,6 +4312,7 @@ inline int Start(Isolate* isolate, IsolateData* isolate_data,
WaitForInspectorDisconnect(&env);
env.set_can_call_into_js(false);
+ env.stop_sub_worker_contexts();
env.RunCleanup();
RunAtExit(&env);
diff --git a/src/node_errors.h b/src/node_errors.h
index 931ce7b8fd..2c97088cc5 100644
--- a/src/node_errors.h
+++ b/src/node_errors.h
@@ -34,6 +34,7 @@ namespace node {
V(ERR_MISSING_ARGS, TypeError) \
V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, TypeError) \
V(ERR_MISSING_MODULE, Error) \
+ V(ERR_MISSING_PLATFORM_FOR_WORKER, Error) \
V(ERR_SCRIPT_EXECUTION_INTERRUPTED, Error) \
V(ERR_SCRIPT_EXECUTION_TIMEOUT, Error) \
V(ERR_STRING_TOO_LONG, Error) \
@@ -68,6 +69,9 @@ namespace node {
V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory") \
V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, \
"MessagePort was found in message but not listed in transferList") \
+ V(ERR_MISSING_PLATFORM_FOR_WORKER, \
+ "The V8 platform used by this instance of Node does not support " \
+ "creating Workers") \
V(ERR_SCRIPT_EXECUTION_INTERRUPTED, \
"Script execution was interrupted by `SIGINT`") \
V(ERR_TRANSFERRING_EXTERNALIZED_SHAREDARRAYBUFFER, \
diff --git a/src/node_internals.h b/src/node_internals.h
index a5d8ed0e5d..7760eb26c6 100644
--- a/src/node_internals.h
+++ b/src/node_internals.h
@@ -137,6 +137,7 @@ struct sockaddr;
V(util) \
V(uv) \
V(v8) \
+ V(worker) \
V(zlib)
#define NODE_BUILTIN_MODULES(V) \
@@ -314,6 +315,10 @@ class FatalTryCatch : public v8::TryCatch {
Environment* env_;
};
+void ReportException(Environment* env,
+ v8::Local<v8::Value> er,
+ v8::Local<v8::Message> message);
+
v8::Maybe<bool> ProcessEmitWarning(Environment* env, const char* fmt, ...);
v8::Maybe<bool> ProcessEmitDeprecationWarning(Environment* env,
const char* warning,
diff --git a/src/node_messaging.cc b/src/node_messaging.cc
index b56cef2d77..352749ea48 100644
--- a/src/node_messaging.cc
+++ b/src/node_messaging.cc
@@ -57,7 +57,7 @@ class DeserializerDelegate : public ValueDeserializer::Delegate {
if (!deserializer->ReadUint32(&id))
return MaybeLocal<Object>();
CHECK_LE(id, message_ports_.size());
- return message_ports_[id]->object();
+ return message_ports_[id]->object(isolate);
};
MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId(
@@ -436,7 +436,7 @@ MessagePort* MessagePort::New(
void MessagePort::OnMessage() {
HandleScope handle_scope(env()->isolate());
- Local<Context> context = object()->CreationContext();
+ Local<Context> context = object(env()->isolate())->CreationContext();
// data_ can only ever be modified by the owner thread, so no need to lock.
// However, the message port may be transferred while it is processing
@@ -447,6 +447,13 @@ void MessagePort::OnMessage() {
{
// Get the head of the message queue.
Mutex::ScopedLock lock(data_->mutex_);
+
+ if (stop_event_loop_) {
+ CHECK(!data_->receiving_messages_);
+ uv_stop(env()->event_loop());
+ break;
+ }
+
if (!data_->receiving_messages_)
break;
if (data_->incoming_messages_.empty())
@@ -514,8 +521,9 @@ void MessagePort::Send(Message&& message) {
void MessagePort::Send(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
+ Local<Context> context = object(env->isolate())->CreationContext();
Message msg;
- if (msg.Serialize(env, object()->CreationContext(), args[0], args[1])
+ if (msg.Serialize(env, context, args[0], args[1])
.IsNothing()) {
return;
}
@@ -548,6 +556,14 @@ void MessagePort::Stop() {
data_->receiving_messages_ = false;
}
+void MessagePort::StopEventLoop() {
+ Mutex::ScopedLock lock(data_->mutex_);
+ data_->receiving_messages_ = false;
+ stop_event_loop_ = true;
+
+ TriggerAsync();
+}
+
void MessagePort::Start(const FunctionCallbackInfo<Value>& args) {
Environment* env = Environment::GetCurrent(args);
MessagePort* port;
@@ -570,6 +586,12 @@ void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) {
port->Stop();
}
+void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
+ MessagePort* port;
+ ASSIGN_OR_RETURN_UNWRAP(&port, args.This());
+ port->OnMessage();
+}
+
size_t MessagePort::self_size() const {
Mutex::ScopedLock lock(data_->mutex_);
size_t sz = sizeof(*this) + sizeof(*data_);
@@ -604,6 +626,7 @@ MaybeLocal<Function> GetMessagePortConstructor(
env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage);
env->SetProtoMethod(m, "start", MessagePort::Start);
env->SetProtoMethod(m, "stop", MessagePort::Stop);
+ env->SetProtoMethod(m, "drain", MessagePort::Drain);
env->SetProtoMethod(m, "close", HandleWrap::Close);
env->SetProtoMethod(m, "unref", HandleWrap::Unref);
env->SetProtoMethod(m, "ref", HandleWrap::Ref);
diff --git a/src/node_messaging.h b/src/node_messaging.h
index ff8fcc7243..9a13437d19 100644
--- a/src/node_messaging.h
+++ b/src/node_messaging.h
@@ -133,11 +133,15 @@ class MessagePort : public HandleWrap {
void Start();
// Stop processing messages on this port as a receiving end.
void Stop();
+ // Stop processing messages on this port as a receiving end,
+ // and stop the event loop that this port is associated with.
+ void StopEventLoop();
static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args);
// Turns `a` and `b` into siblings, i.e. connects the sending side of one
// to the receiving side of the other. This is not thread-safe.
@@ -160,6 +164,7 @@ class MessagePort : public HandleWrap {
inline uv_async_t* async();
std::unique_ptr<MessagePortData> data_ = nullptr;
+ bool stop_event_loop_ = false;
friend class MessagePortData;
};
diff --git a/src/node_worker.cc b/src/node_worker.cc
new file mode 100644
index 0000000000..366dca353d
--- /dev/null
+++ b/src/node_worker.cc
@@ -0,0 +1,428 @@
+#include "node_worker.h"
+#include "node_errors.h"
+#include "node_internals.h"
+#include "node_buffer.h"
+#include "node_perf.h"
+#include "util.h"
+#include "util-inl.h"
+#include "async_wrap.h"
+#include "async_wrap-inl.h"
+
+using v8::ArrayBuffer;
+using v8::Context;
+using v8::Function;
+using v8::FunctionCallbackInfo;
+using v8::FunctionTemplate;
+using v8::HandleScope;
+using v8::Integer;
+using v8::Isolate;
+using v8::Local;
+using v8::Locker;
+using v8::Number;
+using v8::Object;
+using v8::SealHandleScope;
+using v8::String;
+using v8::Value;
+
+namespace node {
+namespace worker {
+
+namespace {
+
+double next_thread_id = 1;
+Mutex next_thread_id_mutex;
+
+} // anonymous namespace
+
+Worker::Worker(Environment* env, Local<Object> wrap)
+ : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER) {
+ // Generate a new thread id.
+ {
+ Mutex::ScopedLock next_thread_id_lock(next_thread_id_mutex);
+ thread_id_ = next_thread_id++;
+ }
+ wrap->Set(env->context(),
+ env->thread_id_string(),
+ Number::New(env->isolate(), thread_id_)).FromJust();
+
+ // Set up everything that needs to be set up in the parent environment.
+ parent_port_ = MessagePort::New(env, env->context());
+ if (parent_port_ == nullptr) {
+ // This can happen e.g. because execution is terminating.
+ return;
+ }
+
+ child_port_data_.reset(new MessagePortData(nullptr));
+ MessagePort::Entangle(parent_port_, child_port_data_.get());
+
+ object()->Set(env->context(),
+ env->message_port_string(),
+ parent_port_->object()).FromJust();
+
+ array_buffer_allocator_.reset(CreateArrayBufferAllocator());
+
+ isolate_ = NewIsolate(array_buffer_allocator_.get());
+ CHECK_NE(isolate_, nullptr);
+ CHECK_EQ(uv_loop_init(&loop_), 0);
+
+ thread_exit_async_.reset(new uv_async_t);
+ thread_exit_async_->data = this;
+ CHECK_EQ(uv_async_init(env->event_loop(),
+ thread_exit_async_.get(),
+ [](uv_async_t* handle) {
+ static_cast<Worker*>(handle->data)->OnThreadStopped();
+ }), 0);
+
+ {
+ // Enter an environment capable of executing code in the child Isolate
+ // (and only in it).
+ Locker locker(isolate_);
+ Isolate::Scope isolate_scope(isolate_);
+ HandleScope handle_scope(isolate_);
+
+ isolate_data_.reset(CreateIsolateData(isolate_,
+ &loop_,
+ env->isolate_data()->platform(),
+ array_buffer_allocator_.get()));
+ CHECK(isolate_data_);
+
+ Local<Context> context = NewContext(isolate_);
+ Context::Scope context_scope(context);
+
+ // TODO(addaleax): Use CreateEnvironment(), or generally another public API.
+ env_.reset(new Environment(isolate_data_.get(),
+ context,
+ nullptr));
+ CHECK_NE(env_, nullptr);
+ env_->set_abort_on_uncaught_exception(false);
+ env_->set_worker_context(this);
+ env_->set_thread_id(thread_id_);
+
+ env_->Start(0, nullptr, 0, nullptr, env->profiler_idle_notifier_started());
+ }
+
+ // The new isolate won't be bothered on this thread again.
+ isolate_->DiscardThreadSpecificMetadata();
+}
+
+bool Worker::is_stopped() const {
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ return stopped_;
+}
+
+void Worker::Run() {
+ MultiIsolatePlatform* platform = isolate_data_->platform();
+ CHECK_NE(platform, nullptr);
+
+ {
+ Locker locker(isolate_);
+ Isolate::Scope isolate_scope(isolate_);
+ SealHandleScope outer_seal(isolate_);
+
+ {
+ Context::Scope context_scope(env_->context());
+ HandleScope handle_scope(isolate_);
+
+ {
+ HandleScope handle_scope(isolate_);
+ Mutex::ScopedLock lock(mutex_);
+ // Set up the message channel for receiving messages in the child.
+ child_port_ = MessagePort::New(env_.get(),
+ env_->context(),
+ std::move(child_port_data_));
+ // MessagePort::New() may return nullptr if execution is terminated
+ // within it.
+ if (child_port_ != nullptr)
+ env_->set_message_port(child_port_->object(isolate_));
+ }
+
+ if (!is_stopped()) {
+ HandleScope handle_scope(isolate_);
+ Environment::AsyncCallbackScope callback_scope(env_.get());
+ env_->async_hooks()->push_async_ids(1, 0);
+ // This loads the Node bootstrapping code.
+ LoadEnvironment(env_.get());
+ env_->async_hooks()->pop_async_id(1);
+ }
+
+ {
+ SealHandleScope seal(isolate_);
+ bool more;
+ env_->performance_state()->Mark(
+ node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
+ do {
+ if (is_stopped()) break;
+ uv_run(&loop_, UV_RUN_DEFAULT);
+ if (is_stopped()) break;
+
+ platform->DrainBackgroundTasks(isolate_);
+
+ more = uv_loop_alive(&loop_);
+ if (more && !is_stopped())
+ continue;
+
+ EmitBeforeExit(env_.get());
+
+ // Emit `beforeExit` if the loop became alive either after emitting
+ // event, or after running some callbacks.
+ more = uv_loop_alive(&loop_);
+ } while (more == true);
+ env_->performance_state()->Mark(
+ node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
+ }
+ }
+
+ {
+ int exit_code;
+ bool stopped = is_stopped();
+ if (!stopped)
+ exit_code = EmitExit(env_.get());
+ Mutex::ScopedLock lock(mutex_);
+ if (exit_code_ == 0 && !stopped)
+ exit_code_ = exit_code;
+ }
+
+ env_->set_can_call_into_js(false);
+ Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_,
+ Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE);
+
+ // Grab the parent-to-child channel and render is unusable.
+ MessagePort* child_port;
+ {
+ Mutex::ScopedLock lock(mutex_);
+ child_port = child_port_;
+ child_port_ = nullptr;
+ }
+
+ {
+ Context::Scope context_scope(env_->context());
+ child_port->Close();
+ env_->stop_sub_worker_contexts();
+ env_->RunCleanup();
+ RunAtExit(env_.get());
+
+ {
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ stopped_ = true;
+ }
+
+ env_->RunCleanup();
+
+ // This call needs to be made while the `Environment` is still alive
+ // because we assume that it is available for async tracking in the
+ // NodePlatform implementation.
+ platform->DrainBackgroundTasks(isolate_);
+ }
+
+ env_.reset();
+ }
+
+ DisposeIsolate();
+
+ // Need to run the loop one more time to close the platform's uv_async_t
+ uv_run(&loop_, UV_RUN_ONCE);
+
+ {
+ Mutex::ScopedLock lock(mutex_);
+ CHECK(thread_exit_async_);
+ scheduled_on_thread_stopped_ = true;
+ uv_async_send(thread_exit_async_.get());
+ }
+}
+
+void Worker::DisposeIsolate() {
+ if (isolate_ == nullptr)
+ return;
+
+ CHECK(isolate_data_);
+ MultiIsolatePlatform* platform = isolate_data_->platform();
+ platform->CancelPendingDelayedTasks(isolate_);
+
+ isolate_data_.reset();
+
+ isolate_->Dispose();
+ isolate_ = nullptr;
+}
+
+void Worker::JoinThread() {
+ if (thread_joined_)
+ return;
+ CHECK_EQ(uv_thread_join(&tid_), 0);
+ thread_joined_ = true;
+
+ env()->remove_sub_worker_context(this);
+
+ if (thread_exit_async_) {
+ env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) {
+ delete async;
+ });
+
+ if (scheduled_on_thread_stopped_)
+ OnThreadStopped();
+ }
+}
+
+void Worker::OnThreadStopped() {
+ Mutex::ScopedLock lock(mutex_);
+ scheduled_on_thread_stopped_ = false;
+
+ {
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ CHECK(stopped_);
+ }
+
+ CHECK_EQ(child_port_, nullptr);
+ parent_port_ = nullptr;
+
+ // It's okay to join the thread while holding the mutex because
+ // OnThreadStopped means it's no longer doing any work that might grab it
+ // and really just silently exiting.
+ JoinThread();
+
+ {
+ HandleScope handle_scope(env()->isolate());
+ Context::Scope context_scope(env()->context());
+
+ // Reset the parent port as we're closing it now anyway.
+ object()->Set(env()->context(),
+ env()->message_port_string(),
+ Undefined(env()->isolate())).FromJust();
+
+ Local<Value> code = Integer::New(env()->isolate(), exit_code_);
+ MakeCallback(env()->onexit_string(), 1, &code);
+ }
+
+ // JoinThread() cleared all libuv handles bound to this Worker,
+ // the C++ object is no longer needed for anything now.
+ MakeWeak();
+}
+
+Worker::~Worker() {
+ Mutex::ScopedLock lock(mutex_);
+ JoinThread();
+
+ CHECK(stopped_);
+ CHECK(thread_joined_);
+ CHECK_EQ(child_port_, nullptr);
+ CHECK_EQ(uv_loop_close(&loop_), 0);
+
+ // This has most likely already happened within the worker thread -- this
+ // is just in case Worker creation failed early.
+ DisposeIsolate();
+}
+
+void Worker::New(const FunctionCallbackInfo<Value>& args) {
+ Environment* env = Environment::GetCurrent(args);
+
+ CHECK(args.IsConstructCall());
+
+ if (env->isolate_data()->platform() == nullptr) {
+ THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env);
+ return;
+ }
+
+ new Worker(env, args.This());
+}
+
+void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+ Mutex::ScopedLock lock(w->mutex_);
+
+ w->env()->add_sub_worker_context(w);
+ w->stopped_ = false;
+ CHECK_EQ(uv_thread_create(&w->tid_, [](void* arg) {
+ static_cast<Worker*>(arg)->Run();
+ }, static_cast<void*>(w)), 0);
+ w->thread_joined_ = false;
+}
+
+void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+
+ w->Exit(1);
+ w->JoinThread();
+}
+
+void Worker::Ref(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+ if (w->thread_exit_async_)
+ uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+}
+
+void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
+ Worker* w;
+ ASSIGN_OR_RETURN_UNWRAP(&w, args.This());
+ if (w->thread_exit_async_)
+ uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get()));
+}
+
+void Worker::Exit(int code) {
+ Mutex::ScopedLock lock(mutex_);
+ Mutex::ScopedLock stopped_lock(stopped_mutex_);
+ if (!stopped_) {
+ CHECK_NE(env_, nullptr);
+ stopped_ = true;
+ exit_code_ = code;
+ if (child_port_ != nullptr)
+ child_port_->StopEventLoop();
+ isolate_->TerminateExecution();
+ }
+}
+
+size_t Worker::self_size() const {
+ return sizeof(*this);
+}
+
+namespace {
+
+// Return the MessagePort that is global for this Environment and communicates
+// with the internal [kPort] port of the JS Worker class in the parent thread.
+void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) {
+ Environment* env = Environment::GetCurrent(args);
+ Local<Object> port = env->message_port();
+ if (!port.IsEmpty()) {
+ CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate());
+ args.GetReturnValue().Set(port);
+ }
+}
+
+void InitWorker(Local<Object> target,
+ Local<Value> unused,
+ Local<Context> context,
+ void* priv) {
+ Environment* env = Environment::GetCurrent(context);
+
+ {
+ Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New);
+
+ w->InstanceTemplate()->SetInternalFieldCount(1);
+
+ AsyncWrap::AddWrapMethods(env, w);
+ env->SetProtoMethod(w, "startThread", Worker::StartThread);
+ env->SetProtoMethod(w, "stopThread", Worker::StopThread);
+ env->SetProtoMethod(w, "ref", Worker::Ref);
+ env->SetProtoMethod(w, "unref", Worker::Unref);
+
+ Local<String> workerString =
+ FIXED_ONE_BYTE_STRING(env->isolate(), "Worker");
+ w->SetClassName(workerString);
+ target->Set(workerString, w->GetFunction());
+ }
+
+ env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort);
+
+ auto thread_id_string = FIXED_ONE_BYTE_STRING(env->isolate(), "threadId");
+ target->Set(env->context(),
+ thread_id_string,
+ Number::New(env->isolate(), env->thread_id())).FromJust();
+}
+
+} // anonymous namespace
+
+} // namespace worker
+} // namespace node
+
+NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker)
diff --git a/src/node_worker.h b/src/node_worker.h
new file mode 100644
index 0000000000..0a98d2f11e
--- /dev/null
+++ b/src/node_worker.h
@@ -0,0 +1,83 @@
+#ifndef SRC_NODE_WORKER_H_
+#define SRC_NODE_WORKER_H_
+
+#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
+
+#include "node_messaging.h"
+#include <unordered_map>
+
+namespace node {
+namespace worker {
+
+// A worker thread, as represented in its parent thread.
+class Worker : public AsyncWrap {
+ public:
+ Worker(Environment* env, v8::Local<v8::Object> wrap);
+ ~Worker();
+
+ // Run the worker. This is only called from the worker thread.
+ void Run();
+
+ // Forcibly exit the thread with a specified exit code. This may be called
+ // from any thread.
+ void Exit(int code);
+
+ // Wait for the worker thread to stop (in a blocking manner).
+ void JoinThread();
+
+ size_t self_size() const override;
+ bool is_stopped() const;
+
+ static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void StartThread(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void StopThread(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void GetMessagePort(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void Ref(const v8::FunctionCallbackInfo<v8::Value>& args);
+ static void Unref(const v8::FunctionCallbackInfo<v8::Value>& args);
+
+ private:
+ void OnThreadStopped();
+ void DisposeIsolate();
+
+ uv_loop_t loop_;
+ DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_;
+ DeleteFnPtr<Environment, FreeEnvironment> env_;
+ v8::Isolate* isolate_ = nullptr;
+ DeleteFnPtr<ArrayBufferAllocator, FreeArrayBufferAllocator>
+ array_buffer_allocator_;
+ uv_thread_t tid_;
+
+ // This mutex protects access to all variables listed below it.
+ mutable Mutex mutex_;
+
+ // Currently only used for telling the parent thread that the child
+ // thread exited.
+ std::unique_ptr<uv_async_t> thread_exit_async_;
+ bool scheduled_on_thread_stopped_ = false;
+
+ // This mutex only protects stopped_. If both locks are acquired, this needs
+ // to be the latter one.
+ mutable Mutex stopped_mutex_;
+ bool stopped_ = true;
+
+ bool thread_joined_ = true;
+ int exit_code_ = 0;
+ double thread_id_ = -1;
+
+ std::unique_ptr<MessagePortData> child_port_data_;
+
+ // The child port is always kept alive by the child Environment's persistent
+ // handle to it.
+ MessagePort* child_port_ = nullptr;
+ // This is always kept alive because the JS object associated with the Worker
+ // instance refers to it via its [kPort] property.
+ MessagePort* parent_port_ = nullptr;
+};
+
+} // namespace worker
+} // namespace node
+
+#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS
+
+
+#endif // SRC_NODE_WORKER_H_