summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorBrian White <mscdex@mscdex.net>2019-01-10 15:52:27 -0500
committerBrian White <mscdex@mscdex.net>2019-08-23 17:05:52 -0400
commit8292b280ec9e6b8c2444cbe49350facc77f5fefa (patch)
tree2f40ac41f20b53b748a7039c4f00e419efb8784b /lib
parent9d21b0395cc248a0e5537a11cc84f61919eccca6 (diff)
downloadandroid-node-v8-8292b280ec9e6b8c2444cbe49350facc77f5fefa.tar.gz
android-node-v8-8292b280ec9e6b8c2444cbe49350facc77f5fefa.tar.bz2
android-node-v8-8292b280ec9e6b8c2444cbe49350facc77f5fefa.zip
net: allow reading data into a static buffer
Co-Authored-By: Anna Henningsen <anna@addaleax.net> PR-URL: https://github.com/nodejs/node/pull/25436 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Diffstat (limited to 'lib')
-rw-r--r--lib/internal/stream_base_commons.js31
-rw-r--r--lib/net.js144
2 files changed, 133 insertions, 42 deletions
diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js
index 88896083f1..eb2e53963d 100644
--- a/lib/internal/stream_base_commons.js
+++ b/lib/internal/stream_base_commons.js
@@ -23,6 +23,7 @@ const {
setUnrefTimeout,
getTimerDuration
} = require('internal/timers');
+const { isUint8Array } = require('internal/util/types');
const { clearTimeout } = require('timers');
const kMaybeDestroy = Symbol('kMaybeDestroy');
@@ -32,6 +33,9 @@ const kHandle = Symbol('kHandle');
const kSession = Symbol('kSession');
const debug = require('internal/util/debuglog').debuglog('stream');
+const kBuffer = Symbol('kBuffer');
+const kBufferGen = Symbol('kBufferGen');
+const kBufferCb = Symbol('kBufferCb');
function handleWriteReq(req, data, encoding) {
const { handle } = req;
@@ -161,9 +165,23 @@ function onStreamRead(arrayBuffer) {
stream[kUpdateTimer]();
if (nread > 0 && !stream.destroyed) {
- const offset = streamBaseState[kArrayBufferOffset];
- const buf = new FastBuffer(arrayBuffer, offset, nread);
- if (!stream.push(buf)) {
+ let ret;
+ let result;
+ const userBuf = stream[kBuffer];
+ if (userBuf) {
+ result = (stream[kBufferCb](nread, userBuf) !== false);
+ const bufGen = stream[kBufferGen];
+ if (bufGen !== null) {
+ const nextBuf = bufGen();
+ if (isUint8Array(nextBuf))
+ stream[kBuffer] = ret = nextBuf;
+ }
+ } else {
+ const offset = streamBaseState[kArrayBufferOffset];
+ const buf = new FastBuffer(arrayBuffer, offset, nread);
+ result = stream.push(buf);
+ }
+ if (!result) {
handle.reading = false;
if (!stream.destroyed) {
const err = handle.readStop();
@@ -172,7 +190,7 @@ function onStreamRead(arrayBuffer) {
}
}
- return;
+ return ret;
}
if (nread === 0) {
@@ -241,5 +259,8 @@ module.exports = {
kUpdateTimer,
kHandle,
kSession,
- setStreamTimeout
+ setStreamTimeout,
+ kBuffer,
+ kBufferCb,
+ kBufferGen
};
diff --git a/lib/net.js b/lib/net.js
index 2b099d75ef..1eb1d212cd 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -67,7 +67,10 @@ const {
kAfterAsyncWrite,
kHandle,
kUpdateTimer,
- setStreamTimeout
+ setStreamTimeout,
+ kBuffer,
+ kBufferCb,
+ kBufferGen
} = require('internal/stream_base_commons');
const {
codes: {
@@ -86,6 +89,7 @@ const {
exceptionWithHostPort,
uvExceptionWithHostPort
} = require('internal/errors');
+const { isUint8Array } = require('internal/util/types');
const { validateInt32, validateString } = require('internal/validators');
const kLastWriteQueueSize = Symbol('lastWriteQueueSize');
const {
@@ -225,6 +229,18 @@ function initSocketHandle(self) {
self._handle[owner_symbol] = self;
self._handle.onread = onStreamRead;
self[async_id_symbol] = getNewAsyncId(self._handle);
+
+ let userBuf = self[kBuffer];
+ if (userBuf) {
+ const bufGen = self[kBufferGen];
+ if (bufGen !== null) {
+ userBuf = bufGen();
+ if (!isUint8Array(userBuf))
+ return;
+ self[kBuffer] = userBuf;
+ }
+ self._handle.useUserBuffer(userBuf);
+ }
}
}
@@ -247,6 +263,9 @@ function Socket(options) {
this._host = null;
this[kLastWriteQueueSize] = 0;
this[kTimeout] = null;
+ this[kBuffer] = null;
+ this[kBufferCb] = null;
+ this[kBufferGen] = null;
if (typeof options === 'number')
options = { fd: options }; // Legacy interface.
@@ -271,40 +290,55 @@ function Socket(options) {
if (options.handle) {
this._handle = options.handle; // private
this[async_id_symbol] = getNewAsyncId(this._handle);
- } else if (options.fd !== undefined) {
- const { fd } = options;
- let err;
-
- // createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not
- // a valid `PIPE` or `TCP` descriptor
- this._handle = createHandle(fd, false);
-
- err = this._handle.open(fd);
+ } else {
+ const onread = options.onread;
+ if (onread !== null && typeof onread === 'object' &&
+ (isUint8Array(onread.buffer) || typeof onread.buffer === 'function') &&
+ typeof onread.callback === 'function') {
+ if (typeof onread.buffer === 'function') {
+ this[kBuffer] = true;
+ this[kBufferGen] = onread.buffer;
+ } else {
+ this[kBuffer] = onread.buffer;
+ }
+ this[kBufferCb] = onread.callback;
+ }
+ if (options.fd !== undefined) {
+ const { fd } = options;
+ let err;
- // While difficult to fabricate, in some architectures
- // `open` may return an error code for valid file descriptors
- // which cannot be opened. This is difficult to test as most
- // un-openable fds will throw on `createHandle`
- if (err)
- throw errnoException(err, 'open');
+ // createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not
+ // a valid `PIPE` or `TCP` descriptor
+ this._handle = createHandle(fd, false);
- this[async_id_symbol] = this._handle.getAsyncId();
+ err = this._handle.open(fd);
- if ((fd === 1 || fd === 2) &&
- (this._handle instanceof Pipe) &&
- process.platform === 'win32') {
- // Make stdout and stderr blocking on Windows
- err = this._handle.setBlocking(true);
+ // While difficult to fabricate, in some architectures
+ // `open` may return an error code for valid file descriptors
+ // which cannot be opened. This is difficult to test as most
+ // un-openable fds will throw on `createHandle`
if (err)
- throw errnoException(err, 'setBlocking');
-
- this._writev = null;
- this._write = makeSyncWrite(fd);
- // makeSyncWrite adjusts this value like the original handle would, so
- // we need to let it do that by turning it into a writable, own property.
- Object.defineProperty(this._handle, 'bytesWritten', {
- value: 0, writable: true
- });
+ throw errnoException(err, 'open');
+
+ this[async_id_symbol] = this._handle.getAsyncId();
+
+ if ((fd === 1 || fd === 2) &&
+ (this._handle instanceof Pipe) &&
+ process.platform === 'win32') {
+ // Make stdout and stderr blocking on Windows
+ err = this._handle.setBlocking(true);
+ if (err)
+ throw errnoException(err, 'setBlocking');
+
+ this._writev = null;
+ this._write = makeSyncWrite(fd);
+ // makeSyncWrite adjusts this value like the original handle would, so
+ // we need to let it do that by turning it into a writable, own
+ // property.
+ Object.defineProperty(this._handle, 'bytesWritten', {
+ value: 0, writable: true
+ });
+ }
}
}
@@ -514,6 +548,15 @@ Object.defineProperty(Socket.prototype, kUpdateTimer, {
});
+function tryReadStart(socket) {
+ // Not already reading, start the flow
+ debug('Socket._handle.readStart');
+ socket._handle.reading = true;
+ var err = socket._handle.readStart();
+ if (err)
+ socket.destroy(errnoException(err, 'read'));
+}
+
// Just call handle.readStart until we have enough in the buffer
Socket.prototype._read = function(n) {
debug('_read');
@@ -522,12 +565,7 @@ Socket.prototype._read = function(n) {
debug('_read wait for connection');
this.once('connect', () => this._read(n));
} else if (!this._handle.reading) {
- // Not already reading, start the flow
- debug('Socket._read readStart');
- this._handle.reading = true;
- var err = this._handle.readStart();
- if (err)
- this.destroy(errnoException(err, 'read'));
+ tryReadStart(this);
}
};
@@ -539,6 +577,38 @@ Socket.prototype.end = function(data, encoding, callback) {
};
+Socket.prototype.pause = function() {
+ if (this[kBuffer] && !this.connecting && this._handle &&
+ this._handle.reading) {
+ this._handle.reading = false;
+ if (!this.destroyed) {
+ const err = this._handle.readStop();
+ if (err)
+ this.destroy(errnoException(err, 'read'));
+ }
+ }
+ return stream.Duplex.prototype.pause.call(this);
+};
+
+
+Socket.prototype.resume = function() {
+ if (this[kBuffer] && !this.connecting && this._handle &&
+ !this._handle.reading) {
+ tryReadStart(this);
+ }
+ return stream.Duplex.prototype.resume.call(this);
+};
+
+
+Socket.prototype.read = function(n) {
+ if (this[kBuffer] && !this.connecting && this._handle &&
+ !this._handle.reading) {
+ tryReadStart(this);
+ }
+ return stream.Duplex.prototype.read.call(this, n);
+};
+
+
// Called when the 'end' event is emitted.
function onReadableStreamEnd() {
if (!this.allowHalfOpen) {