summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--benchmark/cluster/echo.js11
-rw-r--r--doc/api/child_process.md39
-rw-r--r--doc/api/cluster.md8
-rw-r--r--doc/api/process.md6
-rw-r--r--lib/child_process.js4
-rw-r--r--lib/internal/bootstrap/pre_execution.js6
-rw-r--r--lib/internal/child_process.js59
-rw-r--r--lib/internal/child_process/serialization.js119
-rw-r--r--lib/internal/cluster/master.js1
-rw-r--r--node.gyp1
-rw-r--r--src/stream_base.cc18
-rw-r--r--test/parallel/test-child-process-advanced-serialization.js46
-rw-r--r--test/parallel/test-cluster-advanced-serialization.js22
13 files changed, 302 insertions, 38 deletions
diff --git a/benchmark/cluster/echo.js b/benchmark/cluster/echo.js
index 73c5971cd4..152f2c42f1 100644
--- a/benchmark/cluster/echo.js
+++ b/benchmark/cluster/echo.js
@@ -7,16 +7,25 @@ if (cluster.isMaster) {
workers: [1],
payload: ['string', 'object'],
sendsPerBroadcast: [1, 10],
+ serialization: ['json', 'advanced'],
n: [1e5]
});
- function main({ n, workers, sendsPerBroadcast, payload }) {
+ function main({
+ n,
+ workers,
+ sendsPerBroadcast,
+ payload,
+ serialization
+ }) {
const expectedPerBroadcast = sendsPerBroadcast * workers;
var readies = 0;
var broadcasts = 0;
var msgCount = 0;
var data;
+ cluster.settings.serialization = serialization;
+
switch (payload) {
case 'string':
data = 'hello world!';
diff --git a/doc/api/child_process.md b/doc/api/child_process.md
index 24dd6269f1..a2945ad764 100644
--- a/doc/api/child_process.md
+++ b/doc/api/child_process.md
@@ -321,6 +321,9 @@ arbitrary command execution.**
<!-- YAML
added: v0.5.0
changes:
+ - version: REPLACEME
+ pr-url: https://github.com/nodejs/node/pull/30162
+ description: The `serialization` option is supported now.
- version: v8.0.0
pr-url: https://github.com/nodejs/node/pull/10866
description: The `stdio` option can now be a string.
@@ -340,6 +343,9 @@ changes:
* `execPath` {string} Executable used to create the child process.
* `execArgv` {string[]} List of string arguments passed to the executable.
**Default:** `process.execArgv`.
+ * `serialization` {string} Specify the kind of serialization used for sending
+ messages between processes. Possible values are `'json'` and `'advanced'`.
+ See [Advanced Serialization][] for more details. **Default:** `'json'`.
* `silent` {boolean} If `true`, stdin, stdout, and stderr of the child will be
piped to the parent, otherwise they will be inherited from the parent, see
the `'pipe'` and `'inherit'` options for [`child_process.spawn()`][]'s
@@ -386,6 +392,9 @@ The `shell` option available in [`child_process.spawn()`][] is not supported by
<!-- YAML
added: v0.1.90
changes:
+ - version: REPLACEME
+ pr-url: https://github.com/nodejs/node/pull/30162
+ description: The `serialization` option is supported now.
- version: v8.8.0
pr-url: https://github.com/nodejs/node/pull/15380
description: The `windowsHide` option is supported now.
@@ -411,6 +420,9 @@ changes:
[`options.detached`][]).
* `uid` {number} Sets the user identity of the process (see setuid(2)).
* `gid` {number} Sets the group identity of the process (see setgid(2)).
+ * `serialization` {string} Specify the kind of serialization used for sending
+ messages between processes. Possible values are `'json'` and `'advanced'`.
+ See [Advanced Serialization][] for more details. **Default:** `'json'`.
* `shell` {boolean|string} If `true`, runs `command` inside of a shell. Uses
`'/bin/sh'` on Unix, and `process.env.ComSpec` on Windows. A different
shell can be specified as a string. See [Shell Requirements][] and
@@ -998,6 +1010,11 @@ The `'message'` event is triggered when a child process uses
The message goes through serialization and parsing. The resulting
message might not be the same as what is originally sent.
+If the `serialization` option was set to `'advanced'` used when spawning the
+child process, the `message` argument can contain data that JSON is not able
+to represent.
+See [Advanced Serialization][] for more details.
+
### subprocess.channel
<!-- YAML
added: v7.1.0
@@ -1472,6 +1489,26 @@ the same requirement. Thus, in `child_process` functions where a shell can be
spawned, `'cmd.exe'` is used as a fallback if `process.env.ComSpec` is
unavailable.
+## Advanced Serialization
+<!-- YAML
+added: REPLACEME
+-->
+
+Child processes support a serialization mechanism for IPC that is based on the
+[serialization API of the `v8` module][v8.serdes], based on the
+[HTML structured clone algorithm][]. This is generally more powerful and
+supports more built-in JavaScript object types, such as `BigInt`, `Map`
+and `Set`, `ArrayBuffer` and `TypedArray`, `Buffer`, `Error`, `RegExp` etc.
+
+However, this format is not a full superset of JSON, and e.g. properties set on
+objects of such built-in types will not be passed on through the serialization
+step. Additionally, performance may not be equivalent to that of JSON, depending
+on the structure of the passed data.
+Therefore, this feature requires opting in by setting the
+`serialization` option to `'advanced'` when calling [`child_process.spawn()`][]
+or [`child_process.fork()`][].
+
+[Advanced Serialization]: #child_process_advanced_serialization
[`'disconnect'`]: process.html#process_event_disconnect
[`'error'`]: #child_process_event_error
[`'exit'`]: #child_process_event_exit
@@ -1505,5 +1542,7 @@ unavailable.
[`subprocess.stdout`]: #child_process_subprocess_stdout
[`util.promisify()`]: util.html#util_util_promisify_original
[Default Windows Shell]: #child_process_default_windows_shell
+[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
[Shell Requirements]: #child_process_shell_requirements
[synchronous counterparts]: #child_process_synchronous_process_creation
+[v8.serdes]: v8.html#v8_serialization_api
diff --git a/doc/api/cluster.md b/doc/api/cluster.md
index 112da9b2aa..dbe5080ff3 100644
--- a/doc/api/cluster.md
+++ b/doc/api/cluster.md
@@ -724,6 +724,9 @@ values are `'rr'` and `'none'`.
<!-- YAML
added: v0.7.1
changes:
+ - version: REPLACEME
+ pr-url: https://github.com/nodejs/node/pull/30162
+ description: The `serialization` option is supported now.
- version: v9.5.0
pr-url: https://github.com/nodejs/node/pull/18399
description: The `cwd` option is supported now.
@@ -746,6 +749,10 @@ changes:
**Default:** `process.argv.slice(2)`.
* `cwd` {string} Current working directory of the worker process. **Default:**
`undefined` (inherits from parent process).
+ * `serialization` {string} Specify the kind of serialization used for sending
+ messages between processes. Possible values are `'json'` and `'advanced'`.
+ See [Advanced Serialization for `child_process`][] for more details.
+ **Default:** `false`.
* `silent` {boolean} Whether or not to send output to parent's stdio.
**Default:** `false`.
* `stdio` {Array} Configures the stdio of forked processes. Because the
@@ -874,4 +881,5 @@ socket.on('data', (id) => {
[`process` event: `'message'`]: process.html#process_event_message
[`server.close()`]: net.html#net_event_close
[`worker.exitedAfterDisconnect`]: #cluster_worker_exitedafterdisconnect
+[Advanced Serialization for `child_process`]: child_process.html#child_process_advanced_serialization
[Child Process module]: child_process.html#child_process_child_process_fork_modulepath_args_options
diff --git a/doc/api/process.md b/doc/api/process.md
index 9fa2b78654..a9b5c4af09 100644
--- a/doc/api/process.md
+++ b/doc/api/process.md
@@ -119,6 +119,11 @@ the child process.
The message goes through serialization and parsing. The resulting message might
not be the same as what is originally sent.
+If the `serialization` option was set to `advanced` used when spawning the
+process, the `message` argument can contain data that JSON is not able
+to represent.
+See [Advanced Serialization for `child_process`][] for more details.
+
### Event: 'multipleResolves'
<!-- YAML
added: v10.12.0
@@ -2456,6 +2461,7 @@ cases:
[`require.resolve()`]: modules.html#modules_require_resolve_request_options
[`subprocess.kill()`]: child_process.html#child_process_subprocess_kill_signal
[`v8.setFlagsFromString()`]: v8.html#v8_v8_setflagsfromstring_flags
+[Advanced Serialization for `child_process`]: child_process.html#child_process_advanced_serialization
[Android building]: https://github.com/nodejs/node/blob/master/BUILDING.md#androidandroid-based-devices-eg-firefox-os
[Child Process]: child_process.html
[Cluster]: cluster.html
diff --git a/lib/child_process.js b/lib/child_process.js
index 3df73ab5e8..16dc30856a 100644
--- a/lib/child_process.js
+++ b/lib/child_process.js
@@ -108,12 +108,12 @@ function fork(modulePath /* , args, options */) {
return spawn(options.execPath, args, options);
}
-function _forkChild(fd) {
+function _forkChild(fd, serializationMode) {
// set process.send()
const p = new Pipe(PipeConstants.IPC);
p.open(fd);
p.unref();
- const control = setupChannel(process, p);
+ const control = setupChannel(process, p, serializationMode);
process.on('newListener', function onNewListener(name) {
if (name === 'message' || name === 'disconnect') control.ref();
});
diff --git a/lib/internal/bootstrap/pre_execution.js b/lib/internal/bootstrap/pre_execution.js
index 80ac97ee45..e58293e616 100644
--- a/lib/internal/bootstrap/pre_execution.js
+++ b/lib/internal/bootstrap/pre_execution.js
@@ -326,7 +326,11 @@ function setupChildProcessIpcChannel() {
// Make sure it's not accidentally inherited by child processes.
delete process.env.NODE_CHANNEL_FD;
- require('child_process')._forkChild(fd);
+ const serializationMode =
+ process.env.NODE_CHANNEL_SERIALIZATION_MODE || 'json';
+ delete process.env.NODE_CHANNEL_SERIALIZATION_MODE;
+
+ require('child_process')._forkChild(fd, serializationMode);
assert(process.send);
}
}
diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js
index 22f7da92ce..9e13650fa3 100644
--- a/lib/internal/child_process.js
+++ b/lib/internal/child_process.js
@@ -1,6 +1,6 @@
'use strict';
-const { JSON, Object } = primordials;
+const { Object } = primordials;
const {
errnoException,
@@ -55,8 +55,6 @@ const {
const { SocketListSend, SocketListReceive } = SocketList;
-// Lazy loaded for startup performance.
-let StringDecoder;
// Lazy loaded for startup performance and to allow monkey patching of
// internalBinding('http_parser').HTTPParser.
let freeParser;
@@ -343,6 +341,15 @@ ChildProcess.prototype.spawn = function(options) {
const ipcFd = stdio.ipcFd;
stdio = options.stdio = stdio.stdio;
+ if (options.serialization !== undefined &&
+ options.serialization !== 'json' &&
+ options.serialization !== 'advanced') {
+ throw new ERR_INVALID_OPT_VALUE('options.serialization',
+ options.serialization);
+ }
+
+ const serialization = options.serialization || 'json';
+
if (ipc !== undefined) {
// Let child process know about opened IPC channel
if (options.envPairs === undefined)
@@ -353,7 +360,8 @@ ChildProcess.prototype.spawn = function(options) {
options.envPairs);
}
- options.envPairs.push('NODE_CHANNEL_FD=' + ipcFd);
+ options.envPairs.push(`NODE_CHANNEL_FD=${ipcFd}`);
+ options.envPairs.push(`NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`);
}
validateString(options.file, 'options.file');
@@ -446,7 +454,7 @@ ChildProcess.prototype.spawn = function(options) {
this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket);
// Add .send() method and start listening for IPC data
- if (ipc !== undefined) setupChannel(this, ipc);
+ if (ipc !== undefined) setupChannel(this, ipc, serialization);
return err;
};
@@ -516,7 +524,8 @@ class Control extends EventEmitter {
const channelDeprecationMsg = '_channel is deprecated. ' +
'Use ChildProcess.channel instead.';
-function setupChannel(target, channel) {
+let serialization;
+function setupChannel(target, channel, serializationMode) {
target.channel = channel;
Object.defineProperty(target, '_channel', {
@@ -535,12 +544,16 @@ function setupChannel(target, channel) {
const control = new Control(channel);
- if (StringDecoder === undefined)
- StringDecoder = require('string_decoder').StringDecoder;
- const decoder = new StringDecoder('utf8');
- var jsonBuffer = '';
- var pendingHandle = null;
- channel.buffering = false;
+ if (serialization === undefined)
+ serialization = require('internal/child_process/serialization');
+ const {
+ initMessageChannel,
+ parseChannelMessages,
+ writeChannelMessage
+ } = serialization[serializationMode];
+
+ let pendingHandle = null;
+ initMessageChannel(channel);
channel.pendingHandle = null;
channel.onread = function(arrayBuffer) {
const recvHandle = channel.pendingHandle;
@@ -552,21 +565,7 @@ function setupChannel(target, channel) {
if (recvHandle)
pendingHandle = recvHandle;
- // Linebreak is used as a message end sign
- var chunks = decoder.write(pool).split('\n');
- var numCompleteChunks = chunks.length - 1;
- // Last line does not have trailing linebreak
- var incompleteChunk = chunks[numCompleteChunks];
- if (numCompleteChunks === 0) {
- jsonBuffer += incompleteChunk;
- this.buffering = jsonBuffer.length !== 0;
- return;
- }
- chunks[0] = jsonBuffer + chunks[0];
-
- for (var i = 0; i < numCompleteChunks; i++) {
- var message = JSON.parse(chunks[i]);
-
+ for (const message of parseChannelMessages(channel, pool)) {
// There will be at most one NODE_HANDLE message in every chunk we
// read because SCM_RIGHTS messages don't get coalesced. Make sure
// that we deliver the handle with the right message however.
@@ -581,9 +580,6 @@ function setupChannel(target, channel) {
handleMessage(message, undefined, false);
}
}
- jsonBuffer = incompleteChunk;
- this.buffering = jsonBuffer.length !== 0;
-
} else {
this.buffering = false;
target.disconnect();
@@ -782,8 +778,7 @@ function setupChannel(target, channel) {
const req = new WriteWrap();
- const string = JSON.stringify(message) + '\n';
- const err = channel.writeUtf8String(req, string, handle);
+ const err = writeChannelMessage(channel, req, message, handle);
const wasAsyncWrite = streamBaseState[kLastWriteWasAsync];
if (err === 0) {
diff --git a/lib/internal/child_process/serialization.js b/lib/internal/child_process/serialization.js
new file mode 100644
index 0000000000..1381f29926
--- /dev/null
+++ b/lib/internal/child_process/serialization.js
@@ -0,0 +1,119 @@
+'use strict';
+
+const { JSON } = primordials;
+const { Buffer } = require('buffer');
+const { StringDecoder } = require('string_decoder');
+const v8 = require('v8');
+const { isArrayBufferView } = require('internal/util/types');
+const assert = require('internal/assert');
+
+const kMessageBuffer = Symbol('kMessageBuffer');
+const kJSONBuffer = Symbol('kJSONBuffer');
+const kStringDecoder = Symbol('kStringDecoder');
+
+// Extend V8's serializer APIs to give more JSON-like behaviour in
+// some cases; in particular, for native objects this serializes them the same
+// way that JSON does rather than throwing an exception.
+const kArrayBufferViewTag = 0;
+const kNotArrayBufferViewTag = 1;
+class ChildProcessSerializer extends v8.DefaultSerializer {
+ _writeHostObject(object) {
+ if (isArrayBufferView(object)) {
+ this.writeUint32(kArrayBufferViewTag);
+ return super._writeHostObject(object);
+ } else {
+ this.writeUint32(kNotArrayBufferViewTag);
+ this.writeValue({ ...object });
+ }
+ }
+}
+
+class ChildProcessDeserializer extends v8.DefaultDeserializer {
+ _readHostObject() {
+ const tag = this.readUint32();
+ if (tag === kArrayBufferViewTag)
+ return super._readHostObject();
+
+ assert(tag === kNotArrayBufferViewTag);
+ return this.readValue();
+ }
+}
+
+// Messages are parsed in either of the following formats:
+// - Newline-delimited JSON, or
+// - V8-serialized buffers, prefixed with their length as a big endian uint32
+// (aka 'advanced')
+const advanced = {
+ initMessageChannel(channel) {
+ channel[kMessageBuffer] = Buffer.alloc(0);
+ channel.buffering = false;
+ },
+
+ *parseChannelMessages(channel, readData) {
+ if (readData.length === 0) return;
+
+ let messageBuffer = Buffer.concat([channel[kMessageBuffer], readData]);
+ while (messageBuffer.length > 4) {
+ const size = messageBuffer.readUInt32BE();
+ if (messageBuffer.length < 4 + size) {
+ break;
+ }
+
+ const deserializer = new ChildProcessDeserializer(
+ messageBuffer.subarray(4, 4 + size));
+ messageBuffer = messageBuffer.subarray(4 + size);
+
+ deserializer.readHeader();
+ yield deserializer.readValue();
+ }
+ channel[kMessageBuffer] = messageBuffer;
+ channel.buffering = messageBuffer.length > 0;
+ },
+
+ writeChannelMessage(channel, req, message, handle) {
+ const ser = new ChildProcessSerializer();
+ ser.writeHeader();
+ ser.writeValue(message);
+ const serializedMessage = ser.releaseBuffer();
+ const sizeBuffer = Buffer.allocUnsafe(4);
+ sizeBuffer.writeUInt32BE(serializedMessage.length);
+ return channel.writeBuffer(req, Buffer.concat([
+ sizeBuffer,
+ serializedMessage
+ ]), handle);
+ },
+};
+
+const json = {
+ initMessageChannel(channel) {
+ channel[kJSONBuffer] = '';
+ channel[kStringDecoder] = undefined;
+ },
+
+ *parseChannelMessages(channel, readData) {
+ if (readData.length === 0) return;
+
+ if (channel[kStringDecoder] === undefined)
+ channel[kStringDecoder] = new StringDecoder('utf8');
+ const chunks = channel[kStringDecoder].write(readData).split('\n');
+ const numCompleteChunks = chunks.length - 1;
+ // Last line does not have trailing linebreak
+ const incompleteChunk = chunks[numCompleteChunks];
+ if (numCompleteChunks === 0) {
+ channel[kJSONBuffer] += incompleteChunk;
+ } else {
+ chunks[0] = channel[kJSONBuffer] + chunks[0];
+ for (let i = 0; i < numCompleteChunks; i++)
+ yield JSON.parse(chunks[i]);
+ channel[kJSONBuffer] = incompleteChunk;
+ }
+ channel.buffering = channel[kJSONBuffer].length !== 0;
+ },
+
+ writeChannelMessage(channel, req, message, handle) {
+ const string = JSON.stringify(message) + '\n';
+ return channel.writeUtf8String(req, string, handle);
+ },
+};
+
+module.exports = { advanced, json };
diff --git a/lib/internal/cluster/master.js b/lib/internal/cluster/master.js
index a881021c5e..005de8aa1b 100644
--- a/lib/internal/cluster/master.js
+++ b/lib/internal/cluster/master.js
@@ -130,6 +130,7 @@ function createWorkerProcess(id, env) {
return fork(cluster.settings.exec, cluster.settings.args, {
cwd: cluster.settings.cwd,
env: workerEnv,
+ serialization: cluster.settings.serialization,
silent: cluster.settings.silent,
windowsHide: cluster.settings.windowsHide,
execArgv: execArgv,
diff --git a/node.gyp b/node.gyp
index 5f8089e617..c472fad452 100644
--- a/node.gyp
+++ b/node.gyp
@@ -91,6 +91,7 @@
'lib/internal/buffer.js',
'lib/internal/cli_table.js',
'lib/internal/child_process.js',
+ 'lib/internal/child_process/serialization.js',
'lib/internal/cluster/child.js',
'lib/internal/cluster/master.js',
'lib/internal/cluster/round_robin_handle.js',
diff --git a/src/stream_base.cc b/src/stream_base.cc
index 52163e2e43..eaccfc995c 100644
--- a/src/stream_base.cc
+++ b/src/stream_base.cc
@@ -180,12 +180,26 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
}
Local<Object> req_wrap_obj = args[0].As<Object>();
-
uv_buf_t buf;
buf.base = Buffer::Data(args[1]);
buf.len = Buffer::Length(args[1]);
- StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
+ uv_stream_t* send_handle = nullptr;
+
+ if (args[2]->IsObject() && IsIPCPipe()) {
+ Local<Object> send_handle_obj = args[2].As<Object>();
+
+ HandleWrap* wrap;
+ ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL);
+ send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle());
+ // Reference LibuvStreamWrap instance to prevent it from being garbage
+ // collected before `AfterWrite` is called.
+ req_wrap_obj->Set(env->context(),
+ env->handle_string(),
+ send_handle_obj).Check();
+ }
+
+ StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
SetWriteResult(res);
return res.err;
diff --git a/test/parallel/test-child-process-advanced-serialization.js b/test/parallel/test-child-process-advanced-serialization.js
new file mode 100644
index 0000000000..0424f14a06
--- /dev/null
+++ b/test/parallel/test-child-process-advanced-serialization.js
@@ -0,0 +1,46 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const child_process = require('child_process');
+const { once } = require('events');
+
+if (process.argv[2] !== 'child') {
+ for (const value of [null, 42, Infinity, 'foo']) {
+ common.expectsError(() => {
+ child_process.spawn(process.execPath, [], { serialization: value });
+ }, {
+ code: 'ERR_INVALID_OPT_VALUE',
+ message: `The value "${value}" is invalid ` +
+ 'for option "options.serialization"'
+ });
+ }
+
+ (async () => {
+ const cp = child_process.spawn(process.execPath, [__filename, 'child'],
+ {
+ stdio: ['ipc', 'inherit', 'inherit'],
+ serialization: 'advanced'
+ });
+
+ const circular = {};
+ circular.circular = circular;
+ for await (const message of [
+ { uint8: new Uint8Array(4) },
+ { float64: new Float64Array([ Math.PI ]) },
+ { buffer: Buffer.from('Hello!') },
+ { map: new Map([{ a: 1 }, { b: 2 }]) },
+ { bigInt: 1337n },
+ circular,
+ new Error('Something went wrong'),
+ new RangeError('Something range-y went wrong'),
+ ]) {
+ cp.send(message);
+ const [ received ] = await once(cp, 'message');
+ assert.deepStrictEqual(received, message);
+ }
+
+ cp.disconnect();
+ })().then(common.mustCall());
+} else {
+ process.on('message', (msg) => process.send(msg));
+}
diff --git a/test/parallel/test-cluster-advanced-serialization.js b/test/parallel/test-cluster-advanced-serialization.js
new file mode 100644
index 0000000000..2144d08f28
--- /dev/null
+++ b/test/parallel/test-cluster-advanced-serialization.js
@@ -0,0 +1,22 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const cluster = require('cluster');
+
+if (cluster.isMaster) {
+ cluster.settings.serialization = 'advanced';
+ const worker = cluster.fork();
+ const circular = {};
+ circular.circular = circular;
+
+ worker.on('online', common.mustCall(() => {
+ worker.send(circular);
+
+ worker.on('message', common.mustCall((msg) => {
+ assert.deepStrictEqual(msg, circular);
+ worker.kill();
+ }));
+ }));
+} else {
+ process.on('message', (msg) => process.send(msg));
+}