diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/async_wrap.h | 1 | ||||
-rw-r--r-- | src/base_object-inl.h | 8 | ||||
-rw-r--r-- | src/base_object.h | 4 | ||||
-rw-r--r-- | src/bootstrapper.cc | 14 | ||||
-rw-r--r-- | src/callback_scope.cc | 5 | ||||
-rw-r--r-- | src/env-inl.h | 31 | ||||
-rw-r--r-- | src/env.cc | 27 | ||||
-rw-r--r-- | src/env.h | 29 | ||||
-rw-r--r-- | src/js_stream.cc | 15 | ||||
-rw-r--r-- | src/node.cc | 116 | ||||
-rw-r--r-- | src/node_errors.h | 4 | ||||
-rw-r--r-- | src/node_internals.h | 5 | ||||
-rw-r--r-- | src/node_messaging.cc | 29 | ||||
-rw-r--r-- | src/node_messaging.h | 5 | ||||
-rw-r--r-- | src/node_worker.cc | 428 | ||||
-rw-r--r-- | src/node_worker.h | 83 |
16 files changed, 746 insertions, 58 deletions
diff --git a/src/async_wrap.h b/src/async_wrap.h index cf269a4c1f..b2f96477b4 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -67,6 +67,7 @@ namespace node { V(TTYWRAP) \ V(UDPSENDWRAP) \ V(UDPWRAP) \ + V(WORKER) \ V(WRITEWRAP) \ V(ZLIB) diff --git a/src/base_object-inl.h b/src/base_object-inl.h index 3bd854639b..06a2922397 100644 --- a/src/base_object-inl.h +++ b/src/base_object-inl.h @@ -65,6 +65,14 @@ v8::Local<v8::Object> BaseObject::object() { return PersistentToLocal(env_->isolate(), persistent_handle_); } +v8::Local<v8::Object> BaseObject::object(v8::Isolate* isolate) { + v8::Local<v8::Object> handle = object(); +#ifdef DEBUG + CHECK_EQ(handle->CreationContext()->GetIsolate(), isolate); + CHECK_EQ(env_->isolate(), isolate); +#endif + return handle; +} Environment* BaseObject::env() const { return env_; diff --git a/src/base_object.h b/src/base_object.h index e0b6084340..38291d598f 100644 --- a/src/base_object.h +++ b/src/base_object.h @@ -43,6 +43,10 @@ class BaseObject { // persistent.IsEmpty() is true. inline v8::Local<v8::Object> object(); + // Same as the above, except it additionally verifies that this object + // is associated with the passed Isolate in debug mode. + inline v8::Local<v8::Object> object(v8::Isolate* isolate); + inline Persistent<v8::Object>& persistent(); inline Environment* env() const; diff --git a/src/bootstrapper.cc b/src/bootstrapper.cc index 35c7c4dc69..f9db02562d 100644 --- a/src/bootstrapper.cc +++ b/src/bootstrapper.cc @@ -114,12 +114,14 @@ void SetupBootstrapObject(Environment* env, BOOTSTRAP_METHOD(_umask, Umask); #if defined(__POSIX__) && !defined(__ANDROID__) && !defined(__CloudABI__) - BOOTSTRAP_METHOD(_initgroups, InitGroups); - BOOTSTRAP_METHOD(_setegid, SetEGid); - BOOTSTRAP_METHOD(_seteuid, SetEUid); - BOOTSTRAP_METHOD(_setgid, SetGid); - BOOTSTRAP_METHOD(_setuid, SetUid); - BOOTSTRAP_METHOD(_setgroups, SetGroups); + if (env->is_main_thread()) { + BOOTSTRAP_METHOD(_initgroups, InitGroups); + BOOTSTRAP_METHOD(_setegid, SetEGid); + BOOTSTRAP_METHOD(_seteuid, SetEUid); + BOOTSTRAP_METHOD(_setgid, SetGid); + BOOTSTRAP_METHOD(_setuid, SetUid); + BOOTSTRAP_METHOD(_setgroups, SetGroups); + } #endif // __POSIX__ && !defined(__ANDROID__) && !defined(__CloudABI__) Local<String> should_abort_on_uncaught_toggle = diff --git a/src/callback_scope.cc b/src/callback_scope.cc index 9eac7beb03..23e6d5b063 100644 --- a/src/callback_scope.cc +++ b/src/callback_scope.cc @@ -79,6 +79,11 @@ void InternalCallbackScope::Close() { closed_ = true; HandleScope handle_scope(env_->isolate()); + if (!env_->can_call_into_js()) return; + if (failed_ && !env_->is_main_thread() && env_->is_stopping_worker()) { + env_->async_hooks()->clear_async_id_stack(); + } + if (pushed_ids_) env_->async_hooks()->pop_async_id(async_context_.async_id); diff --git a/src/env-inl.h b/src/env-inl.h index 50328bd77c..eeb419b4a0 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -582,13 +582,42 @@ void Environment::SetUnrefImmediate(native_immediate_callback cb, } inline bool Environment::can_call_into_js() const { - return can_call_into_js_; + return can_call_into_js_ && (is_main_thread() || !is_stopping_worker()); } inline void Environment::set_can_call_into_js(bool can_call_into_js) { can_call_into_js_ = can_call_into_js; } +inline bool Environment::is_main_thread() const { + return thread_id_ == 0; +} + +inline double Environment::thread_id() const { + return thread_id_; +} + +inline void Environment::set_thread_id(double id) { + thread_id_ = id; +} + +inline worker::Worker* Environment::worker_context() const { + return worker_context_; +} + +inline void Environment::set_worker_context(worker::Worker* context) { + CHECK_EQ(worker_context_, nullptr); // Should be set only once. + worker_context_ = context; +} + +inline void Environment::add_sub_worker_context(worker::Worker* context) { + sub_worker_contexts_.insert(context); +} + +inline void Environment::remove_sub_worker_context(worker::Worker* context) { + sub_worker_contexts_.erase(context); +} + inline performance::performance_state* Environment::performance_state() { return performance_state_.get(); } diff --git a/src/env.cc b/src/env.cc index 090b43968b..8df59d1546 100644 --- a/src/env.cc +++ b/src/env.cc @@ -4,6 +4,7 @@ #include "node_buffer.h" #include "node_platform.h" #include "node_file.h" +#include "node_worker.h" #include "tracing/agent.h" #include <stdio.h> @@ -25,6 +26,7 @@ using v8::StackTrace; using v8::String; using v8::Symbol; using v8::Value; +using worker::Worker; IsolateData::IsolateData(Isolate* isolate, uv_loop_t* event_loop, @@ -444,7 +446,9 @@ void Environment::RunAndClearNativeImmediates() { if (it->refed_) ref_count++; if (UNLIKELY(try_catch.HasCaught())) { - FatalException(isolate(), try_catch); + if (!try_catch.HasTerminated()) + FatalException(isolate(), try_catch); + // Bail out, remove the already executed callbacks from list // and set up a new TryCatch for the other pending callbacks. std::move_backward(it, list.end(), list.begin() + (list.end() - it)); @@ -632,4 +636,25 @@ void Environment::AsyncHooks::grow_async_ids_stack() { uv_key_t Environment::thread_local_env = {}; +void Environment::Exit(int exit_code) { + if (is_main_thread()) + exit(exit_code); + else + worker_context_->Exit(exit_code); +} + +void Environment::stop_sub_worker_contexts() { + while (!sub_worker_contexts_.empty()) { + Worker* w = *sub_worker_contexts_.begin(); + remove_sub_worker_context(w); + w->Exit(1); + w->JoinThread(); + } +} + +bool Environment::is_stopping_worker() const { + CHECK(!is_main_thread()); + return worker_context_->is_stopped(); +} + } // namespace node @@ -55,6 +55,10 @@ namespace performance { class performance_state; } +namespace worker { +class Worker; +} + namespace loader { class ModuleWrap; @@ -193,7 +197,10 @@ struct PackageConfig { V(mac_string, "mac") \ V(main_string, "main") \ V(max_buffer_string, "maxBuffer") \ + V(max_semi_space_size_string, "maxSemiSpaceSize") \ + V(max_old_space_size_string, "maxOldSpaceSize") \ V(message_string, "message") \ + V(message_port_string, "messagePort") \ V(message_port_constructor_string, "MessagePort") \ V(minttl_string, "minttl") \ V(modulus_string, "modulus") \ @@ -280,6 +287,7 @@ struct PackageConfig { V(subject_string, "subject") \ V(subjectaltname_string, "subjectaltname") \ V(syscall_string, "syscall") \ + V(thread_id_string, "threadId") \ V(ticketkeycallback_string, "onticketkeycallback") \ V(timeout_string, "timeout") \ V(tls_ticket_string, "tlsTicket") \ @@ -328,6 +336,7 @@ struct PackageConfig { V(http2stream_constructor_template, v8::ObjectTemplate) \ V(immediate_callback_function, v8::Function) \ V(inspector_console_api_object, v8::Object) \ + V(message_port, v8::Object) \ V(message_port_constructor_template, v8::FunctionTemplate) \ V(pbkdf2_constructor_template, v8::ObjectTemplate) \ V(pipe_constructor_template, v8::FunctionTemplate) \ @@ -601,6 +610,7 @@ class Environment { void RegisterHandleCleanups(); void CleanupHandles(); + void Exit(int code); // Register clean-up cb to be called on environment destruction. inline void RegisterHandleCleanup(uv_handle_t* handle, @@ -714,6 +724,18 @@ class Environment { inline bool can_call_into_js() const; inline void set_can_call_into_js(bool can_call_into_js); + // TODO(addaleax): This should be inline. + bool is_stopping_worker() const; + + inline bool is_main_thread() const; + inline double thread_id() const; + inline void set_thread_id(double id); + inline worker::Worker* worker_context() const; + inline void set_worker_context(worker::Worker* context); + inline void add_sub_worker_context(worker::Worker* context); + inline void remove_sub_worker_context(worker::Worker* context); + void stop_sub_worker_contexts(); + inline void ThrowError(const char* errmsg); inline void ThrowTypeError(const char* errmsg); inline void ThrowRangeError(const char* errmsg); @@ -855,12 +877,15 @@ class Environment { std::vector<double> destroy_async_id_list_; AliasedBuffer<uint32_t, v8::Uint32Array> should_abort_on_uncaught_toggle_; - int should_not_abort_scope_counter_ = 0; std::unique_ptr<performance::performance_state> performance_state_; std::unordered_map<std::string, uint64_t> performance_marks_; + bool can_call_into_js_ = true; + double thread_id_ = 0; + std::unordered_set<worker::Worker*> sub_worker_contexts_; + #if HAVE_INSPECTOR std::unique_ptr<inspector::Agent> inspector_agent_; @@ -893,6 +918,8 @@ class Environment { std::vector<std::unique_ptr<fs::FileHandleReadWrap>> file_handle_read_wrap_freelist_; + worker::Worker* worker_context_ = nullptr; + struct ExitCallback { void (*cb_)(void* arg); void* arg_; diff --git a/src/js_stream.cc b/src/js_stream.cc index c766c322e3..e562a62f3d 100644 --- a/src/js_stream.cc +++ b/src/js_stream.cc @@ -44,7 +44,8 @@ bool JSStream::IsClosing() { TryCatch try_catch(env()->isolate()); Local<Value> value; if (!MakeCallback(env()->isclosing_string(), 0, nullptr).ToLocal(&value)) { - FatalException(env()->isolate(), try_catch); + if (!try_catch.HasTerminated()) + FatalException(env()->isolate(), try_catch); return true; } return value->IsTrue(); @@ -59,7 +60,8 @@ int JSStream::ReadStart() { int value_int = UV_EPROTO; if (!MakeCallback(env()->onreadstart_string(), 0, nullptr).ToLocal(&value) || !value->Int32Value(env()->context()).To(&value_int)) { - FatalException(env()->isolate(), try_catch); + if (!try_catch.HasTerminated()) + FatalException(env()->isolate(), try_catch); } return value_int; } @@ -73,7 +75,8 @@ int JSStream::ReadStop() { int value_int = UV_EPROTO; if (!MakeCallback(env()->onreadstop_string(), 0, nullptr).ToLocal(&value) || !value->Int32Value(env()->context()).To(&value_int)) { - FatalException(env()->isolate(), try_catch); + if (!try_catch.HasTerminated()) + FatalException(env()->isolate(), try_catch); } return value_int; } @@ -94,7 +97,8 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) { arraysize(argv), argv).ToLocal(&value) || !value->Int32Value(env()->context()).To(&value_int)) { - FatalException(env()->isolate(), try_catch); + if (!try_catch.HasTerminated()) + FatalException(env()->isolate(), try_catch); } return value_int; } @@ -128,7 +132,8 @@ int JSStream::DoWrite(WriteWrap* w, arraysize(argv), argv).ToLocal(&value) || !value->Int32Value(env()->context()).To(&value_int)) { - FatalException(env()->isolate(), try_catch); + if (!try_catch.HasTerminated()) + FatalException(env()->isolate(), try_catch); } return value_int; } diff --git a/src/node.cc b/src/node.cc index baa97281b0..663e4a222e 100644 --- a/src/node.cc +++ b/src/node.cc @@ -1021,9 +1021,9 @@ void AppendExceptionLine(Environment* env, } -static void ReportException(Environment* env, - Local<Value> er, - Local<Message> message) { +void ReportException(Environment* env, + Local<Value> er, + Local<Message> message) { CHECK(!er.IsEmpty()); HandleScope scope(env->isolate()); @@ -1110,9 +1110,9 @@ static void ReportException(Environment* env, const TryCatch& try_catch) { // Executes a str within the current v8 context. -static Local<Value> ExecuteString(Environment* env, - Local<String> source, - Local<String> filename) { +static MaybeLocal<Value> ExecuteString(Environment* env, + Local<String> source, + Local<String> filename) { EscapableHandleScope scope(env->isolate()); TryCatch try_catch(env->isolate()); @@ -1125,13 +1125,19 @@ static Local<Value> ExecuteString(Environment* env, v8::Script::Compile(env->context(), source, &origin); if (script.IsEmpty()) { ReportException(env, try_catch); - exit(3); + env->Exit(3); + return MaybeLocal<Value>(); } MaybeLocal<Value> result = script.ToLocalChecked()->Run(env->context()); if (result.IsEmpty()) { + if (try_catch.HasTerminated()) { + env->isolate()->CancelTerminateExecution(); + return MaybeLocal<Value>(); + } ReportException(env, try_catch); - exit(4); + env->Exit(4); + return MaybeLocal<Value>(); } return scope.Escape(result.ToLocalChecked()); @@ -1230,6 +1236,7 @@ static void Abort(const FunctionCallbackInfo<Value>& args) { void Chdir(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); + CHECK(env->is_main_thread()); CHECK_EQ(args.Length(), 1); CHECK(args[0]->IsString()); @@ -1411,6 +1418,7 @@ static void GetEGid(const FunctionCallbackInfo<Value>& args) { void SetGid(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); + CHECK(env->is_main_thread()); CHECK_EQ(args.Length(), 1); CHECK(args[0]->IsUint32() || args[0]->IsString()); @@ -1430,6 +1438,7 @@ void SetGid(const FunctionCallbackInfo<Value>& args) { void SetEGid(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); + CHECK(env->is_main_thread()); CHECK_EQ(args.Length(), 1); CHECK(args[0]->IsUint32() || args[0]->IsString()); @@ -1449,6 +1458,7 @@ void SetEGid(const FunctionCallbackInfo<Value>& args) { void SetUid(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); + CHECK(env->is_main_thread()); CHECK_EQ(args.Length(), 1); CHECK(args[0]->IsUint32() || args[0]->IsString()); @@ -1468,6 +1478,7 @@ void SetUid(const FunctionCallbackInfo<Value>& args) { void SetEUid(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); + CHECK(env->is_main_thread()); CHECK_EQ(args.Length(), 1); CHECK(args[0]->IsUint32() || args[0]->IsString()); @@ -1629,9 +1640,10 @@ static void WaitForInspectorDisconnect(Environment* env) { static void Exit(const FunctionCallbackInfo<Value>& args) { - WaitForInspectorDisconnect(Environment::GetCurrent(args)); + Environment* env = Environment::GetCurrent(args); + WaitForInspectorDisconnect(env); v8_platform.StopTracingAgent(); - exit(args[0]->Int32Value()); + env->Exit(args[0]->Int32Value()); } @@ -2040,6 +2052,9 @@ void FatalException(Isolate* isolate, Local<Value> caught = fatal_exception_function->Call(process_object, 1, &error); + if (fatal_try_catch.HasTerminated()) + return; + if (fatal_try_catch.HasCaught()) { // The fatal exception function threw, so we must exit ReportException(env, fatal_try_catch); @@ -2053,6 +2068,12 @@ void FatalException(Isolate* isolate, void FatalException(Isolate* isolate, const TryCatch& try_catch) { + // If we try to print out a termination exception, we'd just get 'null', + // so just crashing here with that information seems like a better idea, + // and in particular it seems like we should handle terminations at the call + // site for this function rather than by printing them out somewhere. + CHECK(!try_catch.HasTerminated()); + HandleScope scope(isolate); if (!try_catch.IsVerbose()) { FatalException(isolate, try_catch.Exception(), try_catch.Message()); @@ -2574,11 +2595,12 @@ void SetupProcessObject(Environment* env, Local<Object> process = env->process_object(); auto title_string = FIXED_ONE_BYTE_STRING(env->isolate(), "title"); - CHECK(process->SetAccessor(env->context(), - title_string, - ProcessTitleGetter, - ProcessTitleSetter, - env->as_external()).FromJust()); + CHECK(process->SetAccessor( + env->context(), + title_string, + ProcessTitleGetter, + env->is_main_thread() ? ProcessTitleSetter : nullptr, + env->as_external()).FromJust()); // process.version READONLY_PROPERTY(process, @@ -2862,25 +2884,27 @@ void SetupProcessObject(Environment* env, CHECK(process->SetAccessor(env->context(), debug_port_string, DebugPortGetter, - DebugPortSetter, + env->is_main_thread() ? DebugPortSetter : nullptr, env->as_external()).FromJust()); // define various internal methods - env->SetMethod(process, - "_startProfilerIdleNotifier", - StartProfilerIdleNotifier); - env->SetMethod(process, - "_stopProfilerIdleNotifier", - StopProfilerIdleNotifier); + if (env->is_main_thread()) { + env->SetMethod(process, + "_startProfilerIdleNotifier", + StartProfilerIdleNotifier); + env->SetMethod(process, + "_stopProfilerIdleNotifier", + StopProfilerIdleNotifier); + env->SetMethod(process, "abort", Abort); + env->SetMethod(process, "chdir", Chdir); + env->SetMethod(process, "umask", Umask); + } + env->SetMethod(process, "_getActiveRequests", GetActiveRequests); env->SetMethod(process, "_getActiveHandles", GetActiveHandles); env->SetMethod(process, "reallyExit", Exit); - env->SetMethod(process, "abort", Abort); - env->SetMethod(process, "chdir", Chdir); env->SetMethod(process, "cwd", Cwd); - env->SetMethod(process, "umask", Umask); - #if defined(__POSIX__) && !defined(__ANDROID__) && !defined(__CloudABI__) env->SetMethod(process, "getuid", GetUid); env->SetMethod(process, "geteuid", GetEUid); @@ -2890,16 +2914,17 @@ void SetupProcessObject(Environment* env, #endif // __POSIX__ && !defined(__ANDROID__) && !defined(__CloudABI__) env->SetMethod(process, "_kill", Kill); + env->SetMethod(process, "dlopen", DLOpen); - env->SetMethod(process, "_debugProcess", DebugProcess); - env->SetMethod(process, "_debugEnd", DebugEnd); + if (env->is_main_thread()) { + env->SetMethod(process, "_debugProcess", DebugProcess); + env->SetMethod(process, "_debugEnd", DebugEnd); + } env->SetMethod(process, "hrtime", Hrtime); env->SetMethod(process, "cpuUsage", CPUUsage); - env->SetMethod(process, "dlopen", DLOpen); - env->SetMethod(process, "uptime", Uptime); env->SetMethod(process, "memoryUsage", MemoryUsage); } @@ -2935,8 +2960,10 @@ void RawDebug(const FunctionCallbackInfo<Value>& args) { } -static Local<Function> GetBootstrapper(Environment* env, Local<String> source, - Local<String> script_name) { +static MaybeLocal<Function> GetBootstrapper( + Environment* env, + Local<String> source, + Local<String> script_name) { EscapableHandleScope scope(env->isolate()); TryCatch try_catch(env->isolate()); @@ -2947,16 +2974,17 @@ static Local<Function> GetBootstrapper(Environment* env, Local<String> source, try_catch.SetVerbose(false); // Execute the bootstrapper javascript file - Local<Value> bootstrapper_v = ExecuteString(env, source, script_name); + MaybeLocal<Value> bootstrapper_v = ExecuteString(env, source, script_name); + if (bootstrapper_v.IsEmpty()) // This happens when execution was interrupted. + return MaybeLocal<Function>(); + if (try_catch.HasCaught()) { ReportException(env, try_catch); exit(10); } - CHECK(bootstrapper_v->IsFunction()); - Local<Function> bootstrapper = Local<Function>::Cast(bootstrapper_v); - - return scope.Escape(bootstrapper); + CHECK(bootstrapper_v.ToLocalChecked()->IsFunction()); + return scope.Escape(bootstrapper_v.ToLocalChecked().As<Function>()); } static bool ExecuteBootstrapper(Environment* env, Local<Function> bootstrapper, @@ -2995,13 +3023,18 @@ void LoadEnvironment(Environment* env) { // node_js2c. Local<String> loaders_name = FIXED_ONE_BYTE_STRING(env->isolate(), "internal/bootstrap/loaders.js"); - Local<Function> loaders_bootstrapper = + MaybeLocal<Function> loaders_bootstrapper = GetBootstrapper(env, LoadersBootstrapperSource(env), loaders_name); Local<String> node_name = FIXED_ONE_BYTE_STRING(env->isolate(), "internal/bootstrap/node.js"); - Local<Function> node_bootstrapper = + MaybeLocal<Function> node_bootstrapper = GetBootstrapper(env, NodeBootstrapperSource(env), node_name); + if (loaders_bootstrapper.IsEmpty() || node_bootstrapper.IsEmpty()) { + // Execution was interrupted. + return; + } + // Add a reference to the global object Local<Object> global = env->context()->Global(); @@ -3049,7 +3082,7 @@ void LoadEnvironment(Environment* env) { // Bootstrap internal loaders Local<Value> bootstrapped_loaders; - if (!ExecuteBootstrapper(env, loaders_bootstrapper, + if (!ExecuteBootstrapper(env, loaders_bootstrapper.ToLocalChecked(), arraysize(loaders_bootstrapper_args), loaders_bootstrapper_args, &bootstrapped_loaders)) { @@ -3065,7 +3098,7 @@ void LoadEnvironment(Environment* env) { bootstrapper, bootstrapped_loaders }; - if (!ExecuteBootstrapper(env, node_bootstrapper, + if (!ExecuteBootstrapper(env, node_bootstrapper.ToLocalChecked(), arraysize(node_bootstrapper_args), node_bootstrapper_args, &bootstrapped_node)) { @@ -4279,6 +4312,7 @@ inline int Start(Isolate* isolate, IsolateData* isolate_data, WaitForInspectorDisconnect(&env); env.set_can_call_into_js(false); + env.stop_sub_worker_contexts(); env.RunCleanup(); RunAtExit(&env); diff --git a/src/node_errors.h b/src/node_errors.h index 931ce7b8fd..2c97088cc5 100644 --- a/src/node_errors.h +++ b/src/node_errors.h @@ -34,6 +34,7 @@ namespace node { V(ERR_MISSING_ARGS, TypeError) \ V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, TypeError) \ V(ERR_MISSING_MODULE, Error) \ + V(ERR_MISSING_PLATFORM_FOR_WORKER, Error) \ V(ERR_SCRIPT_EXECUTION_INTERRUPTED, Error) \ V(ERR_SCRIPT_EXECUTION_TIMEOUT, Error) \ V(ERR_STRING_TOO_LONG, Error) \ @@ -68,6 +69,9 @@ namespace node { V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory") \ V(ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST, \ "MessagePort was found in message but not listed in transferList") \ + V(ERR_MISSING_PLATFORM_FOR_WORKER, \ + "The V8 platform used by this instance of Node does not support " \ + "creating Workers") \ V(ERR_SCRIPT_EXECUTION_INTERRUPTED, \ "Script execution was interrupted by `SIGINT`") \ V(ERR_TRANSFERRING_EXTERNALIZED_SHAREDARRAYBUFFER, \ diff --git a/src/node_internals.h b/src/node_internals.h index a5d8ed0e5d..7760eb26c6 100644 --- a/src/node_internals.h +++ b/src/node_internals.h @@ -137,6 +137,7 @@ struct sockaddr; V(util) \ V(uv) \ V(v8) \ + V(worker) \ V(zlib) #define NODE_BUILTIN_MODULES(V) \ @@ -314,6 +315,10 @@ class FatalTryCatch : public v8::TryCatch { Environment* env_; }; +void ReportException(Environment* env, + v8::Local<v8::Value> er, + v8::Local<v8::Message> message); + v8::Maybe<bool> ProcessEmitWarning(Environment* env, const char* fmt, ...); v8::Maybe<bool> ProcessEmitDeprecationWarning(Environment* env, const char* warning, diff --git a/src/node_messaging.cc b/src/node_messaging.cc index b56cef2d77..352749ea48 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -57,7 +57,7 @@ class DeserializerDelegate : public ValueDeserializer::Delegate { if (!deserializer->ReadUint32(&id)) return MaybeLocal<Object>(); CHECK_LE(id, message_ports_.size()); - return message_ports_[id]->object(); + return message_ports_[id]->object(isolate); }; MaybeLocal<SharedArrayBuffer> GetSharedArrayBufferFromId( @@ -436,7 +436,7 @@ MessagePort* MessagePort::New( void MessagePort::OnMessage() { HandleScope handle_scope(env()->isolate()); - Local<Context> context = object()->CreationContext(); + Local<Context> context = object(env()->isolate())->CreationContext(); // data_ can only ever be modified by the owner thread, so no need to lock. // However, the message port may be transferred while it is processing @@ -447,6 +447,13 @@ void MessagePort::OnMessage() { { // Get the head of the message queue. Mutex::ScopedLock lock(data_->mutex_); + + if (stop_event_loop_) { + CHECK(!data_->receiving_messages_); + uv_stop(env()->event_loop()); + break; + } + if (!data_->receiving_messages_) break; if (data_->incoming_messages_.empty()) @@ -514,8 +521,9 @@ void MessagePort::Send(Message&& message) { void MessagePort::Send(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); + Local<Context> context = object(env->isolate())->CreationContext(); Message msg; - if (msg.Serialize(env, object()->CreationContext(), args[0], args[1]) + if (msg.Serialize(env, context, args[0], args[1]) .IsNothing()) { return; } @@ -548,6 +556,14 @@ void MessagePort::Stop() { data_->receiving_messages_ = false; } +void MessagePort::StopEventLoop() { + Mutex::ScopedLock lock(data_->mutex_); + data_->receiving_messages_ = false; + stop_event_loop_ = true; + + TriggerAsync(); +} + void MessagePort::Start(const FunctionCallbackInfo<Value>& args) { Environment* env = Environment::GetCurrent(args); MessagePort* port; @@ -570,6 +586,12 @@ void MessagePort::Stop(const FunctionCallbackInfo<Value>& args) { port->Stop(); } +void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) { + MessagePort* port; + ASSIGN_OR_RETURN_UNWRAP(&port, args.This()); + port->OnMessage(); +} + size_t MessagePort::self_size() const { Mutex::ScopedLock lock(data_->mutex_); size_t sz = sizeof(*this) + sizeof(*data_); @@ -604,6 +626,7 @@ MaybeLocal<Function> GetMessagePortConstructor( env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage); env->SetProtoMethod(m, "start", MessagePort::Start); env->SetProtoMethod(m, "stop", MessagePort::Stop); + env->SetProtoMethod(m, "drain", MessagePort::Drain); env->SetProtoMethod(m, "close", HandleWrap::Close); env->SetProtoMethod(m, "unref", HandleWrap::Unref); env->SetProtoMethod(m, "ref", HandleWrap::Ref); diff --git a/src/node_messaging.h b/src/node_messaging.h index ff8fcc7243..9a13437d19 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -133,11 +133,15 @@ class MessagePort : public HandleWrap { void Start(); // Stop processing messages on this port as a receiving end. void Stop(); + // Stop processing messages on this port as a receiving end, + // and stop the event loop that this port is associated with. + void StopEventLoop(); static void New(const v8::FunctionCallbackInfo<v8::Value>& args); static void PostMessage(const v8::FunctionCallbackInfo<v8::Value>& args); static void Start(const v8::FunctionCallbackInfo<v8::Value>& args); static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args); + static void Drain(const v8::FunctionCallbackInfo<v8::Value>& args); // Turns `a` and `b` into siblings, i.e. connects the sending side of one // to the receiving side of the other. This is not thread-safe. @@ -160,6 +164,7 @@ class MessagePort : public HandleWrap { inline uv_async_t* async(); std::unique_ptr<MessagePortData> data_ = nullptr; + bool stop_event_loop_ = false; friend class MessagePortData; }; diff --git a/src/node_worker.cc b/src/node_worker.cc new file mode 100644 index 0000000000..366dca353d --- /dev/null +++ b/src/node_worker.cc @@ -0,0 +1,428 @@ +#include "node_worker.h" +#include "node_errors.h" +#include "node_internals.h" +#include "node_buffer.h" +#include "node_perf.h" +#include "util.h" +#include "util-inl.h" +#include "async_wrap.h" +#include "async_wrap-inl.h" + +using v8::ArrayBuffer; +using v8::Context; +using v8::Function; +using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; +using v8::HandleScope; +using v8::Integer; +using v8::Isolate; +using v8::Local; +using v8::Locker; +using v8::Number; +using v8::Object; +using v8::SealHandleScope; +using v8::String; +using v8::Value; + +namespace node { +namespace worker { + +namespace { + +double next_thread_id = 1; +Mutex next_thread_id_mutex; + +} // anonymous namespace + +Worker::Worker(Environment* env, Local<Object> wrap) + : AsyncWrap(env, wrap, AsyncWrap::PROVIDER_WORKER) { + // Generate a new thread id. + { + Mutex::ScopedLock next_thread_id_lock(next_thread_id_mutex); + thread_id_ = next_thread_id++; + } + wrap->Set(env->context(), + env->thread_id_string(), + Number::New(env->isolate(), thread_id_)).FromJust(); + + // Set up everything that needs to be set up in the parent environment. + parent_port_ = MessagePort::New(env, env->context()); + if (parent_port_ == nullptr) { + // This can happen e.g. because execution is terminating. + return; + } + + child_port_data_.reset(new MessagePortData(nullptr)); + MessagePort::Entangle(parent_port_, child_port_data_.get()); + + object()->Set(env->context(), + env->message_port_string(), + parent_port_->object()).FromJust(); + + array_buffer_allocator_.reset(CreateArrayBufferAllocator()); + + isolate_ = NewIsolate(array_buffer_allocator_.get()); + CHECK_NE(isolate_, nullptr); + CHECK_EQ(uv_loop_init(&loop_), 0); + + thread_exit_async_.reset(new uv_async_t); + thread_exit_async_->data = this; + CHECK_EQ(uv_async_init(env->event_loop(), + thread_exit_async_.get(), + [](uv_async_t* handle) { + static_cast<Worker*>(handle->data)->OnThreadStopped(); + }), 0); + + { + // Enter an environment capable of executing code in the child Isolate + // (and only in it). + Locker locker(isolate_); + Isolate::Scope isolate_scope(isolate_); + HandleScope handle_scope(isolate_); + + isolate_data_.reset(CreateIsolateData(isolate_, + &loop_, + env->isolate_data()->platform(), + array_buffer_allocator_.get())); + CHECK(isolate_data_); + + Local<Context> context = NewContext(isolate_); + Context::Scope context_scope(context); + + // TODO(addaleax): Use CreateEnvironment(), or generally another public API. + env_.reset(new Environment(isolate_data_.get(), + context, + nullptr)); + CHECK_NE(env_, nullptr); + env_->set_abort_on_uncaught_exception(false); + env_->set_worker_context(this); + env_->set_thread_id(thread_id_); + + env_->Start(0, nullptr, 0, nullptr, env->profiler_idle_notifier_started()); + } + + // The new isolate won't be bothered on this thread again. + isolate_->DiscardThreadSpecificMetadata(); +} + +bool Worker::is_stopped() const { + Mutex::ScopedLock stopped_lock(stopped_mutex_); + return stopped_; +} + +void Worker::Run() { + MultiIsolatePlatform* platform = isolate_data_->platform(); + CHECK_NE(platform, nullptr); + + { + Locker locker(isolate_); + Isolate::Scope isolate_scope(isolate_); + SealHandleScope outer_seal(isolate_); + + { + Context::Scope context_scope(env_->context()); + HandleScope handle_scope(isolate_); + + { + HandleScope handle_scope(isolate_); + Mutex::ScopedLock lock(mutex_); + // Set up the message channel for receiving messages in the child. + child_port_ = MessagePort::New(env_.get(), + env_->context(), + std::move(child_port_data_)); + // MessagePort::New() may return nullptr if execution is terminated + // within it. + if (child_port_ != nullptr) + env_->set_message_port(child_port_->object(isolate_)); + } + + if (!is_stopped()) { + HandleScope handle_scope(isolate_); + Environment::AsyncCallbackScope callback_scope(env_.get()); + env_->async_hooks()->push_async_ids(1, 0); + // This loads the Node bootstrapping code. + LoadEnvironment(env_.get()); + env_->async_hooks()->pop_async_id(1); + } + + { + SealHandleScope seal(isolate_); + bool more; + env_->performance_state()->Mark( + node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START); + do { + if (is_stopped()) break; + uv_run(&loop_, UV_RUN_DEFAULT); + if (is_stopped()) break; + + platform->DrainBackgroundTasks(isolate_); + + more = uv_loop_alive(&loop_); + if (more && !is_stopped()) + continue; + + EmitBeforeExit(env_.get()); + + // Emit `beforeExit` if the loop became alive either after emitting + // event, or after running some callbacks. + more = uv_loop_alive(&loop_); + } while (more == true); + env_->performance_state()->Mark( + node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT); + } + } + + { + int exit_code; + bool stopped = is_stopped(); + if (!stopped) + exit_code = EmitExit(env_.get()); + Mutex::ScopedLock lock(mutex_); + if (exit_code_ == 0 && !stopped) + exit_code_ = exit_code; + } + + env_->set_can_call_into_js(false); + Isolate::DisallowJavascriptExecutionScope disallow_js(isolate_, + Isolate::DisallowJavascriptExecutionScope::THROW_ON_FAILURE); + + // Grab the parent-to-child channel and render is unusable. + MessagePort* child_port; + { + Mutex::ScopedLock lock(mutex_); + child_port = child_port_; + child_port_ = nullptr; + } + + { + Context::Scope context_scope(env_->context()); + child_port->Close(); + env_->stop_sub_worker_contexts(); + env_->RunCleanup(); + RunAtExit(env_.get()); + + { + Mutex::ScopedLock stopped_lock(stopped_mutex_); + stopped_ = true; + } + + env_->RunCleanup(); + + // This call needs to be made while the `Environment` is still alive + // because we assume that it is available for async tracking in the + // NodePlatform implementation. + platform->DrainBackgroundTasks(isolate_); + } + + env_.reset(); + } + + DisposeIsolate(); + + // Need to run the loop one more time to close the platform's uv_async_t + uv_run(&loop_, UV_RUN_ONCE); + + { + Mutex::ScopedLock lock(mutex_); + CHECK(thread_exit_async_); + scheduled_on_thread_stopped_ = true; + uv_async_send(thread_exit_async_.get()); + } +} + +void Worker::DisposeIsolate() { + if (isolate_ == nullptr) + return; + + CHECK(isolate_data_); + MultiIsolatePlatform* platform = isolate_data_->platform(); + platform->CancelPendingDelayedTasks(isolate_); + + isolate_data_.reset(); + + isolate_->Dispose(); + isolate_ = nullptr; +} + +void Worker::JoinThread() { + if (thread_joined_) + return; + CHECK_EQ(uv_thread_join(&tid_), 0); + thread_joined_ = true; + + env()->remove_sub_worker_context(this); + + if (thread_exit_async_) { + env()->CloseHandle(thread_exit_async_.release(), [](uv_async_t* async) { + delete async; + }); + + if (scheduled_on_thread_stopped_) + OnThreadStopped(); + } +} + +void Worker::OnThreadStopped() { + Mutex::ScopedLock lock(mutex_); + scheduled_on_thread_stopped_ = false; + + { + Mutex::ScopedLock stopped_lock(stopped_mutex_); + CHECK(stopped_); + } + + CHECK_EQ(child_port_, nullptr); + parent_port_ = nullptr; + + // It's okay to join the thread while holding the mutex because + // OnThreadStopped means it's no longer doing any work that might grab it + // and really just silently exiting. + JoinThread(); + + { + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(env()->context()); + + // Reset the parent port as we're closing it now anyway. + object()->Set(env()->context(), + env()->message_port_string(), + Undefined(env()->isolate())).FromJust(); + + Local<Value> code = Integer::New(env()->isolate(), exit_code_); + MakeCallback(env()->onexit_string(), 1, &code); + } + + // JoinThread() cleared all libuv handles bound to this Worker, + // the C++ object is no longer needed for anything now. + MakeWeak(); +} + +Worker::~Worker() { + Mutex::ScopedLock lock(mutex_); + JoinThread(); + + CHECK(stopped_); + CHECK(thread_joined_); + CHECK_EQ(child_port_, nullptr); + CHECK_EQ(uv_loop_close(&loop_), 0); + + // This has most likely already happened within the worker thread -- this + // is just in case Worker creation failed early. + DisposeIsolate(); +} + +void Worker::New(const FunctionCallbackInfo<Value>& args) { + Environment* env = Environment::GetCurrent(args); + + CHECK(args.IsConstructCall()); + + if (env->isolate_data()->platform() == nullptr) { + THROW_ERR_MISSING_PLATFORM_FOR_WORKER(env); + return; + } + + new Worker(env, args.This()); +} + +void Worker::StartThread(const FunctionCallbackInfo<Value>& args) { + Worker* w; + ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); + Mutex::ScopedLock lock(w->mutex_); + + w->env()->add_sub_worker_context(w); + w->stopped_ = false; + CHECK_EQ(uv_thread_create(&w->tid_, [](void* arg) { + static_cast<Worker*>(arg)->Run(); + }, static_cast<void*>(w)), 0); + w->thread_joined_ = false; +} + +void Worker::StopThread(const FunctionCallbackInfo<Value>& args) { + Worker* w; + ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); + + w->Exit(1); + w->JoinThread(); +} + +void Worker::Ref(const FunctionCallbackInfo<Value>& args) { + Worker* w; + ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); + if (w->thread_exit_async_) + uv_ref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get())); +} + +void Worker::Unref(const FunctionCallbackInfo<Value>& args) { + Worker* w; + ASSIGN_OR_RETURN_UNWRAP(&w, args.This()); + if (w->thread_exit_async_) + uv_unref(reinterpret_cast<uv_handle_t*>(w->thread_exit_async_.get())); +} + +void Worker::Exit(int code) { + Mutex::ScopedLock lock(mutex_); + Mutex::ScopedLock stopped_lock(stopped_mutex_); + if (!stopped_) { + CHECK_NE(env_, nullptr); + stopped_ = true; + exit_code_ = code; + if (child_port_ != nullptr) + child_port_->StopEventLoop(); + isolate_->TerminateExecution(); + } +} + +size_t Worker::self_size() const { + return sizeof(*this); +} + +namespace { + +// Return the MessagePort that is global for this Environment and communicates +// with the internal [kPort] port of the JS Worker class in the parent thread. +void GetEnvMessagePort(const FunctionCallbackInfo<Value>& args) { + Environment* env = Environment::GetCurrent(args); + Local<Object> port = env->message_port(); + if (!port.IsEmpty()) { + CHECK_EQ(port->CreationContext()->GetIsolate(), args.GetIsolate()); + args.GetReturnValue().Set(port); + } +} + +void InitWorker(Local<Object> target, + Local<Value> unused, + Local<Context> context, + void* priv) { + Environment* env = Environment::GetCurrent(context); + + { + Local<FunctionTemplate> w = env->NewFunctionTemplate(Worker::New); + + w->InstanceTemplate()->SetInternalFieldCount(1); + + AsyncWrap::AddWrapMethods(env, w); + env->SetProtoMethod(w, "startThread", Worker::StartThread); + env->SetProtoMethod(w, "stopThread", Worker::StopThread); + env->SetProtoMethod(w, "ref", Worker::Ref); + env->SetProtoMethod(w, "unref", Worker::Unref); + + Local<String> workerString = + FIXED_ONE_BYTE_STRING(env->isolate(), "Worker"); + w->SetClassName(workerString); + target->Set(workerString, w->GetFunction()); + } + + env->SetMethod(target, "getEnvMessagePort", GetEnvMessagePort); + + auto thread_id_string = FIXED_ONE_BYTE_STRING(env->isolate(), "threadId"); + target->Set(env->context(), + thread_id_string, + Number::New(env->isolate(), env->thread_id())).FromJust(); +} + +} // anonymous namespace + +} // namespace worker +} // namespace node + +NODE_MODULE_CONTEXT_AWARE_INTERNAL(worker, node::worker::InitWorker) diff --git a/src/node_worker.h b/src/node_worker.h new file mode 100644 index 0000000000..0a98d2f11e --- /dev/null +++ b/src/node_worker.h @@ -0,0 +1,83 @@ +#ifndef SRC_NODE_WORKER_H_ +#define SRC_NODE_WORKER_H_ + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#include "node_messaging.h" +#include <unordered_map> + +namespace node { +namespace worker { + +// A worker thread, as represented in its parent thread. +class Worker : public AsyncWrap { + public: + Worker(Environment* env, v8::Local<v8::Object> wrap); + ~Worker(); + + // Run the worker. This is only called from the worker thread. + void Run(); + + // Forcibly exit the thread with a specified exit code. This may be called + // from any thread. + void Exit(int code); + + // Wait for the worker thread to stop (in a blocking manner). + void JoinThread(); + + size_t self_size() const override; + bool is_stopped() const; + + static void New(const v8::FunctionCallbackInfo<v8::Value>& args); + static void StartThread(const v8::FunctionCallbackInfo<v8::Value>& args); + static void StopThread(const v8::FunctionCallbackInfo<v8::Value>& args); + static void GetMessagePort(const v8::FunctionCallbackInfo<v8::Value>& args); + static void Ref(const v8::FunctionCallbackInfo<v8::Value>& args); + static void Unref(const v8::FunctionCallbackInfo<v8::Value>& args); + + private: + void OnThreadStopped(); + void DisposeIsolate(); + + uv_loop_t loop_; + DeleteFnPtr<IsolateData, FreeIsolateData> isolate_data_; + DeleteFnPtr<Environment, FreeEnvironment> env_; + v8::Isolate* isolate_ = nullptr; + DeleteFnPtr<ArrayBufferAllocator, FreeArrayBufferAllocator> + array_buffer_allocator_; + uv_thread_t tid_; + + // This mutex protects access to all variables listed below it. + mutable Mutex mutex_; + + // Currently only used for telling the parent thread that the child + // thread exited. + std::unique_ptr<uv_async_t> thread_exit_async_; + bool scheduled_on_thread_stopped_ = false; + + // This mutex only protects stopped_. If both locks are acquired, this needs + // to be the latter one. + mutable Mutex stopped_mutex_; + bool stopped_ = true; + + bool thread_joined_ = true; + int exit_code_ = 0; + double thread_id_ = -1; + + std::unique_ptr<MessagePortData> child_port_data_; + + // The child port is always kept alive by the child Environment's persistent + // handle to it. + MessagePort* child_port_ = nullptr; + // This is always kept alive because the JS object associated with the Worker + // instance refers to it via its [kPort] property. + MessagePort* parent_port_ = nullptr; +}; + +} // namespace worker +} // namespace node + +#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + + +#endif // SRC_NODE_WORKER_H_ |