diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/node_api.cc | 440 | ||||
-rw-r--r-- | src/node_api.h | 38 | ||||
-rw-r--r-- | src/node_api_types.h | 28 |
3 files changed, 503 insertions, 3 deletions
diff --git a/src/node_api.cc b/src/node_api.cc index b1b498958b..c32e63ea23 100644 --- a/src/node_api.cc +++ b/src/node_api.cc @@ -5,6 +5,7 @@ #include <algorithm> #include <cmath> #include <vector> +#define NAPI_EXPERIMENTAL #include "node_api.h" #include "node_internals.h" #include "env.h" @@ -923,7 +924,10 @@ const char* error_messages[] = {nullptr, "The async work item was cancelled", "napi_escape_handle already called on scope", "Invalid handle scope usage", - "Invalid callback scope usage"}; + "Invalid callback scope usage", + "Thread-safe function queue is full", + "Thread-safe function handle is closing" +}; static inline napi_status napi_clear_last_error(napi_env env) { env->last_error.error_code = napi_ok; @@ -954,7 +958,7 @@ napi_status napi_get_last_error_info(napi_env env, // We don't have a napi_status_last as this would result in an ABI // change each time a message was added. static_assert( - node::arraysize(error_messages) == napi_callback_scope_mismatch + 1, + node::arraysize(error_messages) == napi_closing + 1, "Count of error messages must match count of error values"); CHECK_LE(env->last_error.error_code, napi_callback_scope_mismatch); @@ -3553,3 +3557,435 @@ napi_status napi_run_script(napi_env env, *result = v8impl::JsValueFromV8LocalValue(script_result.ToLocalChecked()); return GET_RETURN_STATUS(env); } + +class TsFn: public node::AsyncResource { + public: + TsFn(v8::Local<v8::Function> func, + v8::Local<v8::Object> resource, + v8::Local<v8::String> name, + size_t thread_count_, + void* context_, + size_t max_queue_size_, + napi_env env_, + void* finalize_data_, + napi_finalize finalize_cb_, + napi_threadsafe_function_call_js call_js_cb_): + AsyncResource(env_->isolate, + resource, + *v8::String::Utf8Value(env_->isolate, name)), + thread_count(thread_count_), + is_closing(false), + context(context_), + max_queue_size(max_queue_size_), + env(env_), + finalize_data(finalize_data_), + finalize_cb(finalize_cb_), + idle_running(false), + call_js_cb(call_js_cb_ == nullptr ? CallJs : call_js_cb_), + handles_closing(false) { + ref.Reset(env->isolate, func); + node::AddEnvironmentCleanupHook(env->isolate, Cleanup, this); + } + + ~TsFn() { + node::RemoveEnvironmentCleanupHook(env->isolate, Cleanup, this); + } + + // These methods can be called from any thread. + + napi_status Push(void* data, napi_threadsafe_function_call_mode mode) { + node::Mutex::ScopedLock lock(this->mutex); + + while (queue.size() >= max_queue_size && + max_queue_size > 0 && + !is_closing) { + if (mode == napi_tsfn_nonblocking) { + return napi_queue_full; + } + cond->Wait(lock); + } + + if (is_closing) { + if (thread_count == 0) { + return napi_invalid_arg; + } else { + thread_count--; + return napi_closing; + } + } else { + if (uv_async_send(&async) != 0) { + return napi_generic_failure; + } + queue.push(data); + return napi_ok; + } + } + + napi_status Acquire() { + node::Mutex::ScopedLock lock(this->mutex); + + if (is_closing) { + return napi_closing; + } + + thread_count++; + + return napi_ok; + } + + napi_status Release(napi_threadsafe_function_release_mode mode) { + node::Mutex::ScopedLock lock(this->mutex); + + if (thread_count == 0) { + return napi_invalid_arg; + } + + thread_count--; + + if (thread_count == 0 || mode == napi_tsfn_abort) { + if (!is_closing) { + is_closing = (mode == napi_tsfn_abort); + if (is_closing) { + cond->Signal(lock); + } + if (uv_async_send(&async) != 0) { + return napi_generic_failure; + } + } + } + + return napi_ok; + } + + void EmptyQueueAndDelete() { + for (; !queue.empty() ; queue.pop()) { + call_js_cb(nullptr, nullptr, context, queue.front()); + } + delete this; + } + + // These methods must only be called from the loop thread. + + napi_status Init() { + TsFn* ts_fn = this; + + if (uv_async_init(env->loop, &async, AsyncCb) == 0) { + if (max_queue_size > 0) { + cond.reset(new node::ConditionVariable); + } + if ((max_queue_size == 0 || cond.get() != nullptr) && + uv_idle_init(env->loop, &idle) == 0) { + return napi_ok; + } + + node::Environment::GetCurrent(env->isolate)->CloseHandle( + reinterpret_cast<uv_handle_t*>(&async), + [] (uv_handle_t* handle) -> void { + TsFn* ts_fn = + node::ContainerOf(&TsFn::async, + reinterpret_cast<uv_async_t*>(handle)); + delete ts_fn; + }); + + // Prevent the thread-safe function from being deleted here, because + // the callback above will delete it. + ts_fn = nullptr; + } + + delete ts_fn; + + return napi_generic_failure; + } + + napi_status Unref() { + uv_unref(reinterpret_cast<uv_handle_t*>(&async)); + uv_unref(reinterpret_cast<uv_handle_t*>(&idle)); + + return napi_ok; + } + + napi_status Ref() { + uv_ref(reinterpret_cast<uv_handle_t*>(&async)); + uv_ref(reinterpret_cast<uv_handle_t*>(&idle)); + + return napi_ok; + } + + void DispatchOne() { + void* data; + bool popped_value = false; + bool idle_stop_failed = false; + + { + node::Mutex::ScopedLock lock(this->mutex); + if (is_closing) { + CloseHandlesAndMaybeDelete(); + } else { + size_t size = queue.size(); + if (size > 0) { + data = queue.front(); + queue.pop(); + popped_value = true; + if (size == max_queue_size && max_queue_size > 0) { + cond->Signal(lock); + } + size--; + } + + if (size == 0) { + if (thread_count == 0) { + is_closing = true; + cond->Signal(lock); + CloseHandlesAndMaybeDelete(); + } else { + if (uv_idle_stop(&idle) != 0) { + idle_stop_failed = true; + } else { + idle_running = false; + } + } + } + } + } + + if (popped_value || idle_stop_failed) { + v8::HandleScope scope(env->isolate); + CallbackScope cb_scope(this); + + if (idle_stop_failed) { + CHECK(napi_throw_error(env, + "ERR_NAPI_TSFN_STOP_IDLE_LOOP", + "Failed to stop the idle loop") == napi_ok); + } else { + v8::Local<v8::Function> js_cb = + v8::Local<v8::Function>::New(env->isolate, ref); + call_js_cb(env, + v8impl::JsValueFromV8LocalValue(js_cb), + context, + data); + } + } + } + + node::Environment* NodeEnv() { + // For some reason grabbing the Node.js environment requires a handle scope. + v8::HandleScope scope(env->isolate); + return node::Environment::GetCurrent(env->isolate); + } + + void MaybeStartIdle() { + if (!idle_running) { + if (uv_idle_start(&idle, IdleCb) != 0) { + v8::HandleScope scope(env->isolate); + CallbackScope cb_scope(this); + CHECK(napi_throw_error(env, + "ERR_NAPI_TSFN_START_IDLE_LOOP", + "Failed to start the idle loop") == napi_ok); + } + } + } + + void Finalize() { + v8::HandleScope scope(env->isolate); + if (finalize_cb) { + CallbackScope cb_scope(this); + finalize_cb(env, finalize_data, context); + } + EmptyQueueAndDelete(); + } + + inline void* Context() { + return context; + } + + void CloseHandlesAndMaybeDelete(bool set_closing = false) { + if (set_closing) { + node::Mutex::ScopedLock lock(this->mutex); + is_closing = true; + cond->Signal(lock); + } + if (handles_closing) { + return; + } + handles_closing = true; + NodeEnv()->CloseHandle( + reinterpret_cast<uv_handle_t*>(&async), + [] (uv_handle_t* handle) -> void { + TsFn* ts_fn = node::ContainerOf(&TsFn::async, + reinterpret_cast<uv_async_t*>(handle)); + ts_fn->NodeEnv()->CloseHandle( + reinterpret_cast<uv_handle_t*>(&ts_fn->idle), + [] (uv_handle_t* handle) -> void { + TsFn* ts_fn = node::ContainerOf(&TsFn::idle, + reinterpret_cast<uv_idle_t*>(handle)); + ts_fn->Finalize(); + }); + }); + } + + // Default way of calling into JavaScript. Used when TsFn is constructed + // without a call_js_cb_. + static void CallJs(napi_env env, napi_value cb, void* context, void* data) { + if (!(env == nullptr || cb == nullptr)) { + napi_value recv; + napi_status status; + + status = napi_get_undefined(env, &recv); + if (status != napi_ok) { + napi_throw_error(env, "ERR_NAPI_TSFN_GET_UNDEFINED", + "Failed to retrieve undefined value"); + return; + } + + status = napi_call_function(env, recv, cb, 0, nullptr, nullptr); + if (status != napi_ok && status != napi_pending_exception) { + napi_throw_error(env, "ERR_NAPI_TSFN_CALL_JS", + "Failed to call JS callback"); + return; + } + } + } + + static void IdleCb(uv_idle_t* idle) { + TsFn* ts_fn = + node::ContainerOf(&TsFn::idle, idle); + ts_fn->DispatchOne(); + } + + static void AsyncCb(uv_async_t* async) { + TsFn* ts_fn = + node::ContainerOf(&TsFn::async, async); + ts_fn->MaybeStartIdle(); + } + + static void Cleanup(void* data) { + reinterpret_cast<TsFn*>(data)->CloseHandlesAndMaybeDelete(true); + } + + private: + // These are variables protected by the mutex. + node::Mutex mutex; + std::unique_ptr<node::ConditionVariable> cond; + std::queue<void*> queue; + uv_async_t async; + uv_idle_t idle; + size_t thread_count; + bool is_closing; + + // These are variables set once, upon creation, and then never again, which + // means we don't need the mutex to read them. + void* context; + size_t max_queue_size; + + // These are variables accessed only from the loop thread. + node::Persistent<v8::Function> ref; + napi_env env; + void* finalize_data; + napi_finalize finalize_cb; + bool idle_running; + napi_async_context async_context; + napi_threadsafe_function_call_js call_js_cb; + bool handles_closing; +}; + +NAPI_EXTERN napi_status +napi_create_threadsafe_function(napi_env env, + napi_value func, + napi_value async_resource, + napi_value async_resource_name, + size_t max_queue_size, + size_t initial_thread_count, + void* thread_finalize_data, + napi_finalize thread_finalize_cb, + void* context, + napi_threadsafe_function_call_js call_js_cb, + napi_threadsafe_function* result) { + CHECK_ENV(env); + CHECK_ARG(env, func); + CHECK_ARG(env, async_resource_name); + RETURN_STATUS_IF_FALSE(env, initial_thread_count > 0, napi_invalid_arg); + CHECK_ARG(env, result); + + napi_status status = napi_ok; + + v8::Local<v8::Function> v8_func; + CHECK_TO_FUNCTION(env, v8_func, func); + + v8::Local<v8::Context> v8_context = env->isolate->GetCurrentContext(); + + v8::Local<v8::Object> v8_resource; + if (async_resource == nullptr) { + v8_resource = v8::Object::New(env->isolate); + } else { + CHECK_TO_OBJECT(env, v8_context, v8_resource, async_resource); + } + + v8::Local<v8::String> v8_name; + CHECK_TO_STRING(env, v8_context, v8_name, async_resource_name); + + TsFn* ts_fn = new TsFn(v8_func, + v8_resource, + v8_name, + initial_thread_count, + context, + max_queue_size, + env, + thread_finalize_data, + thread_finalize_cb, + call_js_cb); + + if (ts_fn == nullptr) { + status = napi_generic_failure; + } else { + // Init deletes ts_fn upon failure. + status = ts_fn->Init(); + if (status == napi_ok) { + *result = reinterpret_cast<napi_threadsafe_function>(ts_fn); + } + } + + return napi_set_last_error(env, status); +} + +NAPI_EXTERN napi_status +napi_get_threadsafe_function_context(napi_threadsafe_function func, + void** result) { + CHECK(func != nullptr); + CHECK(result != nullptr); + + *result = reinterpret_cast<TsFn*>(func)->Context(); + return napi_ok; +} + +NAPI_EXTERN napi_status +napi_call_threadsafe_function(napi_threadsafe_function func, + void* data, + napi_threadsafe_function_call_mode is_blocking) { + CHECK(func != nullptr); + return reinterpret_cast<TsFn*>(func)->Push(data, is_blocking); +} + +NAPI_EXTERN napi_status +napi_acquire_threadsafe_function(napi_threadsafe_function func) { + CHECK(func != nullptr); + return reinterpret_cast<TsFn*>(func)->Acquire(); +} + +NAPI_EXTERN napi_status +napi_release_threadsafe_function(napi_threadsafe_function func, + napi_threadsafe_function_release_mode mode) { + CHECK(func != nullptr); + return reinterpret_cast<TsFn*>(func)->Release(mode); +} + +NAPI_EXTERN napi_status +napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func) { + CHECK(func != nullptr); + return reinterpret_cast<TsFn*>(func)->Unref(); +} + +NAPI_EXTERN napi_status +napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func) { + CHECK(func != nullptr); + return reinterpret_cast<TsFn*>(func)->Ref(); +} diff --git a/src/node_api.h b/src/node_api.h index 91c2775a03..84706ac3ed 100644 --- a/src/node_api.h +++ b/src/node_api.h @@ -614,6 +614,44 @@ NAPI_EXTERN napi_status napi_run_script(napi_env env, NAPI_EXTERN napi_status napi_get_uv_event_loop(napi_env env, struct uv_loop_s** loop); +#ifdef NAPI_EXPERIMENTAL +// Calling into JS from other threads +NAPI_EXTERN napi_status +napi_create_threadsafe_function(napi_env env, + napi_value func, + napi_value async_resource, + napi_value async_resource_name, + size_t max_queue_size, + size_t initial_thread_count, + void* thread_finalize_data, + napi_finalize thread_finalize_cb, + void* context, + napi_threadsafe_function_call_js call_js_cb, + napi_threadsafe_function* result); + +NAPI_EXTERN napi_status +napi_get_threadsafe_function_context(napi_threadsafe_function func, + void** result); + +NAPI_EXTERN napi_status +napi_call_threadsafe_function(napi_threadsafe_function func, + void* data, + napi_threadsafe_function_call_mode is_blocking); + +NAPI_EXTERN napi_status +napi_acquire_threadsafe_function(napi_threadsafe_function func); + +NAPI_EXTERN napi_status +napi_release_threadsafe_function(napi_threadsafe_function func, + napi_threadsafe_function_release_mode mode); + +NAPI_EXTERN napi_status +napi_unref_threadsafe_function(napi_env env, napi_threadsafe_function func); + +NAPI_EXTERN napi_status +napi_ref_threadsafe_function(napi_env env, napi_threadsafe_function func); + +#endif // NAPI_EXPERIMENTAL EXTERN_C_END #endif // SRC_NODE_API_H_ diff --git a/src/node_api_types.h b/src/node_api_types.h index f7f3ee6275..af7d7c7f95 100644 --- a/src/node_api_types.h +++ b/src/node_api_types.h @@ -20,6 +20,9 @@ typedef struct napi_callback_info__* napi_callback_info; typedef struct napi_async_context__* napi_async_context; typedef struct napi_async_work__* napi_async_work; typedef struct napi_deferred__* napi_deferred; +#ifdef NAPI_EXPERIMENTAL +typedef struct napi_threadsafe_function__* napi_threadsafe_function; +#endif // NAPI_EXPERIMENTAL typedef enum { napi_default = 0, @@ -72,9 +75,25 @@ typedef enum { napi_cancelled, napi_escape_called_twice, napi_handle_scope_mismatch, - napi_callback_scope_mismatch + napi_callback_scope_mismatch, +#ifdef NAPI_EXPERIMENTAL + napi_queue_full, + napi_closing, +#endif // NAPI_EXPERIMENTAL } napi_status; +#ifdef NAPI_EXPERIMENTAL +typedef enum { + napi_tsfn_release, + napi_tsfn_abort +} napi_threadsafe_function_release_mode; + +typedef enum { + napi_tsfn_nonblocking, + napi_tsfn_blocking +} napi_threadsafe_function_call_mode; +#endif // NAPI_EXPERIMENTAL + typedef napi_value (*napi_callback)(napi_env env, napi_callback_info info); typedef void (*napi_finalize)(napi_env env, @@ -86,6 +105,13 @@ typedef void (*napi_async_complete_callback)(napi_env env, napi_status status, void* data); +#ifdef NAPI_EXPERIMENTAL +typedef void (*napi_threadsafe_function_call_js)(napi_env env, + napi_value js_callback, + void* context, + void* data); +#endif // NAPI_EXPERIMENTAL + typedef struct { // One of utf8name or name should be NULL. const char* utf8name; |