summaryrefslogtreecommitdiff
path: root/src/node_api.cc
diff options
context:
space:
mode:
authorGabriel Schulhof <gabriel.schulhof@intel.com>2018-06-17 11:52:49 -0400
committerGabriel Schulhof <gabriel.schulhof@intel.com>2018-06-29 17:18:46 -0400
commit81f06ba7e49d9c077c209ab9c9de63bad08aa801 (patch)
tree7b7f35909237584e759a4ad9396e2e565b354b38 /src/node_api.cc
parent1c303439846de80aaec75285ec5ce087e85b31a6 (diff)
downloadandroid-node-v8-81f06ba7e49d9c077c209ab9c9de63bad08aa801.tar.gz
android-node-v8-81f06ba7e49d9c077c209ab9c9de63bad08aa801.tar.bz2
android-node-v8-81f06ba7e49d9c077c209ab9c9de63bad08aa801.zip
n-api: add API for asynchronous functions
Bundle a `uv_async_t`, a `uv_idle_t`, a `uv_mutex_t`, a `uv_cond_t`, and a `v8::Persistent<v8::Function>` to make it possible to call into JS from another thread. The API accepts a void data pointer and a callback which will be invoked on the loop thread and which will receive the `napi_value` representing the JavaScript function to call so as to perform the call into JS. The callback is run inside a `node::CallbackScope`. A `std::queue<void*>` is used to store calls from the secondary threads, and an idle loop is started by the `uv_async_t` callback on the loop thread to drain the queue, calling into JS with each item. Items can be added to the queue blockingly or non-blockingly. The thread-safe function can be referenced or unreferenced, with the same semantics as libuv handles. Re: https://github.com/nodejs/help/issues/1035 Re: https://github.com/nodejs/node/issues/20964 Fixes: https://github.com/nodejs/node/issues/13512 PR-URL: https://github.com/nodejs/node/pull/17887 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Michael Dawson <michael_dawson@ca.ibm.com>
Diffstat (limited to 'src/node_api.cc')
-rw-r--r--src/node_api.cc440
1 files changed, 438 insertions, 2 deletions
diff --git a/src/node_api.cc b/src/node_api.cc
index b1b498958b..c32e63ea23 100644
--- a/src/node_api.cc
+++ b/src/node_api.cc
@@ -5,6 +5,7 @@
#include <algorithm>
#include <cmath>
#include <vector>
+#define NAPI_EXPERIMENTAL
#include "node_api.h"
#include "node_internals.h"
#include "env.h"
@@ -923,7 +924,10 @@ const char* error_messages[] = {nullptr,
"The async work item was cancelled",
"napi_escape_handle already called on scope",
"Invalid handle scope usage",
- "Invalid callback scope usage"};
+ "Invalid callback scope usage",
+ "Thread-safe function queue is full",
+ "Thread-safe function handle is closing"
+};
static inline napi_status napi_clear_last_error(napi_env env) {
env->last_error.error_code = napi_ok;
@@ -954,7 +958,7 @@ napi_status napi_get_last_error_info(napi_env env,
// We don't have a napi_status_last as this would result in an ABI
// change each time a message was added.
static_assert(
- node::arraysize(error_messages) == napi_callback_scope_mismatch + 1,
+ node::arraysize(error_messages) == napi_closing + 1,
"Count of error messages must match count of error values");
CHECK_LE(env->last_error.error_code, napi_callback_scope_mismatch);
@@ -3553,3 +3557,435 @@ napi_status napi_run_script(napi_env env,
*result = v8impl::JsValueFromV8LocalValue(script_result.ToLocalChecked());
return GET_RETURN_STATUS(env);
}
+
+class TsFn: public node::AsyncResource {
+ public:
+ TsFn(v8::Local<v8::Function> func,
+ v8::Local<v8::Object> resource,
+ v8::Local<v8::String> name,
+ size_t thread_count_,
+ void* context_,
+ size_t max_queue_size_,
+ napi_env env_,
+ void* finalize_data_,
+ napi_finalize finalize_cb_,
+ napi_threadsafe_function_call_js call_js_cb_):
+ AsyncResource(env_->isolate,
+ resource,
+ *v8::String::Utf8Value(env_->isolate, name)),
+ thread_count(thread_count_),
+ is_closing(false),
+ context(context_),
+ max_queue_size(max_queue_size_),
+ env(env_),
+ finalize_data(finalize_data_),
+ finalize_cb(finalize_cb_),
+ idle_running(false),
+ call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_),
+ handles_closing(false) {
+ ref.Reset(env->isolate, func);
+ node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this);
+ }
+
+ ~TsFn() {
+ node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this);
+ }
+
+ // These methods can be called from any thread.
+
+ napi_status Push(void* data, napi_threadsafe_function_call_mode mode) {
+ node::Mutex::ScopedLock lock(this->mutex);
+
+ while (queue.size() >= max_queue_size &&
+ max_queue_size > 0 &&
+ !is_closing) {
+ if (mode == napi_tsfn_nonblocking) {
+ return napi_queue_full;
+ }
+ cond->Wait(lock);
+ }
+
+ if (is_closing) {
+ if (thread_count == 0) {
+ return napi_invalid_arg;
+ } else {
+ thread_count--;
+ return napi_closing;
+ }
+ } else {
+ if (uv_async_send(&async) != 0) {
+ return napi_generic_failure;
+ }
+ queue.push(data);
+ return napi_ok;
+ }
+ }
+
+ napi_status Acquire() {
+ node::Mutex::ScopedLock lock(this->mutex);
+
+ if (is_closing) {
+ return napi_closing;
+ }
+
+ thread_count++;
+
+ return napi_ok;
+ }
+
+ napi_status Release(napi_threadsafe_function_release_mode mode) {
+ node::Mutex::ScopedLock lock(this->mutex);
+
+ if (thread_count == 0) {
+ return napi_invalid_arg;
+ }
+
+ thread_count--;
+
+ if (thread_count == 0 || mode == napi_tsfn_abort) {
+ if (!is_closing) {
+ is_closing = (mode == napi_tsfn_abort);
+ if (is_closing) {
+ cond->Signal(lock);
+ }
+ if (uv_async_send(&async) != 0) {
+ return napi_generic_failure;
+ }
+ }
+ }
+
+ return napi_ok;
+ }
+
+ void EmptyQueueAndDelete() {
+ for (; !queue.empty() ; queue.pop()) {
+ call_js_cb(nullptr, nullptr, context, queue.front());
+ }
+ delete this;
+ }
+
+ // These methods must only be called from the loop thread.
+
+ napi_status Init() {
+ TsFn* ts_fn = this;
+
+ if (uv_async_init(env->loop, &async, AsyncCb) == 0) {
+ if (max_queue_size > 0) {
+ cond.reset(new node::ConditionVariable);
+ }
+ if ((max_queue_size == 0 || cond.get() != nullptr) &&
+ uv_idle_init(env->loop, &idle) == 0) {
+ return napi_ok;
+ }
+
+ node::Environment::GetCurrent(env->isolate)->CloseHandle(
+ reinterpret_cast<uv_handle_t*>(&async),
+ [] (uv_handle_t* handle) -> void {
+ TsFn* ts_fn =
+ node::ContainerOf(&TsFn::async,
+ reinterpret_cast<uv_async_t*>(handle));
+ delete ts_fn;
+ });
+
+ // Prevent the thread-safe function from being deleted here, because
+ // the callback above will delete it.
+ ts_fn = nullptr;
+ }
+
+ delete ts_fn;
+
+ return napi_generic_failure;
+ }
+
+ napi_status Unref() {
+ uv_unref(reinterpret_cast<uv_handle_t*>(&async));
+ uv_unref(reinterpret_cast<uv_handle_t*>(&idle));
+
+ return napi_ok;
+ }
+
+ napi_status Ref() {
+ uv_ref(reinterpret_cast<uv_handle_t*>(&async));
+ uv_ref(reinterpret_cast<uv_handle_t*>(&idle));
+
+ return napi_ok;
+ }
+
+ void DispatchOne() {
+ void* data;
+ bool popped_value = false;
+ bool idle_stop_failed = false;
+
+ {
+ node::Mutex::ScopedLock lock(this->mutex);
+ if (is_closing) {
+ CloseHandlesAndMaybeDelete();
+ } else {
+ size_t size = queue.size();
+ if (size > 0) {
+ data = queue.front();
+ queue.pop();
+ popped_value = true;
+ if (size == max_queue_size && max_queue_size > 0) {
+ cond->Signal(lock);
+ }
+ size--;
+ }
+
+ if (size == 0) {
+ if (thread_count == 0) {
+ is_closing = true;
+ cond->Signal(lock);
+ CloseHandlesAndMaybeDelete();
+ } else {
+ if (uv_idle_stop(&idle) != 0) {
+ idle_stop_failed = true;
+ } else {
+ idle_running = false;
+ }
+ }
+ }
+ }
+ }
+
+ if (popped_value || idle_stop_failed) {
+ v8::HandleScope scope(env->isolate);
+ CallbackScope cb_scope(this);
+
+ if (idle_stop_failed) {
+ CHECK(napi_throw_error(env,
+ "ERR_NAPI_TSFN_STOP_IDLE_LOOP",
+ "Failed to stop the idle loop") == napi_ok);
+ } else {
+ v8::Local<v8::Function> js_cb =
+ v8::Local<v8::Function>::New(env->isolate, ref);
+ call_js_cb(env,
+ v8impl::JsValueFromV8LocalValue(js_cb),
+ context,
+ data);
+ }
+ }
+ }
+
+ node::Environment* NodeEnv() {
+ // For some reason grabbing the Node.js environment requires a handle scope.
+ v8::HandleScope scope(env->isolate);
+ return node::Environment::GetCurrent(env->isolate);
+ }
+
+ void MaybeStartIdle() {
+ if (!idle_running) {
+ if (uv_idle_start(&idle, IdleCb) != 0) {
+ v8::HandleScope scope(env->isolate);
+ CallbackScope cb_scope(this);
+ CHECK(napi_throw_error(env,
+ "ERR_NAPI_TSFN_START_IDLE_LOOP",
+ "Failed to start the idle loop") == napi_ok);
+ }
+ }
+ }
+
+ void Finalize() {
+ v8::HandleScope scope(env->isolate);
+ if (finalize_cb) {
+ CallbackScope cb_scope(this);
+ finalize_cb(env, finalize_data, context);
+ }
+ EmptyQueueAndDelete();
+ }
+
+ inline void* Context() {
+ return context;
+ }
+
+ void CloseHandlesAndMaybeDelete(bool set_closing = false) {
+ if (set_closing) {
+ node::Mutex::ScopedLock lock(this->mutex);
+ is_closing = true;
+ cond->Signal(lock);
+ }
+ if (handles_closing) {
+ return;
+ }
+ handles_closing = true;
+ NodeEnv()->CloseHandle(
+ reinterpret_cast<uv_handle_t*>(&async),
+ [] (uv_handle_t* handle) -> void {
+ TsFn* ts_fn = node::ContainerOf(&TsFn::async,
+ reinterpret_cast<uv_async_t*>(handle));
+ ts_fn->NodeEnv()->CloseHandle(
+ reinterpret_cast<uv_handle_t*>(&ts_fn->idle),
+ [] (uv_handle_t* handle) -> void {
+ TsFn* ts_fn = node::ContainerOf(&TsFn::idle,
+ reinterpret_cast<uv_idle_t*>(handle));
+ ts_fn->Finalize();
+ });
+ });
+ }
+
+ // Default way of calling into JavaScript. Used when TsFn is constructed
+ // without a call_js_cb_.
+ static void CallJs(napi_env env, napi_value cb, void* context, void* data) {
+ if (!(env == nullptr || cb == nullptr)) {
+ napi_value recv;
+ napi_status status;
+
+ status = napi_get_undefined(env, &recv);
+ if (status != napi_ok) {
+ napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED",
+ "Failed to retrieve undefined value");
+ return;
+ }
+
+ status = napi_call_function(env, recv, cb, 0, nullptr, nullptr);
+ if (status != napi_ok && status != napi_pending_exception) {
+ napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS",
+ "Failed to call JS callback");
+ return;
+ }
+ }
+ }
+
+ static void IdleCb(uv_idle_t* idle) {
+ TsFn* ts_fn =
+ node::ContainerOf(&TsFn::idle, idle);
+ ts_fn->DispatchOne();
+ }
+
+ static void AsyncCb(uv_async_t* async) {
+ TsFn* ts_fn =
+ node::ContainerOf(&TsFn::async, async);
+ ts_fn->MaybeStartIdle();
+ }
+
+ static void Cleanup(void* data) {
+ reinterpret_cast<TsFn*>(data)->CloseHandlesAndMaybeDelete(true);
+ }
+
+ private:
+ // These are variables protected by the mutex.
+ node::Mutex mutex;
+ std::unique_ptr<node::ConditionVariable> cond;
+ std::queue<void*> queue;
+ uv_async_t async;
+ uv_idle_t idle;
+ size_t thread_count;
+ bool is_closing;
+
+ // These are variables set once, upon creation, and then never again, which
+ // means we don't need the mutex to read them.
+ void* context;
+ size_t max_queue_size;
+
+ // These are variables accessed only from the loop thread.
+ node::Persistent<v8::Function> ref;
+ napi_env env;
+ void* finalize_data;
+ napi_finalize finalize_cb;
+ bool idle_running;
+ napi_async_context async_context;
+ napi_threadsafe_function_call_js call_js_cb;
+ bool handles_closing;
+};
+
+NAPI_EXTERN napi_status
+napi_create_threadsafe_function(napi_env env,
+ napi_value func,
+ napi_value async_resource,
+ napi_value async_resource_name,
+ size_t max_queue_size,
+ size_t initial_thread_count,
+ void* thread_finalize_data,
+ napi_finalize thread_finalize_cb,
+ void* context,
+ napi_threadsafe_function_call_js call_js_cb,
+ napi_threadsafe_function* result) {
+ CHECK_ENV(env);
+ CHECK_ARG(env, func);
+ CHECK_ARG(env, async_resource_name);
+ RETURN_STATUS_IF_FALSE(env, initial_thread_count > 0, napi_invalid_arg);
+ CHECK_ARG(env, result);
+
+ napi_status status = napi_ok;
+
+ v8::Local<v8::Function> v8_func;
+ CHECK_TO_FUNCTION(env, v8_func, func);
+
+ v8::Local<v8::Context> v8_context = env->isolate->GetCurrentContext();
+
+ v8::Local<v8::Object> v8_resource;
+ if (async_resource == nullptr) {
+ v8_resource = v8::Object::New(env->isolate);
+ } else {
+ CHECK_TO_OBJECT(env, v8_context, v8_resource, async_resource);
+ }
+
+ v8::Local<v8::String> v8_name;
+ CHECK_TO_STRING(env, v8_context, v8_name, async_resource_name);
+
+ TsFn* ts_fn = new TsFn(v8_func,
+ v8_resource,
+ v8_name,
+ initial_thread_count,
+ context,
+ max_queue_size,
+ env,
+ thread_finalize_data,
+ thread_finalize_cb,
+ call_js_cb);
+
+ if (ts_fn == nullptr) {
+ status = napi_generic_failure;
+ } else {
+ // Init deletes ts_fn upon failure.
+ status = ts_fn->Init();
+ if (status == napi_ok) {
+ *result = reinterpret_cast<napi_threadsafe_function>(ts_fn);
+ }
+ }
+
+ return napi_set_last_error(env, status);
+}
+
+NAPI_EXTERN napi_status
+napi_get_threadsafe_function_context(napi_threadsafe_function func,
+ void** result) {
+ CHECK(func != nullptr);
+ CHECK(result != nullptr);
+
+ *result = reinterpret_cast<TsFn*>(func)->Context();
+ return napi_ok;
+}
+
+NAPI_EXTERN napi_status
+napi_call_threadsafe_function(napi_threadsafe_function func,
+ void* data,
+ napi_threadsafe_function_call_mode is_blocking) {
+ CHECK(func != nullptr);
+ return reinterpret_cast<TsFn*>(func)->Push(data, is_blocking);
+}
+
+NAPI_EXTERN napi_status
+napi_acquire_threadsafe_function(napi_threadsafe_function func) {
+ CHECK(func != nullptr);
+ return reinterpret_cast<TsFn*>(func)->Acquire();
+}
+
+NAPI_EXTERN napi_status
+napi_release_threadsafe_function(napi_threadsafe_function func,
+ napi_threadsafe_function_release_mode mode) {
+ CHECK(func != nullptr);
+ return reinterpret_cast<TsFn*>(func)->Release(mode);
+}
+
+NAPI_EXTERN napi_status
+napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func) {
+ CHECK(func != nullptr);
+ return reinterpret_cast<TsFn*>(func)->Unref();
+}
+
+NAPI_EXTERN napi_status
+napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func) {
+ CHECK(func != nullptr);
+ return reinterpret_cast<TsFn*>(func)->Ref();
+}