diff options
author | Gabriel Schulhof <gabriel.schulhof@intel.com> | 2018-06-17 11:52:49 -0400 |
---|---|---|
committer | Gabriel Schulhof <gabriel.schulhof@intel.com> | 2018-06-29 17:18:46 -0400 |
commit | 81f06ba7e49d9c077c209ab9c9de63bad08aa801 (patch) | |
tree | 7b7f35909237584e759a4ad9396e2e565b354b38 /test | |
parent | 1c303439846de80aaec75285ec5ce087e85b31a6 (diff) | |
download | android-node-v8-81f06ba7e49d9c077c209ab9c9de63bad08aa801.tar.gz android-node-v8-81f06ba7e49d9c077c209ab9c9de63bad08aa801.tar.bz2 android-node-v8-81f06ba7e49d9c077c209ab9c9de63bad08aa801.zip |
n-api: add API for asynchronous functions
Bundle a `uv_async_t`, a `uv_idle_t`, a `uv_mutex_t`, a `uv_cond_t`,
and a `v8::Persistent<v8::Function>` to make it possible to call into JS
from another thread. The API accepts a void data pointer and a callback
which will be invoked on the loop thread and which will receive the
`napi_value` representing the JavaScript function to call so as to
perform the call into JS. The callback is run inside a
`node::CallbackScope`.
A `std::queue<void*>` is used to store calls from the secondary
threads, and an idle loop is started by the `uv_async_t` callback on the
loop thread to drain the queue, calling into JS with each item.
Items can be added to the queue blockingly or non-blockingly.
The thread-safe function can be referenced or unreferenced, with the
same semantics as libuv handles.
Re: https://github.com/nodejs/help/issues/1035
Re: https://github.com/nodejs/node/issues/20964
Fixes: https://github.com/nodejs/node/issues/13512
PR-URL: https://github.com/nodejs/node/pull/17887
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Michael Dawson <michael_dawson@ca.ibm.com>
Diffstat (limited to 'test')
-rw-r--r-- | test/addons-napi/test_threadsafe_function/binding.c | 254 | ||||
-rw-r--r-- | test/addons-napi/test_threadsafe_function/binding.gyp | 8 | ||||
-rw-r--r-- | test/addons-napi/test_threadsafe_function/test.js | 166 |
3 files changed, 428 insertions, 0 deletions
diff --git a/test/addons-napi/test_threadsafe_function/binding.c b/test/addons-napi/test_threadsafe_function/binding.c new file mode 100644 index 0000000000..551705b1f2 --- /dev/null +++ b/test/addons-napi/test_threadsafe_function/binding.c @@ -0,0 +1,254 @@ +// For the purpose of this test we use libuv's threading library. When deciding +// on a threading library for a new project it bears remembering that in the +// future libuv may introduce API changes which may render it non-ABI-stable, +// which, in turn, may affect the ABI stability of the project despite its use +// of N-API. +#include <uv.h> +#define NAPI_EXPERIMENTAL +#include <node_api.h> +#include "../common.h" + +#define ARRAY_LENGTH 10 + +static uv_thread_t uv_threads[2]; +static napi_threadsafe_function ts_fn; + +typedef struct { + napi_threadsafe_function_call_mode block_on_full; + napi_threadsafe_function_release_mode abort; + bool start_secondary; + napi_ref js_finalize_cb; +} ts_fn_hint; + +static ts_fn_hint ts_info; + +// Thread data to transmit to JS +static int ints[ARRAY_LENGTH]; + +static void secondary_thread(void* data) { + napi_threadsafe_function ts_fn = data; + + if (napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) { + napi_fatal_error("secondary_thread", NAPI_AUTO_LENGTH, + "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH); + } +} + +// Source thread producing the data +static void data_source_thread(void* data) { + napi_threadsafe_function ts_fn = data; + int index; + void* hint; + ts_fn_hint *ts_fn_info; + napi_status status; + bool queue_was_full = false; + bool queue_was_closing = false; + + if (napi_get_threadsafe_function_context(ts_fn, &hint) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_get_threadsafe_function_context failed", NAPI_AUTO_LENGTH); + } + + ts_fn_info = (ts_fn_hint *)hint; + + if (ts_fn_info != &ts_info) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "thread-safe function hint is not as expected", NAPI_AUTO_LENGTH); + } + + if (ts_fn_info->start_secondary) { + if (napi_acquire_threadsafe_function(ts_fn) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_acquire_threadsafe_function failed", NAPI_AUTO_LENGTH); + } + + if (uv_thread_create(&uv_threads[1], secondary_thread, ts_fn) != 0) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "failed to start secondary thread", NAPI_AUTO_LENGTH); + } + } + + for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) { + status = napi_call_threadsafe_function(ts_fn, &ints[index], + ts_fn_info->block_on_full); + switch (status) { + case napi_queue_full: + queue_was_full = true; + index++; + // fall through + + case napi_ok: + continue; + + case napi_closing: + queue_was_closing = true; + break; + + default: + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_call_threadsafe_function failed", NAPI_AUTO_LENGTH); + } + } + + // Assert that the enqueuing of a value was refused at least once, if this is + // a non-blocking test run. + if (!ts_fn_info->block_on_full && !queue_was_full) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "queue was never full", NAPI_AUTO_LENGTH); + } + + // Assert that the queue was marked as closing at least once, if this is an + // aborting test run. + if (ts_fn_info->abort == napi_tsfn_abort && !queue_was_closing) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "queue was never closing", NAPI_AUTO_LENGTH); + } + + if (!queue_was_closing && + napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) { + napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH, + "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH); + } +} + +// Getting the data into JS +static void call_js(napi_env env, napi_value cb, void* hint, void* data) { + if (!(env == NULL || cb == NULL)) { + napi_value argv, undefined; + NAPI_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv)); + NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined)); + NAPI_CALL_RETURN_VOID(env, napi_call_function(env, undefined, cb, 1, &argv, + NULL)); + } +} + +// Cleanup +static napi_value StopThread(napi_env env, napi_callback_info info) { + size_t argc = 2; + napi_value argv[2]; + NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL)); + napi_valuetype value_type; + NAPI_CALL(env, napi_typeof(env, argv[0], &value_type)); + NAPI_ASSERT(env, value_type == napi_function, + "StopThread argument is a function"); + NAPI_ASSERT(env, (ts_fn != NULL), "Existing threadsafe function"); + NAPI_CALL(env, + napi_create_reference(env, argv[0], 1, &(ts_info.js_finalize_cb))); + bool abort; + NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort)); + NAPI_CALL(env, + napi_release_threadsafe_function(ts_fn, + abort ? napi_tsfn_abort : napi_tsfn_release)); + ts_fn = NULL; + return NULL; +} + +// Join the thread and inform JS that we're done. +static void join_the_threads(napi_env env, void *data, void *hint) { + uv_thread_t *the_threads = data; + ts_fn_hint *the_hint = hint; + napi_value js_cb, undefined; + + uv_thread_join(&the_threads[0]); + if (the_hint->start_secondary) { + uv_thread_join(&the_threads[1]); + } + + NAPI_CALL_RETURN_VOID(env, + napi_get_reference_value(env, the_hint->js_finalize_cb, &js_cb)); + NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined)); + NAPI_CALL_RETURN_VOID(env, + napi_call_function(env, undefined, js_cb, 0, NULL, NULL)); + NAPI_CALL_RETURN_VOID(env, napi_delete_reference(env, + the_hint->js_finalize_cb)); +} + +static napi_value StartThreadInternal(napi_env env, + napi_callback_info info, + napi_threadsafe_function_call_js cb, + bool block_on_full) { + size_t argc = 3; + napi_value argv[3]; + + ts_info.block_on_full = + (block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking); + + NAPI_ASSERT(env, (ts_fn == NULL), "Existing thread-safe function"); + NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL)); + napi_value async_name; + NAPI_CALL(env, napi_create_string_utf8(env, "N-API Thread-safe Function Test", + NAPI_AUTO_LENGTH, &async_name)); + NAPI_CALL(env, napi_create_threadsafe_function(env, argv[0], NULL, async_name, + 2, 2, uv_threads, join_the_threads, &ts_info, cb, &ts_fn)); + bool abort; + NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort)); + ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release; + NAPI_CALL(env, napi_get_value_bool(env, argv[2], &(ts_info.start_secondary))); + + NAPI_ASSERT(env, + (uv_thread_create(&uv_threads[0], data_source_thread, ts_fn) == 0), + "Thread creation"); + + return NULL; +} + +static napi_value Unref(napi_env env, napi_callback_info info) { + NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function"); + NAPI_CALL(env, napi_unref_threadsafe_function(env, ts_fn)); + return NULL; +} + +static napi_value Release(napi_env env, napi_callback_info info) { + NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function"); + NAPI_CALL(env, napi_release_threadsafe_function(ts_fn, napi_tsfn_release)); + return NULL; +} + +// Startup +static napi_value StartThread(napi_env env, napi_callback_info info) { + return StartThreadInternal(env, info, call_js, true); +} + +static napi_value StartThreadNonblocking(napi_env env, + napi_callback_info info) { + return StartThreadInternal(env, info, call_js, false); +} + +static napi_value StartThreadNoNative(napi_env env, napi_callback_info info) { + return StartThreadInternal(env, info, NULL, true); +} + +// Module init +static napi_value Init(napi_env env, napi_value exports) { + size_t index; + for (index = 0; index < ARRAY_LENGTH; index++) { + ints[index] = index; + } + napi_value js_array_length; + napi_create_uint32(env, ARRAY_LENGTH, &js_array_length); + + napi_property_descriptor properties[] = { + { + "ARRAY_LENGTH", + NULL, + NULL, + NULL, + NULL, + js_array_length, + napi_enumerable, + NULL + }, + DECLARE_NAPI_PROPERTY("StartThread", StartThread), + DECLARE_NAPI_PROPERTY("StartThreadNoNative", StartThreadNoNative), + DECLARE_NAPI_PROPERTY("StartThreadNonblocking", StartThreadNonblocking), + DECLARE_NAPI_PROPERTY("StopThread", StopThread), + DECLARE_NAPI_PROPERTY("Unref", Unref), + DECLARE_NAPI_PROPERTY("Release", Release), + }; + + NAPI_CALL(env, napi_define_properties(env, exports, + sizeof(properties)/sizeof(properties[0]), properties)); + + return exports; +} +NAPI_MODULE(NODE_GYP_MODULE_NAME, Init) diff --git a/test/addons-napi/test_threadsafe_function/binding.gyp b/test/addons-napi/test_threadsafe_function/binding.gyp new file mode 100644 index 0000000000..b60352e05a --- /dev/null +++ b/test/addons-napi/test_threadsafe_function/binding.gyp @@ -0,0 +1,8 @@ +{ + 'targets': [ + { + 'target_name': 'binding', + 'sources': ['binding.c'] + } + ] +} diff --git a/test/addons-napi/test_threadsafe_function/test.js b/test/addons-napi/test_threadsafe_function/test.js new file mode 100644 index 0000000000..8d8a6d9d8c --- /dev/null +++ b/test/addons-napi/test_threadsafe_function/test.js @@ -0,0 +1,166 @@ +'use strict'; + +const common = require('../../common'); +const assert = require('assert'); +const binding = require(`./build/${common.buildType}/binding`); +const { fork } = require('child_process'); +const expectedArray = (function(arrayLength) { + const result = []; + for (let index = 0; index < arrayLength; index++) { + result.push(arrayLength - 1 - index); + } + return result; +})(binding.ARRAY_LENGTH); + +common.crashOnUnhandledRejection(); + +// Handle the rapid teardown test case as the child process. We unref the +// thread-safe function after we have received two values. This causes the +// process to exit and the environment cleanup handler to be invoked. +if (process.argv[2] === 'child') { + let callCount = 0; + binding.StartThread((value) => { + callCount++; + console.log(value); + if (callCount === 2) { + binding.Unref(); + } + }, false /* abort */, true /* launchSecondary */); + + // Release the thread-safe function from the main thread so that it may be + // torn down via the environment cleanup handler. + binding.Release(); + return; +} + +function testWithJSMarshaller({ + threadStarter, + quitAfter, + abort, + launchSecondary }) { + return new Promise((resolve) => { + const array = []; + binding[threadStarter](function testCallback(value) { + array.push(value); + if (array.length === quitAfter) { + setImmediate(() => { + binding.StopThread(common.mustCall(() => { + resolve(array); + }), !!abort); + }); + } + }, !!abort, !!launchSecondary); + if (threadStarter === 'StartThreadNonblocking') { + // Let's make this thread really busy for a short while to ensure that + // the queue fills and the thread receives a napi_queue_full. + const start = Date.now(); + while (Date.now() - start < 200); + } + }); +} + +new Promise(function testWithoutJSMarshaller(resolve) { + let callCount = 0; + binding.StartThreadNoNative(function testCallback() { + callCount++; + + // The default call-into-JS implementation passes no arguments. + assert.strictEqual(arguments.length, 0); + if (callCount === binding.ARRAY_LENGTH) { + setImmediate(() => { + binding.StopThread(common.mustCall(() => { + resolve(); + }), false); + }); + } + }, false /* abort */, false /* launchSecondary */); +}) + +// Start the thread in blocking mode, and assert that all values are passed. +// Quit after it's done. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: binding.ARRAY_LENGTH +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in non-blocking mode, and assert that all values are passed. +// Quit after it's done. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + quitAfter: binding.ARRAY_LENGTH +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: 1 +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in non-blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + quitAfter: 1 +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. Launch a secondary thread to test the +// reference counter incrementing functionality. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: 1, + launchSecondary: true +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in non-blocking mode, and assert that all values are passed. +// Quit early, but let the thread finish. Launch a secondary thread to test the +// reference counter incrementing functionality. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + quitAfter: 1, + launchSecondary: true +})) +.then((result) => assert.deepStrictEqual(result, expectedArray)) + +// Start the thread in blocking mode, and assert that it could not finish. +// Quit early and aborting. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThread', + quitAfter: 1, + abort: true +})) +.then((result) => assert.strictEqual(result.indexOf(0), -1)) + +// Start the thread in non-blocking mode, and assert that it could not finish. +// Quit early and aborting. +.then(() => testWithJSMarshaller({ + threadStarter: 'StartThreadNonblocking', + quitAfter: 1, + abort: true +})) +.then((result) => assert.strictEqual(result.indexOf(0), -1)) + +// Start a child process to test rapid teardown +.then(() => { + return new Promise((resolve, reject) => { + let output = ''; + const child = fork(__filename, ['child'], { + stdio: [process.stdin, 'pipe', process.stderr, 'ipc'] + }); + child.on('close', (code) => { + if (code === 0) { + resolve(output.match(/\S+/g)); + } else { + reject(new Error('Child process died with code ' + code)); + } + }); + child.stdout.on('data', (data) => (output += data.toString())); + }); +}) +.then((result) => assert.strictEqual(result.indexOf(0), -1)); |