diff options
-rw-r--r-- | benchmark/cluster/echo.js | 11 | ||||
-rw-r--r-- | doc/api/child_process.md | 39 | ||||
-rw-r--r-- | doc/api/cluster.md | 8 | ||||
-rw-r--r-- | doc/api/process.md | 6 | ||||
-rw-r--r-- | lib/child_process.js | 4 | ||||
-rw-r--r-- | lib/internal/bootstrap/pre_execution.js | 6 | ||||
-rw-r--r-- | lib/internal/child_process.js | 59 | ||||
-rw-r--r-- | lib/internal/child_process/serialization.js | 119 | ||||
-rw-r--r-- | lib/internal/cluster/master.js | 1 | ||||
-rw-r--r-- | node.gyp | 1 | ||||
-rw-r--r-- | src/stream_base.cc | 18 | ||||
-rw-r--r-- | test/parallel/test-child-process-advanced-serialization.js | 46 | ||||
-rw-r--r-- | test/parallel/test-cluster-advanced-serialization.js | 22 |
13 files changed, 302 insertions, 38 deletions
diff --git a/benchmark/cluster/echo.js b/benchmark/cluster/echo.js index 73c5971cd4..152f2c42f1 100644 --- a/benchmark/cluster/echo.js +++ b/benchmark/cluster/echo.js @@ -7,16 +7,25 @@ if (cluster.isMaster) { workers: [1], payload: ['string', 'object'], sendsPerBroadcast: [1, 10], + serialization: ['json', 'advanced'], n: [1e5] }); - function main({ n, workers, sendsPerBroadcast, payload }) { + function main({ + n, + workers, + sendsPerBroadcast, + payload, + serialization + }) { const expectedPerBroadcast = sendsPerBroadcast * workers; var readies = 0; var broadcasts = 0; var msgCount = 0; var data; + cluster.settings.serialization = serialization; + switch (payload) { case 'string': data = 'hello world!'; diff --git a/doc/api/child_process.md b/doc/api/child_process.md index 24dd6269f1..a2945ad764 100644 --- a/doc/api/child_process.md +++ b/doc/api/child_process.md @@ -321,6 +321,9 @@ arbitrary command execution.** <!-- YAML added: v0.5.0 changes: + - version: REPLACEME + pr-url: https://github.com/nodejs/node/pull/30162 + description: The `serialization` option is supported now. - version: v8.0.0 pr-url: https://github.com/nodejs/node/pull/10866 description: The `stdio` option can now be a string. @@ -340,6 +343,9 @@ changes: * `execPath` {string} Executable used to create the child process. * `execArgv` {string[]} List of string arguments passed to the executable. **Default:** `process.execArgv`. + * `serialization` {string} Specify the kind of serialization used for sending + messages between processes. Possible values are `'json'` and `'advanced'`. + See [Advanced Serialization][] for more details. **Default:** `'json'`. * `silent` {boolean} If `true`, stdin, stdout, and stderr of the child will be piped to the parent, otherwise they will be inherited from the parent, see the `'pipe'` and `'inherit'` options for [`child_process.spawn()`][]'s @@ -386,6 +392,9 @@ The `shell` option available in [`child_process.spawn()`][] is not supported by <!-- YAML added: v0.1.90 changes: + - version: REPLACEME + pr-url: https://github.com/nodejs/node/pull/30162 + description: The `serialization` option is supported now. - version: v8.8.0 pr-url: https://github.com/nodejs/node/pull/15380 description: The `windowsHide` option is supported now. @@ -411,6 +420,9 @@ changes: [`options.detached`][]). * `uid` {number} Sets the user identity of the process (see setuid(2)). * `gid` {number} Sets the group identity of the process (see setgid(2)). + * `serialization` {string} Specify the kind of serialization used for sending + messages between processes. Possible values are `'json'` and `'advanced'`. + See [Advanced Serialization][] for more details. **Default:** `'json'`. * `shell` {boolean|string} If `true`, runs `command` inside of a shell. Uses `'/bin/sh'` on Unix, and `process.env.ComSpec` on Windows. A different shell can be specified as a string. See [Shell Requirements][] and @@ -998,6 +1010,11 @@ The `'message'` event is triggered when a child process uses The message goes through serialization and parsing. The resulting message might not be the same as what is originally sent. +If the `serialization` option was set to `'advanced'` used when spawning the +child process, the `message` argument can contain data that JSON is not able +to represent. +See [Advanced Serialization][] for more details. + ### subprocess.channel <!-- YAML added: v7.1.0 @@ -1472,6 +1489,26 @@ the same requirement. Thus, in `child_process` functions where a shell can be spawned, `'cmd.exe'` is used as a fallback if `process.env.ComSpec` is unavailable. +## Advanced Serialization +<!-- YAML +added: REPLACEME +--> + +Child processes support a serialization mechanism for IPC that is based on the +[serialization API of the `v8` module][v8.serdes], based on the +[HTML structured clone algorithm][]. This is generally more powerful and +supports more built-in JavaScript object types, such as `BigInt`, `Map` +and `Set`, `ArrayBuffer` and `TypedArray`, `Buffer`, `Error`, `RegExp` etc. + +However, this format is not a full superset of JSON, and e.g. properties set on +objects of such built-in types will not be passed on through the serialization +step. Additionally, performance may not be equivalent to that of JSON, depending +on the structure of the passed data. +Therefore, this feature requires opting in by setting the +`serialization` option to `'advanced'` when calling [`child_process.spawn()`][] +or [`child_process.fork()`][]. + +[Advanced Serialization]: #child_process_advanced_serialization [`'disconnect'`]: process.html#process_event_disconnect [`'error'`]: #child_process_event_error [`'exit'`]: #child_process_event_exit @@ -1505,5 +1542,7 @@ unavailable. [`subprocess.stdout`]: #child_process_subprocess_stdout [`util.promisify()`]: util.html#util_util_promisify_original [Default Windows Shell]: #child_process_default_windows_shell +[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm [Shell Requirements]: #child_process_shell_requirements [synchronous counterparts]: #child_process_synchronous_process_creation +[v8.serdes]: v8.html#v8_serialization_api diff --git a/doc/api/cluster.md b/doc/api/cluster.md index 112da9b2aa..dbe5080ff3 100644 --- a/doc/api/cluster.md +++ b/doc/api/cluster.md @@ -724,6 +724,9 @@ values are `'rr'` and `'none'`. <!-- YAML added: v0.7.1 changes: + - version: REPLACEME + pr-url: https://github.com/nodejs/node/pull/30162 + description: The `serialization` option is supported now. - version: v9.5.0 pr-url: https://github.com/nodejs/node/pull/18399 description: The `cwd` option is supported now. @@ -746,6 +749,10 @@ changes: **Default:** `process.argv.slice(2)`. * `cwd` {string} Current working directory of the worker process. **Default:** `undefined` (inherits from parent process). + * `serialization` {string} Specify the kind of serialization used for sending + messages between processes. Possible values are `'json'` and `'advanced'`. + See [Advanced Serialization for `child_process`][] for more details. + **Default:** `false`. * `silent` {boolean} Whether or not to send output to parent's stdio. **Default:** `false`. * `stdio` {Array} Configures the stdio of forked processes. Because the @@ -874,4 +881,5 @@ socket.on('data', (id) => { [`process` event: `'message'`]: process.html#process_event_message [`server.close()`]: net.html#net_event_close [`worker.exitedAfterDisconnect`]: #cluster_worker_exitedafterdisconnect +[Advanced Serialization for `child_process`]: child_process.html#child_process_advanced_serialization [Child Process module]: child_process.html#child_process_child_process_fork_modulepath_args_options diff --git a/doc/api/process.md b/doc/api/process.md index 9fa2b78654..a9b5c4af09 100644 --- a/doc/api/process.md +++ b/doc/api/process.md @@ -119,6 +119,11 @@ the child process. The message goes through serialization and parsing. The resulting message might not be the same as what is originally sent. +If the `serialization` option was set to `advanced` used when spawning the +process, the `message` argument can contain data that JSON is not able +to represent. +See [Advanced Serialization for `child_process`][] for more details. + ### Event: 'multipleResolves' <!-- YAML added: v10.12.0 @@ -2456,6 +2461,7 @@ cases: [`require.resolve()`]: modules.html#modules_require_resolve_request_options [`subprocess.kill()`]: child_process.html#child_process_subprocess_kill_signal [`v8.setFlagsFromString()`]: v8.html#v8_v8_setflagsfromstring_flags +[Advanced Serialization for `child_process`]: child_process.html#child_process_advanced_serialization [Android building]: https://github.com/nodejs/node/blob/master/BUILDING.md#androidandroid-based-devices-eg-firefox-os [Child Process]: child_process.html [Cluster]: cluster.html diff --git a/lib/child_process.js b/lib/child_process.js index 3df73ab5e8..16dc30856a 100644 --- a/lib/child_process.js +++ b/lib/child_process.js @@ -108,12 +108,12 @@ function fork(modulePath /* , args, options */) { return spawn(options.execPath, args, options); } -function _forkChild(fd) { +function _forkChild(fd, serializationMode) { // set process.send() const p = new Pipe(PipeConstants.IPC); p.open(fd); p.unref(); - const control = setupChannel(process, p); + const control = setupChannel(process, p, serializationMode); process.on('newListener', function onNewListener(name) { if (name === 'message' || name === 'disconnect') control.ref(); }); diff --git a/lib/internal/bootstrap/pre_execution.js b/lib/internal/bootstrap/pre_execution.js index 80ac97ee45..e58293e616 100644 --- a/lib/internal/bootstrap/pre_execution.js +++ b/lib/internal/bootstrap/pre_execution.js @@ -326,7 +326,11 @@ function setupChildProcessIpcChannel() { // Make sure it's not accidentally inherited by child processes. delete process.env.NODE_CHANNEL_FD; - require('child_process')._forkChild(fd); + const serializationMode = + process.env.NODE_CHANNEL_SERIALIZATION_MODE || 'json'; + delete process.env.NODE_CHANNEL_SERIALIZATION_MODE; + + require('child_process')._forkChild(fd, serializationMode); assert(process.send); } } diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js index 22f7da92ce..9e13650fa3 100644 --- a/lib/internal/child_process.js +++ b/lib/internal/child_process.js @@ -1,6 +1,6 @@ 'use strict'; -const { JSON, Object } = primordials; +const { Object } = primordials; const { errnoException, @@ -55,8 +55,6 @@ const { const { SocketListSend, SocketListReceive } = SocketList; -// Lazy loaded for startup performance. -let StringDecoder; // Lazy loaded for startup performance and to allow monkey patching of // internalBinding('http_parser').HTTPParser. let freeParser; @@ -343,6 +341,15 @@ ChildProcess.prototype.spawn = function(options) { const ipcFd = stdio.ipcFd; stdio = options.stdio = stdio.stdio; + if (options.serialization !== undefined && + options.serialization !== 'json' && + options.serialization !== 'advanced') { + throw new ERR_INVALID_OPT_VALUE('options.serialization', + options.serialization); + } + + const serialization = options.serialization || 'json'; + if (ipc !== undefined) { // Let child process know about opened IPC channel if (options.envPairs === undefined) @@ -353,7 +360,8 @@ ChildProcess.prototype.spawn = function(options) { options.envPairs); } - options.envPairs.push('NODE_CHANNEL_FD=' + ipcFd); + options.envPairs.push(`NODE_CHANNEL_FD=${ipcFd}`); + options.envPairs.push(`NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`); } validateString(options.file, 'options.file'); @@ -446,7 +454,7 @@ ChildProcess.prototype.spawn = function(options) { this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket); // Add .send() method and start listening for IPC data - if (ipc !== undefined) setupChannel(this, ipc); + if (ipc !== undefined) setupChannel(this, ipc, serialization); return err; }; @@ -516,7 +524,8 @@ class Control extends EventEmitter { const channelDeprecationMsg = '_channel is deprecated. ' + 'Use ChildProcess.channel instead.'; -function setupChannel(target, channel) { +let serialization; +function setupChannel(target, channel, serializationMode) { target.channel = channel; Object.defineProperty(target, '_channel', { @@ -535,12 +544,16 @@ function setupChannel(target, channel) { const control = new Control(channel); - if (StringDecoder === undefined) - StringDecoder = require('string_decoder').StringDecoder; - const decoder = new StringDecoder('utf8'); - var jsonBuffer = ''; - var pendingHandle = null; - channel.buffering = false; + if (serialization === undefined) + serialization = require('internal/child_process/serialization'); + const { + initMessageChannel, + parseChannelMessages, + writeChannelMessage + } = serialization[serializationMode]; + + let pendingHandle = null; + initMessageChannel(channel); channel.pendingHandle = null; channel.onread = function(arrayBuffer) { const recvHandle = channel.pendingHandle; @@ -552,21 +565,7 @@ function setupChannel(target, channel) { if (recvHandle) pendingHandle = recvHandle; - // Linebreak is used as a message end sign - var chunks = decoder.write(pool).split('\n'); - var numCompleteChunks = chunks.length - 1; - // Last line does not have trailing linebreak - var incompleteChunk = chunks[numCompleteChunks]; - if (numCompleteChunks === 0) { - jsonBuffer += incompleteChunk; - this.buffering = jsonBuffer.length !== 0; - return; - } - chunks[0] = jsonBuffer + chunks[0]; - - for (var i = 0; i < numCompleteChunks; i++) { - var message = JSON.parse(chunks[i]); - + for (const message of parseChannelMessages(channel, pool)) { // There will be at most one NODE_HANDLE message in every chunk we // read because SCM_RIGHTS messages don't get coalesced. Make sure // that we deliver the handle with the right message however. @@ -581,9 +580,6 @@ function setupChannel(target, channel) { handleMessage(message, undefined, false); } } - jsonBuffer = incompleteChunk; - this.buffering = jsonBuffer.length !== 0; - } else { this.buffering = false; target.disconnect(); @@ -782,8 +778,7 @@ function setupChannel(target, channel) { const req = new WriteWrap(); - const string = JSON.stringify(message) + '\n'; - const err = channel.writeUtf8String(req, string, handle); + const err = writeChannelMessage(channel, req, message, handle); const wasAsyncWrite = streamBaseState[kLastWriteWasAsync]; if (err === 0) { diff --git a/lib/internal/child_process/serialization.js b/lib/internal/child_process/serialization.js new file mode 100644 index 0000000000..1381f29926 --- /dev/null +++ b/lib/internal/child_process/serialization.js @@ -0,0 +1,119 @@ +'use strict'; + +const { JSON } = primordials; +const { Buffer } = require('buffer'); +const { StringDecoder } = require('string_decoder'); +const v8 = require('v8'); +const { isArrayBufferView } = require('internal/util/types'); +const assert = require('internal/assert'); + +const kMessageBuffer = Symbol('kMessageBuffer'); +const kJSONBuffer = Symbol('kJSONBuffer'); +const kStringDecoder = Symbol('kStringDecoder'); + +// Extend V8's serializer APIs to give more JSON-like behaviour in +// some cases; in particular, for native objects this serializes them the same +// way that JSON does rather than throwing an exception. +const kArrayBufferViewTag = 0; +const kNotArrayBufferViewTag = 1; +class ChildProcessSerializer extends v8.DefaultSerializer { + _writeHostObject(object) { + if (isArrayBufferView(object)) { + this.writeUint32(kArrayBufferViewTag); + return super._writeHostObject(object); + } else { + this.writeUint32(kNotArrayBufferViewTag); + this.writeValue({ ...object }); + } + } +} + +class ChildProcessDeserializer extends v8.DefaultDeserializer { + _readHostObject() { + const tag = this.readUint32(); + if (tag === kArrayBufferViewTag) + return super._readHostObject(); + + assert(tag === kNotArrayBufferViewTag); + return this.readValue(); + } +} + +// Messages are parsed in either of the following formats: +// - Newline-delimited JSON, or +// - V8-serialized buffers, prefixed with their length as a big endian uint32 +// (aka 'advanced') +const advanced = { + initMessageChannel(channel) { + channel[kMessageBuffer] = Buffer.alloc(0); + channel.buffering = false; + }, + + *parseChannelMessages(channel, readData) { + if (readData.length === 0) return; + + let messageBuffer = Buffer.concat([channel[kMessageBuffer], readData]); + while (messageBuffer.length > 4) { + const size = messageBuffer.readUInt32BE(); + if (messageBuffer.length < 4 + size) { + break; + } + + const deserializer = new ChildProcessDeserializer( + messageBuffer.subarray(4, 4 + size)); + messageBuffer = messageBuffer.subarray(4 + size); + + deserializer.readHeader(); + yield deserializer.readValue(); + } + channel[kMessageBuffer] = messageBuffer; + channel.buffering = messageBuffer.length > 0; + }, + + writeChannelMessage(channel, req, message, handle) { + const ser = new ChildProcessSerializer(); + ser.writeHeader(); + ser.writeValue(message); + const serializedMessage = ser.releaseBuffer(); + const sizeBuffer = Buffer.allocUnsafe(4); + sizeBuffer.writeUInt32BE(serializedMessage.length); + return channel.writeBuffer(req, Buffer.concat([ + sizeBuffer, + serializedMessage + ]), handle); + }, +}; + +const json = { + initMessageChannel(channel) { + channel[kJSONBuffer] = ''; + channel[kStringDecoder] = undefined; + }, + + *parseChannelMessages(channel, readData) { + if (readData.length === 0) return; + + if (channel[kStringDecoder] === undefined) + channel[kStringDecoder] = new StringDecoder('utf8'); + const chunks = channel[kStringDecoder].write(readData).split('\n'); + const numCompleteChunks = chunks.length - 1; + // Last line does not have trailing linebreak + const incompleteChunk = chunks[numCompleteChunks]; + if (numCompleteChunks === 0) { + channel[kJSONBuffer] += incompleteChunk; + } else { + chunks[0] = channel[kJSONBuffer] + chunks[0]; + for (let i = 0; i < numCompleteChunks; i++) + yield JSON.parse(chunks[i]); + channel[kJSONBuffer] = incompleteChunk; + } + channel.buffering = channel[kJSONBuffer].length !== 0; + }, + + writeChannelMessage(channel, req, message, handle) { + const string = JSON.stringify(message) + '\n'; + return channel.writeUtf8String(req, string, handle); + }, +}; + +module.exports = { advanced, json }; diff --git a/lib/internal/cluster/master.js b/lib/internal/cluster/master.js index a881021c5e..005de8aa1b 100644 --- a/lib/internal/cluster/master.js +++ b/lib/internal/cluster/master.js @@ -130,6 +130,7 @@ function createWorkerProcess(id, env) { return fork(cluster.settings.exec, cluster.settings.args, { cwd: cluster.settings.cwd, env: workerEnv, + serialization: cluster.settings.serialization, silent: cluster.settings.silent, windowsHide: cluster.settings.windowsHide, execArgv: execArgv, @@ -91,6 +91,7 @@ 'lib/internal/buffer.js', 'lib/internal/cli_table.js', 'lib/internal/child_process.js', + 'lib/internal/child_process/serialization.js', 'lib/internal/cluster/child.js', 'lib/internal/cluster/master.js', 'lib/internal/cluster/round_robin_handle.js', diff --git a/src/stream_base.cc b/src/stream_base.cc index 52163e2e43..eaccfc995c 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -180,12 +180,26 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) { } Local<Object> req_wrap_obj = args[0].As<Object>(); - uv_buf_t buf; buf.base = Buffer::Data(args[1]); buf.len = Buffer::Length(args[1]); - StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj); + uv_stream_t* send_handle = nullptr; + + if (args[2]->IsObject() && IsIPCPipe()) { + Local<Object> send_handle_obj = args[2].As<Object>(); + + HandleWrap* wrap; + ASSIGN_OR_RETURN_UNWRAP(&wrap, send_handle_obj, UV_EINVAL); + send_handle = reinterpret_cast<uv_stream_t*>(wrap->GetHandle()); + // Reference LibuvStreamWrap instance to prevent it from being garbage + // collected before `AfterWrite` is called. + req_wrap_obj->Set(env->context(), + env->handle_string(), + send_handle_obj).Check(); + } + + StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj); SetWriteResult(res); return res.err; diff --git a/test/parallel/test-child-process-advanced-serialization.js b/test/parallel/test-child-process-advanced-serialization.js new file mode 100644 index 0000000000..0424f14a06 --- /dev/null +++ b/test/parallel/test-child-process-advanced-serialization.js @@ -0,0 +1,46 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const child_process = require('child_process'); +const { once } = require('events'); + +if (process.argv[2] !== 'child') { + for (const value of [null, 42, Infinity, 'foo']) { + common.expectsError(() => { + child_process.spawn(process.execPath, [], { serialization: value }); + }, { + code: 'ERR_INVALID_OPT_VALUE', + message: `The value "${value}" is invalid ` + + 'for option "options.serialization"' + }); + } + + (async () => { + const cp = child_process.spawn(process.execPath, [__filename, 'child'], + { + stdio: ['ipc', 'inherit', 'inherit'], + serialization: 'advanced' + }); + + const circular = {}; + circular.circular = circular; + for await (const message of [ + { uint8: new Uint8Array(4) }, + { float64: new Float64Array([ Math.PI ]) }, + { buffer: Buffer.from('Hello!') }, + { map: new Map([{ a: 1 }, { b: 2 }]) }, + { bigInt: 1337n }, + circular, + new Error('Something went wrong'), + new RangeError('Something range-y went wrong'), + ]) { + cp.send(message); + const [ received ] = await once(cp, 'message'); + assert.deepStrictEqual(received, message); + } + + cp.disconnect(); + })().then(common.mustCall()); +} else { + process.on('message', (msg) => process.send(msg)); +} diff --git a/test/parallel/test-cluster-advanced-serialization.js b/test/parallel/test-cluster-advanced-serialization.js new file mode 100644 index 0000000000..2144d08f28 --- /dev/null +++ b/test/parallel/test-cluster-advanced-serialization.js @@ -0,0 +1,22 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const cluster = require('cluster'); + +if (cluster.isMaster) { + cluster.settings.serialization = 'advanced'; + const worker = cluster.fork(); + const circular = {}; + circular.circular = circular; + + worker.on('online', common.mustCall(() => { + worker.send(circular); + + worker.on('message', common.mustCall((msg) => { + assert.deepStrictEqual(msg, circular); + worker.kill(); + })); + })); +} else { + process.on('message', (msg) => process.send(msg)); +} |