summaryrefslogtreecommitdiff
path: root/lib/internal
diff options
context:
space:
mode:
Diffstat (limited to 'lib/internal')
-rw-r--r--lib/internal/bootstrap/pre_execution.js6
-rw-r--r--lib/internal/child_process.js59
-rw-r--r--lib/internal/child_process/serialization.js119
-rw-r--r--lib/internal/cluster/master.js1
4 files changed, 152 insertions, 33 deletions
diff --git a/lib/internal/bootstrap/pre_execution.js b/lib/internal/bootstrap/pre_execution.js
index 80ac97ee45..e58293e616 100644
--- a/lib/internal/bootstrap/pre_execution.js
+++ b/lib/internal/bootstrap/pre_execution.js
@@ -326,7 +326,11 @@ function setupChildProcessIpcChannel() {
// Make sure it's not accidentally inherited by child processes.
delete process.env.NODE_CHANNEL_FD;
- require('child_process')._forkChild(fd);
+ const serializationMode =
+ process.env.NODE_CHANNEL_SERIALIZATION_MODE || 'json';
+ delete process.env.NODE_CHANNEL_SERIALIZATION_MODE;
+
+ require('child_process')._forkChild(fd, serializationMode);
assert(process.send);
}
}
diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js
index 22f7da92ce..9e13650fa3 100644
--- a/lib/internal/child_process.js
+++ b/lib/internal/child_process.js
@@ -1,6 +1,6 @@
'use strict';
-const { JSON, Object } = primordials;
+const { Object } = primordials;
const {
errnoException,
@@ -55,8 +55,6 @@ const {
const { SocketListSend, SocketListReceive } = SocketList;
-// Lazy loaded for startup performance.
-let StringDecoder;
// Lazy loaded for startup performance and to allow monkey patching of
// internalBinding('http_parser').HTTPParser.
let freeParser;
@@ -343,6 +341,15 @@ ChildProcess.prototype.spawn = function(options) {
const ipcFd = stdio.ipcFd;
stdio = options.stdio = stdio.stdio;
+ if (options.serialization !== undefined &&
+ options.serialization !== 'json' &&
+ options.serialization !== 'advanced') {
+ throw new ERR_INVALID_OPT_VALUE('options.serialization',
+ options.serialization);
+ }
+
+ const serialization = options.serialization || 'json';
+
if (ipc !== undefined) {
// Let child process know about opened IPC channel
if (options.envPairs === undefined)
@@ -353,7 +360,8 @@ ChildProcess.prototype.spawn = function(options) {
options.envPairs);
}
- options.envPairs.push('NODE_CHANNEL_FD=' + ipcFd);
+ options.envPairs.push(`NODE_CHANNEL_FD=${ipcFd}`);
+ options.envPairs.push(`NODE_CHANNEL_SERIALIZATION_MODE=${serialization}`);
}
validateString(options.file, 'options.file');
@@ -446,7 +454,7 @@ ChildProcess.prototype.spawn = function(options) {
this.stdio.push(stdio[i].socket === undefined ? null : stdio[i].socket);
// Add .send() method and start listening for IPC data
- if (ipc !== undefined) setupChannel(this, ipc);
+ if (ipc !== undefined) setupChannel(this, ipc, serialization);
return err;
};
@@ -516,7 +524,8 @@ class Control extends EventEmitter {
const channelDeprecationMsg = '_channel is deprecated. ' +
'Use ChildProcess.channel instead.';
-function setupChannel(target, channel) {
+let serialization;
+function setupChannel(target, channel, serializationMode) {
target.channel = channel;
Object.defineProperty(target, '_channel', {
@@ -535,12 +544,16 @@ function setupChannel(target, channel) {
const control = new Control(channel);
- if (StringDecoder === undefined)
- StringDecoder = require('string_decoder').StringDecoder;
- const decoder = new StringDecoder('utf8');
- var jsonBuffer = '';
- var pendingHandle = null;
- channel.buffering = false;
+ if (serialization === undefined)
+ serialization = require('internal/child_process/serialization');
+ const {
+ initMessageChannel,
+ parseChannelMessages,
+ writeChannelMessage
+ } = serialization[serializationMode];
+
+ let pendingHandle = null;
+ initMessageChannel(channel);
channel.pendingHandle = null;
channel.onread = function(arrayBuffer) {
const recvHandle = channel.pendingHandle;
@@ -552,21 +565,7 @@ function setupChannel(target, channel) {
if (recvHandle)
pendingHandle = recvHandle;
- // Linebreak is used as a message end sign
- var chunks = decoder.write(pool).split('\n');
- var numCompleteChunks = chunks.length - 1;
- // Last line does not have trailing linebreak
- var incompleteChunk = chunks[numCompleteChunks];
- if (numCompleteChunks === 0) {
- jsonBuffer += incompleteChunk;
- this.buffering = jsonBuffer.length !== 0;
- return;
- }
- chunks[0] = jsonBuffer + chunks[0];
-
- for (var i = 0; i < numCompleteChunks; i++) {
- var message = JSON.parse(chunks[i]);
-
+ for (const message of parseChannelMessages(channel, pool)) {
// There will be at most one NODE_HANDLE message in every chunk we
// read because SCM_RIGHTS messages don't get coalesced. Make sure
// that we deliver the handle with the right message however.
@@ -581,9 +580,6 @@ function setupChannel(target, channel) {
handleMessage(message, undefined, false);
}
}
- jsonBuffer = incompleteChunk;
- this.buffering = jsonBuffer.length !== 0;
-
} else {
this.buffering = false;
target.disconnect();
@@ -782,8 +778,7 @@ function setupChannel(target, channel) {
const req = new WriteWrap();
- const string = JSON.stringify(message) + '\n';
- const err = channel.writeUtf8String(req, string, handle);
+ const err = writeChannelMessage(channel, req, message, handle);
const wasAsyncWrite = streamBaseState[kLastWriteWasAsync];
if (err === 0) {
diff --git a/lib/internal/child_process/serialization.js b/lib/internal/child_process/serialization.js
new file mode 100644
index 0000000000..1381f29926
--- /dev/null
+++ b/lib/internal/child_process/serialization.js
@@ -0,0 +1,119 @@
+'use strict';
+
+const { JSON } = primordials;
+const { Buffer } = require('buffer');
+const { StringDecoder } = require('string_decoder');
+const v8 = require('v8');
+const { isArrayBufferView } = require('internal/util/types');
+const assert = require('internal/assert');
+
+const kMessageBuffer = Symbol('kMessageBuffer');
+const kJSONBuffer = Symbol('kJSONBuffer');
+const kStringDecoder = Symbol('kStringDecoder');
+
+// Extend V8's serializer APIs to give more JSON-like behaviour in
+// some cases; in particular, for native objects this serializes them the same
+// way that JSON does rather than throwing an exception.
+const kArrayBufferViewTag = 0;
+const kNotArrayBufferViewTag = 1;
+class ChildProcessSerializer extends v8.DefaultSerializer {
+ _writeHostObject(object) {
+ if (isArrayBufferView(object)) {
+ this.writeUint32(kArrayBufferViewTag);
+ return super._writeHostObject(object);
+ } else {
+ this.writeUint32(kNotArrayBufferViewTag);
+ this.writeValue({ ...object });
+ }
+ }
+}
+
+class ChildProcessDeserializer extends v8.DefaultDeserializer {
+ _readHostObject() {
+ const tag = this.readUint32();
+ if (tag === kArrayBufferViewTag)
+ return super._readHostObject();
+
+ assert(tag === kNotArrayBufferViewTag);
+ return this.readValue();
+ }
+}
+
+// Messages are parsed in either of the following formats:
+// - Newline-delimited JSON, or
+// - V8-serialized buffers, prefixed with their length as a big endian uint32
+// (aka 'advanced')
+const advanced = {
+ initMessageChannel(channel) {
+ channel[kMessageBuffer] = Buffer.alloc(0);
+ channel.buffering = false;
+ },
+
+ *parseChannelMessages(channel, readData) {
+ if (readData.length === 0) return;
+
+ let messageBuffer = Buffer.concat([channel[kMessageBuffer], readData]);
+ while (messageBuffer.length > 4) {
+ const size = messageBuffer.readUInt32BE();
+ if (messageBuffer.length < 4 + size) {
+ break;
+ }
+
+ const deserializer = new ChildProcessDeserializer(
+ messageBuffer.subarray(4, 4 + size));
+ messageBuffer = messageBuffer.subarray(4 + size);
+
+ deserializer.readHeader();
+ yield deserializer.readValue();
+ }
+ channel[kMessageBuffer] = messageBuffer;
+ channel.buffering = messageBuffer.length > 0;
+ },
+
+ writeChannelMessage(channel, req, message, handle) {
+ const ser = new ChildProcessSerializer();
+ ser.writeHeader();
+ ser.writeValue(message);
+ const serializedMessage = ser.releaseBuffer();
+ const sizeBuffer = Buffer.allocUnsafe(4);
+ sizeBuffer.writeUInt32BE(serializedMessage.length);
+ return channel.writeBuffer(req, Buffer.concat([
+ sizeBuffer,
+ serializedMessage
+ ]), handle);
+ },
+};
+
+const json = {
+ initMessageChannel(channel) {
+ channel[kJSONBuffer] = '';
+ channel[kStringDecoder] = undefined;
+ },
+
+ *parseChannelMessages(channel, readData) {
+ if (readData.length === 0) return;
+
+ if (channel[kStringDecoder] === undefined)
+ channel[kStringDecoder] = new StringDecoder('utf8');
+ const chunks = channel[kStringDecoder].write(readData).split('\n');
+ const numCompleteChunks = chunks.length - 1;
+ // Last line does not have trailing linebreak
+ const incompleteChunk = chunks[numCompleteChunks];
+ if (numCompleteChunks === 0) {
+ channel[kJSONBuffer] += incompleteChunk;
+ } else {
+ chunks[0] = channel[kJSONBuffer] + chunks[0];
+ for (let i = 0; i < numCompleteChunks; i++)
+ yield JSON.parse(chunks[i]);
+ channel[kJSONBuffer] = incompleteChunk;
+ }
+ channel.buffering = channel[kJSONBuffer].length !== 0;
+ },
+
+ writeChannelMessage(channel, req, message, handle) {
+ const string = JSON.stringify(message) + '\n';
+ return channel.writeUtf8String(req, string, handle);
+ },
+};
+
+module.exports = { advanced, json };
diff --git a/lib/internal/cluster/master.js b/lib/internal/cluster/master.js
index a881021c5e..005de8aa1b 100644
--- a/lib/internal/cluster/master.js
+++ b/lib/internal/cluster/master.js
@@ -130,6 +130,7 @@ function createWorkerProcess(id, env) {
return fork(cluster.settings.exec, cluster.settings.args, {
cwd: cluster.settings.cwd,
env: workerEnv,
+ serialization: cluster.settings.serialization,
silent: cluster.settings.silent,
windowsHide: cluster.settings.windowsHide,
execArgv: execArgv,