summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorGireesh Punathil <gpunathi@in.ibm.com>2018-06-12 09:01:46 -0400
committerGireesh Punathil <gpunathi@in.ibm.com>2019-03-16 12:50:26 +0530
commitd35af56e5f3b1334c4360dbf8a013d0c522fe5f8 (patch)
treea6f9bb5467ec693c04d6741c3704a7c80a4eb482 /src
parent22de2cfb71f3f1ab63e9663f4aa62bd9016b762a (diff)
downloadandroid-node-v8-d35af56e5f3b1334c4360dbf8a013d0c522fe5f8.tar.gz
android-node-v8-d35af56e5f3b1334c4360dbf8a013d0c522fe5f8.tar.bz2
android-node-v8-d35af56e5f3b1334c4360dbf8a013d0c522fe5f8.zip
src: shutdown node in-flight
This commit introduces a `node::Stop()` API. An identified use case for embedders is their ability to tear down Node while it is still running (event loop contain pending events) Here the assumptions are that (i) embedders do not wish to resort to JS routines to initiate shutdown (ii) embedders have the Environment handle handy. (iii) embedders stop Node through a second thread. Fixes: https://github.com/nodejs/node/issues/19365 Refs: https://github.com/nodejs/user-feedback/issues/51 PR-URL: https://github.com/nodejs/node/pull/21283 Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Richard Lau <riclau@uk.ibm.com> Reviewed-By: Michael Dawson <Michael_Dawson@ca.ibm.com>
Diffstat (limited to 'src')
-rw-r--r--src/api/callback.cc2
-rw-r--r--src/api/environment.cc2
-rw-r--r--src/env-inl.h8
-rw-r--r--src/env.cc62
-rw-r--r--src/env.h29
-rw-r--r--src/module_wrap.cc2
-rw-r--r--src/node.cc10
-rw-r--r--src/node.h10
-rw-r--r--src/node_contextify.cc2
-rw-r--r--src/node_worker.cc102
-rw-r--r--src/node_worker.h34
11 files changed, 154 insertions, 109 deletions
diff --git a/src/api/callback.cc b/src/api/callback.cc
index 6c6aec4573..b90f5ca92d 100644
--- a/src/api/callback.cc
+++ b/src/api/callback.cc
@@ -82,7 +82,7 @@ void InternalCallbackScope::Close() {
HandleScope handle_scope(env_->isolate());
if (!env_->can_call_into_js()) return;
- if (failed_ && !env_->is_main_thread() && env_->is_stopping_worker()) {
+ if (failed_ && !env_->is_main_thread() && env_->is_stopping()) {
env_->async_hooks()->clear_async_id_stack();
}
diff --git a/src/api/environment.cc b/src/api/environment.cc
index 6e3c61fac1..1597a6cec5 100644
--- a/src/api/environment.cc
+++ b/src/api/environment.cc
@@ -40,7 +40,7 @@ static bool ShouldAbortOnUncaughtException(Isolate* isolate) {
DebugSealHandleScope scope(isolate);
Environment* env = Environment::GetCurrent(isolate);
return env != nullptr &&
- (env->is_main_thread() || !env->is_stopping_worker()) &&
+ (env->is_main_thread() || !env->is_stopping()) &&
env->should_abort_on_uncaught_toggle()[0] &&
!env->inside_should_not_abort_on_uncaught_scope();
}
diff --git a/src/env-inl.h b/src/env-inl.h
index 8c37b393dc..aff12e57bf 100644
--- a/src/env-inl.h
+++ b/src/env-inl.h
@@ -32,7 +32,6 @@
#include "v8.h"
#include "node_perf_common.h"
#include "node_context_data.h"
-#include "node_worker.h"
#include <cstddef>
#include <cstdint>
@@ -661,7 +660,7 @@ void Environment::SetUnrefImmediate(native_immediate_callback cb,
}
inline bool Environment::can_call_into_js() const {
- return can_call_into_js_ && (is_main_thread() || !is_stopping_worker());
+ return can_call_into_js_ && !is_stopping();
}
inline void Environment::set_can_call_into_js(bool can_call_into_js) {
@@ -709,9 +708,8 @@ inline void Environment::remove_sub_worker_context(worker::Worker* context) {
sub_worker_contexts_.erase(context);
}
-inline bool Environment::is_stopping_worker() const {
- CHECK(!is_main_thread());
- return worker_context_->is_stopped();
+inline bool Environment::is_stopping() const {
+ return thread_stopper_.IsStopped();
}
inline performance::performance_state* Environment::performance_state() {
diff --git a/src/env.cc b/src/env.cc
index 2448754db9..768e14d796 100644
--- a/src/env.cc
+++ b/src/env.cc
@@ -340,6 +340,14 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_prepare_handle_));
uv_unref(reinterpret_cast<uv_handle_t*>(&idle_check_handle_));
+ GetAsyncRequest()->Install(
+ this, static_cast<void*>(this), [](uv_async_t* handle) {
+ Environment* env = static_cast<Environment*>(handle->data);
+ uv_stop(env->event_loop());
+ });
+ GetAsyncRequest()->SetStopped(false);
+ uv_unref(reinterpret_cast<uv_handle_t*>(GetAsyncRequest()->GetHandle()));
+
// Register clean-up cb to be called to clean up the handles
// when the environment is freed, note that they are not cleaned in
// the one environment per process setup, but will be called in
@@ -355,6 +363,12 @@ void Environment::InitializeLibuv(bool start_profiler_idle_notifier) {
uv_key_set(&thread_local_env, this);
}
+void Environment::ExitEnv() {
+ set_can_call_into_js(false);
+ GetAsyncRequest()->Stop();
+ isolate_->TerminateExecution();
+}
+
MaybeLocal<Object> Environment::ProcessCliArgs(
const std::vector<std::string>& args,
const std::vector<std::string>& exec_args) {
@@ -519,6 +533,7 @@ void Environment::RunCleanup() {
started_cleanup_ = true;
TraceEventScope trace_scope(TRACING_CATEGORY_NODE1(environment),
"RunCleanup", this);
+ GetAsyncRequest()->Uninstall();
CleanupHandles();
while (!cleanup_hooks_.empty()) {
@@ -932,6 +947,53 @@ char* Environment::Reallocate(char* data, size_t old_size, size_t size) {
return new_data;
}
+void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
+ Mutex::ScopedLock lock(mutex_);
+ env_ = env;
+ async_ = new uv_async_t;
+ async_->data = data;
+ CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
+}
+
+void AsyncRequest::Uninstall() {
+ Mutex::ScopedLock lock(mutex_);
+ if (async_ != nullptr) {
+ env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
+ async_ = nullptr;
+ }
+}
+
+void AsyncRequest::Stop() {
+ Mutex::ScopedLock lock(mutex_);
+ stop_ = true;
+ if (async_ != nullptr) uv_async_send(async_);
+}
+
+void AsyncRequest::SetStopped(bool flag) {
+ Mutex::ScopedLock lock(mutex_);
+ stop_ = flag;
+}
+
+bool AsyncRequest::IsStopped() const {
+ Mutex::ScopedLock lock(mutex_);
+ return stop_;
+}
+
+uv_async_t* AsyncRequest::GetHandle() {
+ Mutex::ScopedLock lock(mutex_);
+ return async_;
+}
+
+void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
+ Mutex::ScopedLock lock(mutex_);
+ if (async_ != nullptr) tracker->TrackField("async_request", *async_);
+}
+
+AsyncRequest::~AsyncRequest() {
+ Mutex::ScopedLock lock(mutex_);
+ CHECK_NULL(async_);
+}
+
// Not really any better place than env.cc at this moment.
void BaseObject::DeleteMe(void* data) {
BaseObject* self = static_cast<BaseObject*>(data);
diff --git a/src/env.h b/src/env.h
index 2c61082c5f..5be13df1a9 100644
--- a/src/env.h
+++ b/src/env.h
@@ -511,6 +511,27 @@ struct AllocatedBuffer {
friend class Environment;
};
+class AsyncRequest : public MemoryRetainer {
+ public:
+ AsyncRequest() {}
+ ~AsyncRequest();
+ void Install(Environment* env, void* data, uv_async_cb target);
+ void Uninstall();
+ void Stop();
+ void SetStopped(bool flag);
+ bool IsStopped() const;
+ uv_async_t* GetHandle();
+ void MemoryInfo(MemoryTracker* tracker) const override;
+ SET_MEMORY_INFO_NAME(AsyncRequest)
+ SET_SELF_SIZE(AsyncRequest)
+
+ private:
+ Environment* env_;
+ uv_async_t* async_ = nullptr;
+ mutable Mutex mutex_;
+ bool stop_ = true;
+};
+
class Environment {
public:
class AsyncHooks {
@@ -695,6 +716,7 @@ class Environment {
void RegisterHandleCleanups();
void CleanupHandles();
void Exit(int code);
+ void ExitEnv();
// Register clean-up cb to be called on environment destruction.
inline void RegisterHandleCleanup(uv_handle_t* handle,
@@ -844,7 +866,7 @@ class Environment {
inline void add_sub_worker_context(worker::Worker* context);
inline void remove_sub_worker_context(worker::Worker* context);
void stop_sub_worker_contexts();
- inline bool is_stopping_worker() const;
+ inline bool is_stopping() const;
inline void ThrowError(const char* errmsg);
inline void ThrowTypeError(const char* errmsg);
@@ -1018,6 +1040,7 @@ class Environment {
inline ExecutionMode execution_mode() { return execution_mode_; }
inline void set_execution_mode(ExecutionMode mode) { execution_mode_ = mode; }
+ inline AsyncRequest* GetAsyncRequest() { return &thread_stopper_; }
private:
inline void CreateImmediate(native_immediate_callback cb,
@@ -1174,6 +1197,10 @@ class Environment {
uint64_t cleanup_hook_counter_ = 0;
bool started_cleanup_ = false;
+ // A custom async abstraction (a pair of async handle and a state variable)
+ // Used by embedders to shutdown running Node instance.
+ AsyncRequest thread_stopper_;
+
static void EnvPromiseHook(v8::PromiseHookType type,
v8::Local<v8::Promise> promise,
v8::Local<v8::Value> parent);
diff --git a/src/module_wrap.cc b/src/module_wrap.cc
index d311f7caca..ac5d28fb23 100644
--- a/src/module_wrap.cc
+++ b/src/module_wrap.cc
@@ -302,7 +302,7 @@ void ModuleWrap::Evaluate(const FunctionCallbackInfo<Value>& args) {
// Convert the termination exception into a regular exception.
if (timed_out || received_signal) {
- if (!env->is_main_thread() && env->is_stopping_worker())
+ if (!env->is_main_thread() && env->is_stopping())
return;
env->isolate()->CancelTerminateExecution();
// It is possible that execution was terminated by another timeout in
diff --git a/src/node.cc b/src/node.cc
index adcd6f6cad..783962d192 100644
--- a/src/node.cc
+++ b/src/node.cc
@@ -832,15 +832,14 @@ inline int StartNodeWithIsolate(Isolate* isolate,
per_process::v8_platform.DrainVMTasks(isolate);
more = uv_loop_alive(env.event_loop());
- if (more)
- continue;
+ if (more && !env.GetAsyncRequest()->IsStopped()) continue;
RunBeforeExit(&env);
// Emit `beforeExit` if the loop became alive either after emitting
// event, or after running some callbacks.
more = uv_loop_alive(env.event_loop());
- } while (more == true);
+ } while (more == true && !env.GetAsyncRequest()->IsStopped());
env.performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
}
@@ -977,6 +976,11 @@ int Start(int argc, char** argv) {
return exit_code;
}
+int Stop(Environment* env) {
+ env->ExitEnv();
+ return 0;
+}
+
} // namespace node
#if !HAVE_INSPECTOR
diff --git a/src/node.h b/src/node.h
index 17eae72e9c..6568085335 100644
--- a/src/node.h
+++ b/src/node.h
@@ -201,10 +201,17 @@ typedef intptr_t ssize_t;
namespace node {
+class IsolateData;
+class Environment;
+
// TODO(addaleax): Officially deprecate this and replace it with something
// better suited for a public embedder API.
NODE_EXTERN int Start(int argc, char* argv[]);
+// Tear down Node.js while it is running (there are active handles
+// in the loop and / or actively executing JavaScript code).
+NODE_EXTERN int Stop(Environment* env);
+
// TODO(addaleax): Officially deprecate this and replace it with something
// better suited for a public embedder API.
NODE_EXTERN void Init(int* argc,
@@ -239,9 +246,6 @@ class NODE_EXTERN ArrayBufferAllocator : public v8::ArrayBuffer::Allocator {
NODE_EXTERN ArrayBufferAllocator* CreateArrayBufferAllocator();
NODE_EXTERN void FreeArrayBufferAllocator(ArrayBufferAllocator* allocator);
-class IsolateData;
-class Environment;
-
class NODE_EXTERN MultiIsolatePlatform : public v8::Platform {
public:
~MultiIsolatePlatform() override { }
diff --git a/src/node_contextify.cc b/src/node_contextify.cc
index b9962f091d..621fa7eb16 100644
--- a/src/node_contextify.cc
+++ b/src/node_contextify.cc
@@ -924,7 +924,7 @@ bool ContextifyScript::EvalMachine(Environment* env,
// Convert the termination exception into a regular exception.
if (timed_out || received_signal) {
- if (!env->is_main_thread() && env->is_stopping_worker())
+ if (!env->is_main_thread() && env->is_stopping())
return false;
env->isolate()->CancelTerminateExecution();
// It is possible that execution was terminated by another timeout in
diff --git a/src/node_worker.cc b/src/node_worker.cc
index 42d407e654..d9f7311c5d 100644
--- a/src/node_worker.cc
+++ b/src/node_worker.cc
@@ -57,46 +57,6 @@ void WaitForWorkerInspectorToStop(Environment* child) {
} // anonymous namespace
-void AsyncRequest::Install(Environment* env, void* data, uv_async_cb target) {
- Mutex::ScopedLock lock(mutex_);
- env_ = env;
- async_ = new uv_async_t;
- if (data != nullptr) async_->data = data;
- CHECK_EQ(uv_async_init(env_->event_loop(), async_, target), 0);
-}
-
-void AsyncRequest::Uninstall() {
- Mutex::ScopedLock lock(mutex_);
- if (async_ != nullptr)
- env_->CloseHandle(async_, [](uv_async_t* async) { delete async; });
-}
-
-void AsyncRequest::Stop() {
- Mutex::ScopedLock lock(mutex_);
- stop_ = true;
- if (async_ != nullptr) uv_async_send(async_);
-}
-
-void AsyncRequest::SetStopped(bool flag) {
- Mutex::ScopedLock lock(mutex_);
- stop_ = flag;
-}
-
-bool AsyncRequest::IsStopped() const {
- Mutex::ScopedLock lock(mutex_);
- return stop_;
-}
-
-uv_async_t* AsyncRequest::GetHandle() {
- Mutex::ScopedLock lock(mutex_);
- return async_;
-}
-
-void AsyncRequest::MemoryInfo(MemoryTracker* tracker) const {
- Mutex::ScopedLock lock(mutex_);
- if (async_ != nullptr) tracker->TrackField("async_request", *async_);
-}
-
Worker::Worker(Environment* env,
Local<Object> wrap,
const std::string& url,
@@ -142,7 +102,10 @@ Worker::Worker(Environment* env,
}
bool Worker::is_stopped() const {
- return thread_stopper_.IsStopped();
+ Mutex::ScopedLock lock(mutex_);
+ if (env_ != nullptr)
+ return env_->GetAsyncRequest()->IsStopped();
+ return stopped_;
}
// This class contains data that is only relevant to the child thread itself,
@@ -254,8 +217,12 @@ void Worker::Run() {
Context::Scope context_scope(env_->context());
if (child_port != nullptr)
child_port->Close();
- thread_stopper_.Uninstall();
- thread_stopper_.SetStopped(true);
+ {
+ Mutex::ScopedLock lock(mutex_);
+ stopped_ = true;
+ this->env_ = nullptr;
+ }
+ env_->GetAsyncRequest()->SetStopped(true);
env_->stop_sub_worker_contexts();
env_->RunCleanup();
RunAtExit(env_.get());
@@ -271,12 +238,12 @@ void Worker::Run() {
}
});
- if (thread_stopper_.IsStopped()) return;
+ if (is_stopped()) return;
{
HandleScope handle_scope(isolate_);
Local<Context> context = NewContext(isolate_);
- if (thread_stopper_.IsStopped()) return;
+ if (is_stopped()) return;
CHECK(!context.IsEmpty());
Context::Scope context_scope(context);
{
@@ -294,18 +261,13 @@ void Worker::Run() {
env_->ProcessCliArgs(std::vector<std::string>{},
std::move(exec_argv_));
}
-
+ {
+ Mutex::ScopedLock lock(mutex_);
+ if (stopped_) return;
+ this->env_ = env_.get();
+ }
Debug(this, "Created Environment for worker with id %llu", thread_id_);
-
if (is_stopped()) return;
- thread_stopper_.Install(env_.get(), env_.get(), [](uv_async_t* handle) {
- Environment* env_ = static_cast<Environment*>(handle->data);
- uv_stop(env_->event_loop());
- });
- uv_unref(reinterpret_cast<uv_handle_t*>(thread_stopper_.GetHandle()));
-
- Debug(this, "Created Environment for worker with id %llu", thread_id_);
- if (thread_stopper_.IsStopped()) return;
{
HandleScope handle_scope(isolate_);
Mutex::ScopedLock lock(mutex_);
@@ -321,7 +283,7 @@ void Worker::Run() {
Debug(this, "Created message port for worker %llu", thread_id_);
}
- if (thread_stopper_.IsStopped()) return;
+ if (is_stopped()) return;
{
#if NODE_USE_V8_PLATFORM && HAVE_INSPECTOR
StartWorkerInspector(env_.get(),
@@ -342,28 +304,28 @@ void Worker::Run() {
Debug(this, "Loaded environment for worker %llu", thread_id_);
}
- if (thread_stopper_.IsStopped()) return;
+ if (is_stopped()) return;
{
SealHandleScope seal(isolate_);
bool more;
env_->performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_START);
do {
- if (thread_stopper_.IsStopped()) break;
+ if (is_stopped()) break;
uv_run(&data.loop_, UV_RUN_DEFAULT);
- if (thread_stopper_.IsStopped()) break;
+ if (is_stopped()) break;
platform_->DrainTasks(isolate_);
more = uv_loop_alive(&data.loop_);
- if (more && !thread_stopper_.IsStopped()) continue;
+ if (more && !is_stopped()) continue;
EmitBeforeExit(env_.get());
// Emit `beforeExit` if the loop became alive either after emitting
// event, or after running some callbacks.
more = uv_loop_alive(&data.loop_);
- } while (more == true);
+ } while (more == true && !is_stopped());
env_->performance_state()->Mark(
node::performance::NODE_PERFORMANCE_MILESTONE_LOOP_EXIT);
}
@@ -371,7 +333,7 @@ void Worker::Run() {
{
int exit_code;
- bool stopped = thread_stopper_.IsStopped();
+ bool stopped = is_stopped();
if (!stopped)
exit_code = EmitExit(env_.get());
Mutex::ScopedLock lock(mutex_);
@@ -419,7 +381,7 @@ void Worker::OnThreadStopped() {
Worker::~Worker() {
Mutex::ScopedLock lock(mutex_);
- CHECK(thread_stopper_.IsStopped());
+ CHECK(stopped_ || env_ == nullptr || env_->GetAsyncRequest()->IsStopped());
CHECK(thread_joined_);
Debug(this, "Worker %llu destroyed", thread_id_);
@@ -516,12 +478,12 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {
w->ClearWeak();
w->env()->add_sub_worker_context(w);
+ w->stopped_ = false;
w->thread_joined_ = false;
- w->thread_stopper_.SetStopped(false);
w->on_thread_finished_.Install(w->env(), w, [](uv_async_t* handle) {
Worker* w_ = static_cast<Worker*>(handle->data);
- CHECK(w_->thread_stopper_.IsStopped());
+ CHECK(w_->is_stopped());
w_->parent_port_ = nullptr;
w_->JoinThread();
delete w_;
@@ -569,14 +531,12 @@ void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
void Worker::Exit(int code) {
Mutex::ScopedLock lock(mutex_);
-
Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);
- if (!thread_stopper_.IsStopped()) {
+ if (env_ != nullptr) {
exit_code_ = code;
- Debug(this, "Received StopEventLoop request");
- thread_stopper_.Stop();
- if (isolate_ != nullptr)
- isolate_->TerminateExecution();
+ Stop(env_);
+ } else {
+ stopped_ = true;
}
}
diff --git a/src/node_worker.h b/src/node_worker.h
index 442056eaac..adc755426d 100644
--- a/src/node_worker.h
+++ b/src/node_worker.h
@@ -12,26 +12,6 @@ namespace worker {
class WorkerThreadData;
-class AsyncRequest : public MemoryRetainer {
- public:
- AsyncRequest() {}
- void Install(Environment* env, void* data, uv_async_cb target);
- void Uninstall();
- void Stop();
- void SetStopped(bool flag);
- bool IsStopped() const;
- uv_async_t* GetHandle();
- void MemoryInfo(MemoryTracker* tracker) const override;
- SET_MEMORY_INFO_NAME(AsyncRequest)
- SET_SELF_SIZE(AsyncRequest)
-
- private:
- Environment* env_;
- uv_async_t* async_ = nullptr;
- mutable Mutex mutex_;
- bool stop_ = true;
-};
-
// A worker thread, as represented in its parent thread.
class Worker : public AsyncWrap {
public:
@@ -54,7 +34,6 @@ class Worker : public AsyncWrap {
void MemoryInfo(MemoryTracker* tracker) const override {
tracker->TrackField("parent_port", parent_port_);
- tracker->TrackInlineField(&thread_stopper_, "thread_stopper_");
tracker->TrackInlineField(&on_thread_finished_, "on_thread_finished_");
}
@@ -107,9 +86,20 @@ class Worker : public AsyncWrap {
// instance refers to it via its [kPort] property.
MessagePort* parent_port_ = nullptr;
- AsyncRequest thread_stopper_;
AsyncRequest on_thread_finished_;
+ // A raw flag that is used by creator and worker threads to
+ // sync up on pre-mature termination of worker - while in the
+ // warmup phase. Once the worker is fully warmed up, use the
+ // async handle of the worker's Environment for the same purpose.
+ bool stopped_ = true;
+
+ // The real Environment of the worker object. It has a lesser
+ // lifespan than the worker object itself - comes to life
+ // when the worker thread creates a new Environment, and gets
+ // destroyed alongwith the worker thread.
+ Environment* env_ = nullptr;
+
friend class WorkerThreadData;
};