summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorGabriel Schulhof <gabriel.schulhof@intel.com>2018-06-17 11:52:49 -0400
committerGabriel Schulhof <gabriel.schulhof@intel.com>2018-06-29 17:18:46 -0400
commit81f06ba7e49d9c077c209ab9c9de63bad08aa801 (patch)
tree7b7f35909237584e759a4ad9396e2e565b354b38 /test
parent1c303439846de80aaec75285ec5ce087e85b31a6 (diff)
downloadandroid-node-v8-81f06ba7e49d9c077c209ab9c9de63bad08aa801.tar.gz
android-node-v8-81f06ba7e49d9c077c209ab9c9de63bad08aa801.tar.bz2
android-node-v8-81f06ba7e49d9c077c209ab9c9de63bad08aa801.zip
n-api: add API for asynchronous functions
Bundle a `uv_async_t`, a `uv_idle_t`, a `uv_mutex_t`, a `uv_cond_t`, and a `v8::Persistent<v8::Function>` to make it possible to call into JS from another thread. The API accepts a void data pointer and a callback which will be invoked on the loop thread and which will receive the `napi_value` representing the JavaScript function to call so as to perform the call into JS. The callback is run inside a `node::CallbackScope`. A `std::queue<void*>` is used to store calls from the secondary threads, and an idle loop is started by the `uv_async_t` callback on the loop thread to drain the queue, calling into JS with each item. Items can be added to the queue blockingly or non-blockingly. The thread-safe function can be referenced or unreferenced, with the same semantics as libuv handles. Re: https://github.com/nodejs/help/issues/1035 Re: https://github.com/nodejs/node/issues/20964 Fixes: https://github.com/nodejs/node/issues/13512 PR-URL: https://github.com/nodejs/node/pull/17887 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Michael Dawson <michael_dawson@ca.ibm.com>
Diffstat (limited to 'test')
-rw-r--r--test/addons-napi/test_threadsafe_function/binding.c254
-rw-r--r--test/addons-napi/test_threadsafe_function/binding.gyp8
-rw-r--r--test/addons-napi/test_threadsafe_function/test.js166
3 files changed, 428 insertions, 0 deletions
diff --git a/test/addons-napi/test_threadsafe_function/binding.c b/test/addons-napi/test_threadsafe_function/binding.c
new file mode 100644
index 0000000000..551705b1f2
--- /dev/null
+++ b/test/addons-napi/test_threadsafe_function/binding.c
@@ -0,0 +1,254 @@
+// For the purpose of this test we use libuv's threading library. When deciding
+// on a threading library for a new project it bears remembering that in the
+// future libuv may introduce API changes which may render it non-ABI-stable,
+// which, in turn, may affect the ABI stability of the project despite its use
+// of N-API.
+#include <uv.h>
+#define NAPI_EXPERIMENTAL
+#include <node_api.h>
+#include "../common.h"
+
+#define ARRAY_LENGTH 10
+
+static uv_thread_t uv_threads[2];
+static napi_threadsafe_function ts_fn;
+
+typedef struct {
+ napi_threadsafe_function_call_mode block_on_full;
+ napi_threadsafe_function_release_mode abort;
+ bool start_secondary;
+ napi_ref js_finalize_cb;
+} ts_fn_hint;
+
+static ts_fn_hint ts_info;
+
+// Thread data to transmit to JS
+static int ints[ARRAY_LENGTH];
+
+static void secondary_thread(void* data) {
+ napi_threadsafe_function ts_fn = data;
+
+ if (napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
+ napi_fatal_error("secondary_thread", NAPI_AUTO_LENGTH,
+ "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
+ }
+}
+
+// Source thread producing the data
+static void data_source_thread(void* data) {
+ napi_threadsafe_function ts_fn = data;
+ int index;
+ void* hint;
+ ts_fn_hint *ts_fn_info;
+ napi_status status;
+ bool queue_was_full = false;
+ bool queue_was_closing = false;
+
+ if (napi_get_threadsafe_function_context(ts_fn, &hint) != napi_ok) {
+ napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
+ "napi_get_threadsafe_function_context failed", NAPI_AUTO_LENGTH);
+ }
+
+ ts_fn_info = (ts_fn_hint *)hint;
+
+ if (ts_fn_info != &ts_info) {
+ napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
+ "thread-safe function hint is not as expected", NAPI_AUTO_LENGTH);
+ }
+
+ if (ts_fn_info->start_secondary) {
+ if (napi_acquire_threadsafe_function(ts_fn) != napi_ok) {
+ napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
+ "napi_acquire_threadsafe_function failed", NAPI_AUTO_LENGTH);
+ }
+
+ if (uv_thread_create(&uv_threads[1], secondary_thread, ts_fn) != 0) {
+ napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
+ "failed to start secondary thread", NAPI_AUTO_LENGTH);
+ }
+ }
+
+ for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
+ status = napi_call_threadsafe_function(ts_fn, &ints[index],
+ ts_fn_info->block_on_full);
+ switch (status) {
+ case napi_queue_full:
+ queue_was_full = true;
+ index++;
+ // fall through
+
+ case napi_ok:
+ continue;
+
+ case napi_closing:
+ queue_was_closing = true;
+ break;
+
+ default:
+ napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
+ "napi_call_threadsafe_function failed", NAPI_AUTO_LENGTH);
+ }
+ }
+
+ // Assert that the enqueuing of a value was refused at least once, if this is
+ // a non-blocking test run.
+ if (!ts_fn_info->block_on_full && !queue_was_full) {
+ napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
+ "queue was never full", NAPI_AUTO_LENGTH);
+ }
+
+ // Assert that the queue was marked as closing at least once, if this is an
+ // aborting test run.
+ if (ts_fn_info->abort == napi_tsfn_abort && !queue_was_closing) {
+ napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
+ "queue was never closing", NAPI_AUTO_LENGTH);
+ }
+
+ if (!queue_was_closing &&
+ napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
+ napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
+ "napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
+ }
+}
+
+// Getting the data into JS
+static void call_js(napi_env env, napi_value cb, void* hint, void* data) {
+ if (!(env == NULL || cb == NULL)) {
+ napi_value argv, undefined;
+ NAPI_CALL_RETURN_VOID(env, napi_create_int32(env, *(int*)data, &argv));
+ NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
+ NAPI_CALL_RETURN_VOID(env, napi_call_function(env, undefined, cb, 1, &argv,
+ NULL));
+ }
+}
+
+// Cleanup
+static napi_value StopThread(napi_env env, napi_callback_info info) {
+ size_t argc = 2;
+ napi_value argv[2];
+ NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
+ napi_valuetype value_type;
+ NAPI_CALL(env, napi_typeof(env, argv[0], &value_type));
+ NAPI_ASSERT(env, value_type == napi_function,
+ "StopThread argument is a function");
+ NAPI_ASSERT(env, (ts_fn != NULL), "Existing threadsafe function");
+ NAPI_CALL(env,
+ napi_create_reference(env, argv[0], 1, &(ts_info.js_finalize_cb)));
+ bool abort;
+ NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort));
+ NAPI_CALL(env,
+ napi_release_threadsafe_function(ts_fn,
+ abort ? napi_tsfn_abort : napi_tsfn_release));
+ ts_fn = NULL;
+ return NULL;
+}
+
+// Join the thread and inform JS that we're done.
+static void join_the_threads(napi_env env, void *data, void *hint) {
+ uv_thread_t *the_threads = data;
+ ts_fn_hint *the_hint = hint;
+ napi_value js_cb, undefined;
+
+ uv_thread_join(&the_threads[0]);
+ if (the_hint->start_secondary) {
+ uv_thread_join(&the_threads[1]);
+ }
+
+ NAPI_CALL_RETURN_VOID(env,
+ napi_get_reference_value(env, the_hint->js_finalize_cb, &js_cb));
+ NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &undefined));
+ NAPI_CALL_RETURN_VOID(env,
+ napi_call_function(env, undefined, js_cb, 0, NULL, NULL));
+ NAPI_CALL_RETURN_VOID(env, napi_delete_reference(env,
+ the_hint->js_finalize_cb));
+}
+
+static napi_value StartThreadInternal(napi_env env,
+ napi_callback_info info,
+ napi_threadsafe_function_call_js cb,
+ bool block_on_full) {
+ size_t argc = 3;
+ napi_value argv[3];
+
+ ts_info.block_on_full =
+ (block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking);
+
+ NAPI_ASSERT(env, (ts_fn == NULL), "Existing thread-safe function");
+ NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));
+ napi_value async_name;
+ NAPI_CALL(env, napi_create_string_utf8(env, "N-API Thread-safe Function Test",
+ NAPI_AUTO_LENGTH, &async_name));
+ NAPI_CALL(env, napi_create_threadsafe_function(env, argv[0], NULL, async_name,
+ 2, 2, uv_threads, join_the_threads, &ts_info, cb, &ts_fn));
+ bool abort;
+ NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort));
+ ts_info.abort = abort ? napi_tsfn_abort : napi_tsfn_release;
+ NAPI_CALL(env, napi_get_value_bool(env, argv[2], &(ts_info.start_secondary)));
+
+ NAPI_ASSERT(env,
+ (uv_thread_create(&uv_threads[0], data_source_thread, ts_fn) == 0),
+ "Thread creation");
+
+ return NULL;
+}
+
+static napi_value Unref(napi_env env, napi_callback_info info) {
+ NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
+ NAPI_CALL(env, napi_unref_threadsafe_function(env, ts_fn));
+ return NULL;
+}
+
+static napi_value Release(napi_env env, napi_callback_info info) {
+ NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
+ NAPI_CALL(env, napi_release_threadsafe_function(ts_fn, napi_tsfn_release));
+ return NULL;
+}
+
+// Startup
+static napi_value StartThread(napi_env env, napi_callback_info info) {
+ return StartThreadInternal(env, info, call_js, true);
+}
+
+static napi_value StartThreadNonblocking(napi_env env,
+ napi_callback_info info) {
+ return StartThreadInternal(env, info, call_js, false);
+}
+
+static napi_value StartThreadNoNative(napi_env env, napi_callback_info info) {
+ return StartThreadInternal(env, info, NULL, true);
+}
+
+// Module init
+static napi_value Init(napi_env env, napi_value exports) {
+ size_t index;
+ for (index = 0; index < ARRAY_LENGTH; index++) {
+ ints[index] = index;
+ }
+ napi_value js_array_length;
+ napi_create_uint32(env, ARRAY_LENGTH, &js_array_length);
+
+ napi_property_descriptor properties[] = {
+ {
+ "ARRAY_LENGTH",
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ js_array_length,
+ napi_enumerable,
+ NULL
+ },
+ DECLARE_NAPI_PROPERTY("StartThread", StartThread),
+ DECLARE_NAPI_PROPERTY("StartThreadNoNative", StartThreadNoNative),
+ DECLARE_NAPI_PROPERTY("StartThreadNonblocking", StartThreadNonblocking),
+ DECLARE_NAPI_PROPERTY("StopThread", StopThread),
+ DECLARE_NAPI_PROPERTY("Unref", Unref),
+ DECLARE_NAPI_PROPERTY("Release", Release),
+ };
+
+ NAPI_CALL(env, napi_define_properties(env, exports,
+ sizeof(properties)/sizeof(properties[0]), properties));
+
+ return exports;
+}
+NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)
diff --git a/test/addons-napi/test_threadsafe_function/binding.gyp b/test/addons-napi/test_threadsafe_function/binding.gyp
new file mode 100644
index 0000000000..b60352e05a
--- /dev/null
+++ b/test/addons-napi/test_threadsafe_function/binding.gyp
@@ -0,0 +1,8 @@
+{
+ 'targets': [
+ {
+ 'target_name': 'binding',
+ 'sources': ['binding.c']
+ }
+ ]
+}
diff --git a/test/addons-napi/test_threadsafe_function/test.js b/test/addons-napi/test_threadsafe_function/test.js
new file mode 100644
index 0000000000..8d8a6d9d8c
--- /dev/null
+++ b/test/addons-napi/test_threadsafe_function/test.js
@@ -0,0 +1,166 @@
+'use strict';
+
+const common = require('../../common');
+const assert = require('assert');
+const binding = require(`./build/${common.buildType}/binding`);
+const { fork } = require('child_process');
+const expectedArray = (function(arrayLength) {
+ const result = [];
+ for (let index = 0; index < arrayLength; index++) {
+ result.push(arrayLength - 1 - index);
+ }
+ return result;
+})(binding.ARRAY_LENGTH);
+
+common.crashOnUnhandledRejection();
+
+// Handle the rapid teardown test case as the child process. We unref the
+// thread-safe function after we have received two values. This causes the
+// process to exit and the environment cleanup handler to be invoked.
+if (process.argv[2] === 'child') {
+ let callCount = 0;
+ binding.StartThread((value) => {
+ callCount++;
+ console.log(value);
+ if (callCount === 2) {
+ binding.Unref();
+ }
+ }, false /* abort */, true /* launchSecondary */);
+
+ // Release the thread-safe function from the main thread so that it may be
+ // torn down via the environment cleanup handler.
+ binding.Release();
+ return;
+}
+
+function testWithJSMarshaller({
+ threadStarter,
+ quitAfter,
+ abort,
+ launchSecondary }) {
+ return new Promise((resolve) => {
+ const array = [];
+ binding[threadStarter](function testCallback(value) {
+ array.push(value);
+ if (array.length === quitAfter) {
+ setImmediate(() => {
+ binding.StopThread(common.mustCall(() => {
+ resolve(array);
+ }), !!abort);
+ });
+ }
+ }, !!abort, !!launchSecondary);
+ if (threadStarter === 'StartThreadNonblocking') {
+ // Let's make this thread really busy for a short while to ensure that
+ // the queue fills and the thread receives a napi_queue_full.
+ const start = Date.now();
+ while (Date.now() - start < 200);
+ }
+ });
+}
+
+new Promise(function testWithoutJSMarshaller(resolve) {
+ let callCount = 0;
+ binding.StartThreadNoNative(function testCallback() {
+ callCount++;
+
+ // The default call-into-JS implementation passes no arguments.
+ assert.strictEqual(arguments.length, 0);
+ if (callCount === binding.ARRAY_LENGTH) {
+ setImmediate(() => {
+ binding.StopThread(common.mustCall(() => {
+ resolve();
+ }), false);
+ });
+ }
+ }, false /* abort */, false /* launchSecondary */);
+})
+
+// Start the thread in blocking mode, and assert that all values are passed.
+// Quit after it's done.
+.then(() => testWithJSMarshaller({
+ threadStarter: 'StartThread',
+ quitAfter: binding.ARRAY_LENGTH
+}))
+.then((result) => assert.deepStrictEqual(result, expectedArray))
+
+// Start the thread in non-blocking mode, and assert that all values are passed.
+// Quit after it's done.
+.then(() => testWithJSMarshaller({
+ threadStarter: 'StartThreadNonblocking',
+ quitAfter: binding.ARRAY_LENGTH
+}))
+.then((result) => assert.deepStrictEqual(result, expectedArray))
+
+// Start the thread in blocking mode, and assert that all values are passed.
+// Quit early, but let the thread finish.
+.then(() => testWithJSMarshaller({
+ threadStarter: 'StartThread',
+ quitAfter: 1
+}))
+.then((result) => assert.deepStrictEqual(result, expectedArray))
+
+// Start the thread in non-blocking mode, and assert that all values are passed.
+// Quit early, but let the thread finish.
+.then(() => testWithJSMarshaller({
+ threadStarter: 'StartThreadNonblocking',
+ quitAfter: 1
+}))
+.then((result) => assert.deepStrictEqual(result, expectedArray))
+
+// Start the thread in blocking mode, and assert that all values are passed.
+// Quit early, but let the thread finish. Launch a secondary thread to test the
+// reference counter incrementing functionality.
+.then(() => testWithJSMarshaller({
+ threadStarter: 'StartThread',
+ quitAfter: 1,
+ launchSecondary: true
+}))
+.then((result) => assert.deepStrictEqual(result, expectedArray))
+
+// Start the thread in non-blocking mode, and assert that all values are passed.
+// Quit early, but let the thread finish. Launch a secondary thread to test the
+// reference counter incrementing functionality.
+.then(() => testWithJSMarshaller({
+ threadStarter: 'StartThreadNonblocking',
+ quitAfter: 1,
+ launchSecondary: true
+}))
+.then((result) => assert.deepStrictEqual(result, expectedArray))
+
+// Start the thread in blocking mode, and assert that it could not finish.
+// Quit early and aborting.
+.then(() => testWithJSMarshaller({
+ threadStarter: 'StartThread',
+ quitAfter: 1,
+ abort: true
+}))
+.then((result) => assert.strictEqual(result.indexOf(0), -1))
+
+// Start the thread in non-blocking mode, and assert that it could not finish.
+// Quit early and aborting.
+.then(() => testWithJSMarshaller({
+ threadStarter: 'StartThreadNonblocking',
+ quitAfter: 1,
+ abort: true
+}))
+.then((result) => assert.strictEqual(result.indexOf(0), -1))
+
+// Start a child process to test rapid teardown
+.then(() => {
+ return new Promise((resolve, reject) => {
+ let output = '';
+ const child = fork(__filename, ['child'], {
+ stdio: [process.stdin, 'pipe', process.stderr, 'ipc']
+ });
+ child.on('close', (code) => {
+ if (code === 0) {
+ resolve(output.match(/\S+/g));
+ } else {
+ reject(new Error('Child process died with code ' + code));
+ }
+ });
+ child.stdout.on('data', (data) => (output += data.toString()));
+ });
+})
+.then((result) => assert.strictEqual(result.indexOf(0), -1));