diff options
author | Brian White <mscdex@mscdex.net> | 2019-01-10 15:52:27 -0500 |
---|---|---|
committer | Brian White <mscdex@mscdex.net> | 2019-08-23 17:05:52 -0400 |
commit | 8292b280ec9e6b8c2444cbe49350facc77f5fefa (patch) | |
tree | 2f40ac41f20b53b748a7039c4f00e419efb8784b /lib | |
parent | 9d21b0395cc248a0e5537a11cc84f61919eccca6 (diff) | |
download | android-node-v8-8292b280ec9e6b8c2444cbe49350facc77f5fefa.tar.gz android-node-v8-8292b280ec9e6b8c2444cbe49350facc77f5fefa.tar.bz2 android-node-v8-8292b280ec9e6b8c2444cbe49350facc77f5fefa.zip |
net: allow reading data into a static buffer
Co-Authored-By: Anna Henningsen <anna@addaleax.net>
PR-URL: https://github.com/nodejs/node/pull/25436
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/internal/stream_base_commons.js | 31 | ||||
-rw-r--r-- | lib/net.js | 144 |
2 files changed, 133 insertions, 42 deletions
diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js index 88896083f1..eb2e53963d 100644 --- a/lib/internal/stream_base_commons.js +++ b/lib/internal/stream_base_commons.js @@ -23,6 +23,7 @@ const { setUnrefTimeout, getTimerDuration } = require('internal/timers'); +const { isUint8Array } = require('internal/util/types'); const { clearTimeout } = require('timers'); const kMaybeDestroy = Symbol('kMaybeDestroy'); @@ -32,6 +33,9 @@ const kHandle = Symbol('kHandle'); const kSession = Symbol('kSession'); const debug = require('internal/util/debuglog').debuglog('stream'); +const kBuffer = Symbol('kBuffer'); +const kBufferGen = Symbol('kBufferGen'); +const kBufferCb = Symbol('kBufferCb'); function handleWriteReq(req, data, encoding) { const { handle } = req; @@ -161,9 +165,23 @@ function onStreamRead(arrayBuffer) { stream[kUpdateTimer](); if (nread > 0 && !stream.destroyed) { - const offset = streamBaseState[kArrayBufferOffset]; - const buf = new FastBuffer(arrayBuffer, offset, nread); - if (!stream.push(buf)) { + let ret; + let result; + const userBuf = stream[kBuffer]; + if (userBuf) { + result = (stream[kBufferCb](nread, userBuf) !== false); + const bufGen = stream[kBufferGen]; + if (bufGen !== null) { + const nextBuf = bufGen(); + if (isUint8Array(nextBuf)) + stream[kBuffer] = ret = nextBuf; + } + } else { + const offset = streamBaseState[kArrayBufferOffset]; + const buf = new FastBuffer(arrayBuffer, offset, nread); + result = stream.push(buf); + } + if (!result) { handle.reading = false; if (!stream.destroyed) { const err = handle.readStop(); @@ -172,7 +190,7 @@ function onStreamRead(arrayBuffer) { } } - return; + return ret; } if (nread === 0) { @@ -241,5 +259,8 @@ module.exports = { kUpdateTimer, kHandle, kSession, - setStreamTimeout + setStreamTimeout, + kBuffer, + kBufferCb, + kBufferGen }; diff --git a/lib/net.js b/lib/net.js index 2b099d75ef..1eb1d212cd 100644 --- a/lib/net.js +++ b/lib/net.js @@ -67,7 +67,10 @@ const { kAfterAsyncWrite, kHandle, kUpdateTimer, - setStreamTimeout + setStreamTimeout, + kBuffer, + kBufferCb, + kBufferGen } = require('internal/stream_base_commons'); const { codes: { @@ -86,6 +89,7 @@ const { exceptionWithHostPort, uvExceptionWithHostPort } = require('internal/errors'); +const { isUint8Array } = require('internal/util/types'); const { validateInt32, validateString } = require('internal/validators'); const kLastWriteQueueSize = Symbol('lastWriteQueueSize'); const { @@ -225,6 +229,18 @@ function initSocketHandle(self) { self._handle[owner_symbol] = self; self._handle.onread = onStreamRead; self[async_id_symbol] = getNewAsyncId(self._handle); + + let userBuf = self[kBuffer]; + if (userBuf) { + const bufGen = self[kBufferGen]; + if (bufGen !== null) { + userBuf = bufGen(); + if (!isUint8Array(userBuf)) + return; + self[kBuffer] = userBuf; + } + self._handle.useUserBuffer(userBuf); + } } } @@ -247,6 +263,9 @@ function Socket(options) { this._host = null; this[kLastWriteQueueSize] = 0; this[kTimeout] = null; + this[kBuffer] = null; + this[kBufferCb] = null; + this[kBufferGen] = null; if (typeof options === 'number') options = { fd: options }; // Legacy interface. @@ -271,40 +290,55 @@ function Socket(options) { if (options.handle) { this._handle = options.handle; // private this[async_id_symbol] = getNewAsyncId(this._handle); - } else if (options.fd !== undefined) { - const { fd } = options; - let err; - - // createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not - // a valid `PIPE` or `TCP` descriptor - this._handle = createHandle(fd, false); - - err = this._handle.open(fd); + } else { + const onread = options.onread; + if (onread !== null && typeof onread === 'object' && + (isUint8Array(onread.buffer) || typeof onread.buffer === 'function') && + typeof onread.callback === 'function') { + if (typeof onread.buffer === 'function') { + this[kBuffer] = true; + this[kBufferGen] = onread.buffer; + } else { + this[kBuffer] = onread.buffer; + } + this[kBufferCb] = onread.callback; + } + if (options.fd !== undefined) { + const { fd } = options; + let err; - // While difficult to fabricate, in some architectures - // `open` may return an error code for valid file descriptors - // which cannot be opened. This is difficult to test as most - // un-openable fds will throw on `createHandle` - if (err) - throw errnoException(err, 'open'); + // createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not + // a valid `PIPE` or `TCP` descriptor + this._handle = createHandle(fd, false); - this[async_id_symbol] = this._handle.getAsyncId(); + err = this._handle.open(fd); - if ((fd === 1 || fd === 2) && - (this._handle instanceof Pipe) && - process.platform === 'win32') { - // Make stdout and stderr blocking on Windows - err = this._handle.setBlocking(true); + // While difficult to fabricate, in some architectures + // `open` may return an error code for valid file descriptors + // which cannot be opened. This is difficult to test as most + // un-openable fds will throw on `createHandle` if (err) - throw errnoException(err, 'setBlocking'); - - this._writev = null; - this._write = makeSyncWrite(fd); - // makeSyncWrite adjusts this value like the original handle would, so - // we need to let it do that by turning it into a writable, own property. - Object.defineProperty(this._handle, 'bytesWritten', { - value: 0, writable: true - }); + throw errnoException(err, 'open'); + + this[async_id_symbol] = this._handle.getAsyncId(); + + if ((fd === 1 || fd === 2) && + (this._handle instanceof Pipe) && + process.platform === 'win32') { + // Make stdout and stderr blocking on Windows + err = this._handle.setBlocking(true); + if (err) + throw errnoException(err, 'setBlocking'); + + this._writev = null; + this._write = makeSyncWrite(fd); + // makeSyncWrite adjusts this value like the original handle would, so + // we need to let it do that by turning it into a writable, own + // property. + Object.defineProperty(this._handle, 'bytesWritten', { + value: 0, writable: true + }); + } } } @@ -514,6 +548,15 @@ Object.defineProperty(Socket.prototype, kUpdateTimer, { }); +function tryReadStart(socket) { + // Not already reading, start the flow + debug('Socket._handle.readStart'); + socket._handle.reading = true; + var err = socket._handle.readStart(); + if (err) + socket.destroy(errnoException(err, 'read')); +} + // Just call handle.readStart until we have enough in the buffer Socket.prototype._read = function(n) { debug('_read'); @@ -522,12 +565,7 @@ Socket.prototype._read = function(n) { debug('_read wait for connection'); this.once('connect', () => this._read(n)); } else if (!this._handle.reading) { - // Not already reading, start the flow - debug('Socket._read readStart'); - this._handle.reading = true; - var err = this._handle.readStart(); - if (err) - this.destroy(errnoException(err, 'read')); + tryReadStart(this); } }; @@ -539,6 +577,38 @@ Socket.prototype.end = function(data, encoding, callback) { }; +Socket.prototype.pause = function() { + if (this[kBuffer] && !this.connecting && this._handle && + this._handle.reading) { + this._handle.reading = false; + if (!this.destroyed) { + const err = this._handle.readStop(); + if (err) + this.destroy(errnoException(err, 'read')); + } + } + return stream.Duplex.prototype.pause.call(this); +}; + + +Socket.prototype.resume = function() { + if (this[kBuffer] && !this.connecting && this._handle && + !this._handle.reading) { + tryReadStart(this); + } + return stream.Duplex.prototype.resume.call(this); +}; + + +Socket.prototype.read = function(n) { + if (this[kBuffer] && !this.connecting && this._handle && + !this._handle.reading) { + tryReadStart(this); + } + return stream.Duplex.prototype.read.call(this, n); +}; + + // Called when the 'end' event is emitted. function onReadableStreamEnd() { if (!this.allowHalfOpen) { |