summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--benchmark/net/net-s2c.js57
-rw-r--r--doc/api/net.md36
-rw-r--r--lib/internal/stream_base_commons.js31
-rw-r--r--lib/net.js144
-rw-r--r--src/stream_base.cc59
-rw-r--r--src/stream_base.h26
-rw-r--r--test/parallel/test-net-onread-static-buffer.js186
7 files changed, 474 insertions, 65 deletions
diff --git a/benchmark/net/net-s2c.js b/benchmark/net/net-s2c.js
index e3c5c7e5eb..d8c26db9bd 100644
--- a/benchmark/net/net-s2c.js
+++ b/benchmark/net/net-s2c.js
@@ -5,33 +5,68 @@ const common = require('../common.js');
const PORT = common.PORT;
const bench = common.createBenchmark(main, {
- len: [64, 102400, 1024 * 1024 * 16],
+ sendchunklen: [256, 32 * 1024, 128 * 1024, 16 * 1024 * 1024],
type: ['utf', 'asc', 'buf'],
+ recvbuflen: [0, 64 * 1024, 1024 * 1024],
+ recvbufgenfn: ['true', 'false'],
dur: [5]
});
var chunk;
var encoding;
+var recvbuf;
+var received = 0;
+
+function main({ dur, sendchunklen, type, recvbuflen, recvbufgenfn }) {
+ if (isFinite(recvbuflen) && recvbuflen > 0)
+ recvbuf = Buffer.alloc(recvbuflen);
-function main({ dur, len, type }) {
switch (type) {
case 'buf':
- chunk = Buffer.alloc(len, 'x');
+ chunk = Buffer.alloc(sendchunklen, 'x');
break;
case 'utf':
encoding = 'utf8';
- chunk = 'ü'.repeat(len / 2);
+ chunk = 'ü'.repeat(sendchunklen / 2);
break;
case 'asc':
encoding = 'ascii';
- chunk = 'x'.repeat(len);
+ chunk = 'x'.repeat(sendchunklen);
break;
default:
throw new Error(`invalid type: ${type}`);
}
const reader = new Reader();
- const writer = new Writer();
+ var writer;
+ var socketOpts;
+ if (recvbuf === undefined) {
+ writer = new Writer();
+ socketOpts = { port: PORT };
+ } else {
+ let buffer = recvbuf;
+ if (recvbufgenfn === 'true') {
+ let bufidx = -1;
+ const bufpool = [
+ recvbuf,
+ Buffer.from(recvbuf),
+ Buffer.from(recvbuf),
+ ];
+ buffer = () => {
+ bufidx = (bufidx + 1) % bufpool.length;
+ return bufpool[bufidx];
+ };
+ }
+ socketOpts = {
+ port: PORT,
+ onread: {
+ buffer,
+ callback: function(nread, buf) {
+ received += nread;
+ }
+ }
+ };
+ }
// The actual benchmark.
const server = net.createServer((socket) => {
@@ -39,14 +74,15 @@ function main({ dur, len, type }) {
});
server.listen(PORT, () => {
- const socket = net.connect(PORT);
+ const socket = net.connect(socketOpts);
socket.on('connect', () => {
bench.start();
- socket.pipe(writer);
+ if (recvbuf === undefined)
+ socket.pipe(writer);
setTimeout(() => {
- const bytes = writer.received;
+ const bytes = received;
const gbits = (bytes * 8) / (1024 * 1024 * 1024);
bench.end(gbits);
process.exit(0);
@@ -58,12 +94,11 @@ function main({ dur, len, type }) {
const net = require('net');
function Writer() {
- this.received = 0;
this.writable = true;
}
Writer.prototype.write = function(chunk, encoding, cb) {
- this.received += chunk.length;
+ received += chunk.length;
if (typeof encoding === 'function')
encoding();
diff --git a/doc/api/net.md b/doc/api/net.md
index 396cad36ab..a396edf31a 100644
--- a/doc/api/net.md
+++ b/doc/api/net.md
@@ -593,6 +593,9 @@ for the [`'connect'`][] event **once**.
<!-- YAML
added: v0.1.90
changes:
+ - version: REPLACEME
+ pr-url: https://github.com/nodejs/node/pull/25436
+ description: Added `onread` option.
- version: v6.0.0
pr-url: https://github.com/nodejs/node/pull/6021
description: The `hints` option defaults to `0` in all cases now.
@@ -629,6 +632,39 @@ For [IPC][] connections, available `options` are:
See [Identifying paths for IPC connections][]. If provided, the TCP-specific
options above are ignored.
+For both types, available `options` include:
+
+* `onread` {Object} - If specified, incoming data is stored in a single `buffer`
+ and passed to the supplied `callback` when data arrives on the socket.
+ Note: this will cause the streaming functionality to not provide any data,
+ however events like `'error'`, `'end'`, and `'close'` will still be emitted
+ as normal and methods like `pause()` and `resume()` will also behave as
+ expected.
+ * `buffer` {Buffer|Uint8Array|Function} - Either a reusable chunk of memory to
+ use for storing incoming data or a function that returns such.
+ * `callback` {Function} This function is called for every chunk of incoming
+ data. Two arguments are passed to it: the number of bytes written to
+ `buffer` and a reference to `buffer`. Return `false` from this function to
+ implicitly `pause()` the socket. This function will be executed in the
+ global context.
+
+Following is an example of a client using the `onread` option:
+
+```js
+const net = require('net');
+net.connect({
+ port: 80,
+ onread: {
+ // Reuses a 4KiB Buffer for every read from the socket
+ buffer: Buffer.alloc(4 * 1024),
+ callback: function(nread, buf) {
+ // Received data is available in `buf` from 0 to `nread`
+ console.log(buf.toString('utf8', 0, nread));
+ }
+ }
+});
+```
+
#### socket.connect(path[, connectListener])
* `path` {string} Path the client should connect to. See
diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js
index 88896083f1..eb2e53963d 100644
--- a/lib/internal/stream_base_commons.js
+++ b/lib/internal/stream_base_commons.js
@@ -23,6 +23,7 @@ const {
setUnrefTimeout,
getTimerDuration
} = require('internal/timers');
+const { isUint8Array } = require('internal/util/types');
const { clearTimeout } = require('timers');
const kMaybeDestroy = Symbol('kMaybeDestroy');
@@ -32,6 +33,9 @@ const kHandle = Symbol('kHandle');
const kSession = Symbol('kSession');
const debug = require('internal/util/debuglog').debuglog('stream');
+const kBuffer = Symbol('kBuffer');
+const kBufferGen = Symbol('kBufferGen');
+const kBufferCb = Symbol('kBufferCb');
function handleWriteReq(req, data, encoding) {
const { handle } = req;
@@ -161,9 +165,23 @@ function onStreamRead(arrayBuffer) {
stream[kUpdateTimer]();
if (nread > 0 && !stream.destroyed) {
- const offset = streamBaseState[kArrayBufferOffset];
- const buf = new FastBuffer(arrayBuffer, offset, nread);
- if (!stream.push(buf)) {
+ let ret;
+ let result;
+ const userBuf = stream[kBuffer];
+ if (userBuf) {
+ result = (stream[kBufferCb](nread, userBuf) !== false);
+ const bufGen = stream[kBufferGen];
+ if (bufGen !== null) {
+ const nextBuf = bufGen();
+ if (isUint8Array(nextBuf))
+ stream[kBuffer] = ret = nextBuf;
+ }
+ } else {
+ const offset = streamBaseState[kArrayBufferOffset];
+ const buf = new FastBuffer(arrayBuffer, offset, nread);
+ result = stream.push(buf);
+ }
+ if (!result) {
handle.reading = false;
if (!stream.destroyed) {
const err = handle.readStop();
@@ -172,7 +190,7 @@ function onStreamRead(arrayBuffer) {
}
}
- return;
+ return ret;
}
if (nread === 0) {
@@ -241,5 +259,8 @@ module.exports = {
kUpdateTimer,
kHandle,
kSession,
- setStreamTimeout
+ setStreamTimeout,
+ kBuffer,
+ kBufferCb,
+ kBufferGen
};
diff --git a/lib/net.js b/lib/net.js
index 2b099d75ef..1eb1d212cd 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -67,7 +67,10 @@ const {
kAfterAsyncWrite,
kHandle,
kUpdateTimer,
- setStreamTimeout
+ setStreamTimeout,
+ kBuffer,
+ kBufferCb,
+ kBufferGen
} = require('internal/stream_base_commons');
const {
codes: {
@@ -86,6 +89,7 @@ const {
exceptionWithHostPort,
uvExceptionWithHostPort
} = require('internal/errors');
+const { isUint8Array } = require('internal/util/types');
const { validateInt32, validateString } = require('internal/validators');
const kLastWriteQueueSize = Symbol('lastWriteQueueSize');
const {
@@ -225,6 +229,18 @@ function initSocketHandle(self) {
self._handle[owner_symbol] = self;
self._handle.onread = onStreamRead;
self[async_id_symbol] = getNewAsyncId(self._handle);
+
+ let userBuf = self[kBuffer];
+ if (userBuf) {
+ const bufGen = self[kBufferGen];
+ if (bufGen !== null) {
+ userBuf = bufGen();
+ if (!isUint8Array(userBuf))
+ return;
+ self[kBuffer] = userBuf;
+ }
+ self._handle.useUserBuffer(userBuf);
+ }
}
}
@@ -247,6 +263,9 @@ function Socket(options) {
this._host = null;
this[kLastWriteQueueSize] = 0;
this[kTimeout] = null;
+ this[kBuffer] = null;
+ this[kBufferCb] = null;
+ this[kBufferGen] = null;
if (typeof options === 'number')
options = { fd: options }; // Legacy interface.
@@ -271,40 +290,55 @@ function Socket(options) {
if (options.handle) {
this._handle = options.handle; // private
this[async_id_symbol] = getNewAsyncId(this._handle);
- } else if (options.fd !== undefined) {
- const { fd } = options;
- let err;
-
- // createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not
- // a valid `PIPE` or `TCP` descriptor
- this._handle = createHandle(fd, false);
-
- err = this._handle.open(fd);
+ } else {
+ const onread = options.onread;
+ if (onread !== null && typeof onread === 'object' &&
+ (isUint8Array(onread.buffer) || typeof onread.buffer === 'function') &&
+ typeof onread.callback === 'function') {
+ if (typeof onread.buffer === 'function') {
+ this[kBuffer] = true;
+ this[kBufferGen] = onread.buffer;
+ } else {
+ this[kBuffer] = onread.buffer;
+ }
+ this[kBufferCb] = onread.callback;
+ }
+ if (options.fd !== undefined) {
+ const { fd } = options;
+ let err;
- // While difficult to fabricate, in some architectures
- // `open` may return an error code for valid file descriptors
- // which cannot be opened. This is difficult to test as most
- // un-openable fds will throw on `createHandle`
- if (err)
- throw errnoException(err, 'open');
+ // createHandle will throw ERR_INVALID_FD_TYPE if `fd` is not
+ // a valid `PIPE` or `TCP` descriptor
+ this._handle = createHandle(fd, false);
- this[async_id_symbol] = this._handle.getAsyncId();
+ err = this._handle.open(fd);
- if ((fd === 1 || fd === 2) &&
- (this._handle instanceof Pipe) &&
- process.platform === 'win32') {
- // Make stdout and stderr blocking on Windows
- err = this._handle.setBlocking(true);
+ // While difficult to fabricate, in some architectures
+ // `open` may return an error code for valid file descriptors
+ // which cannot be opened. This is difficult to test as most
+ // un-openable fds will throw on `createHandle`
if (err)
- throw errnoException(err, 'setBlocking');
-
- this._writev = null;
- this._write = makeSyncWrite(fd);
- // makeSyncWrite adjusts this value like the original handle would, so
- // we need to let it do that by turning it into a writable, own property.
- Object.defineProperty(this._handle, 'bytesWritten', {
- value: 0, writable: true
- });
+ throw errnoException(err, 'open');
+
+ this[async_id_symbol] = this._handle.getAsyncId();
+
+ if ((fd === 1 || fd === 2) &&
+ (this._handle instanceof Pipe) &&
+ process.platform === 'win32') {
+ // Make stdout and stderr blocking on Windows
+ err = this._handle.setBlocking(true);
+ if (err)
+ throw errnoException(err, 'setBlocking');
+
+ this._writev = null;
+ this._write = makeSyncWrite(fd);
+ // makeSyncWrite adjusts this value like the original handle would, so
+ // we need to let it do that by turning it into a writable, own
+ // property.
+ Object.defineProperty(this._handle, 'bytesWritten', {
+ value: 0, writable: true
+ });
+ }
}
}
@@ -514,6 +548,15 @@ Object.defineProperty(Socket.prototype, kUpdateTimer, {
});
+function tryReadStart(socket) {
+ // Not already reading, start the flow
+ debug('Socket._handle.readStart');
+ socket._handle.reading = true;
+ var err = socket._handle.readStart();
+ if (err)
+ socket.destroy(errnoException(err, 'read'));
+}
+
// Just call handle.readStart until we have enough in the buffer
Socket.prototype._read = function(n) {
debug('_read');
@@ -522,12 +565,7 @@ Socket.prototype._read = function(n) {
debug('_read wait for connection');
this.once('connect', () => this._read(n));
} else if (!this._handle.reading) {
- // Not already reading, start the flow
- debug('Socket._read readStart');
- this._handle.reading = true;
- var err = this._handle.readStart();
- if (err)
- this.destroy(errnoException(err, 'read'));
+ tryReadStart(this);
}
};
@@ -539,6 +577,38 @@ Socket.prototype.end = function(data, encoding, callback) {
};
+Socket.prototype.pause = function() {
+ if (this[kBuffer] && !this.connecting && this._handle &&
+ this._handle.reading) {
+ this._handle.reading = false;
+ if (!this.destroyed) {
+ const err = this._handle.readStop();
+ if (err)
+ this.destroy(errnoException(err, 'read'));
+ }
+ }
+ return stream.Duplex.prototype.pause.call(this);
+};
+
+
+Socket.prototype.resume = function() {
+ if (this[kBuffer] && !this.connecting && this._handle &&
+ !this._handle.reading) {
+ tryReadStart(this);
+ }
+ return stream.Duplex.prototype.resume.call(this);
+};
+
+
+Socket.prototype.read = function(n) {
+ if (this[kBuffer] && !this.connecting && this._handle &&
+ !this._handle.reading) {
+ tryReadStart(this);
+ }
+ return stream.Duplex.prototype.read.call(this, n);
+};
+
+
// Called when the 'end' event is emitted.
function onReadableStreamEnd() {
if (!this.allowHalfOpen) {
diff --git a/src/stream_base.cc b/src/stream_base.cc
index 695d19c123..52163e2e43 100644
--- a/src/stream_base.cc
+++ b/src/stream_base.cc
@@ -26,6 +26,7 @@ using v8::FunctionCallbackInfo;
using v8::HandleScope;
using v8::Integer;
using v8::Local;
+using v8::MaybeLocal;
using v8::Object;
using v8::ReadOnly;
using v8::String;
@@ -50,6 +51,13 @@ int StreamBase::ReadStopJS(const FunctionCallbackInfo<Value>& args) {
return ReadStop();
}
+int StreamBase::UseUserBuffer(const FunctionCallbackInfo<Value>& args) {
+ CHECK(Buffer::HasInstance(args[0]));
+
+ uv_buf_t buf = uv_buf_init(Buffer::Data(args[0]), Buffer::Length(args[0]));
+ PushStreamListener(new CustomBufferJSListener(buf));
+ return 0;
+}
int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
CHECK(args[0]->IsObject());
@@ -291,19 +299,22 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
}
-void StreamBase::CallJSOnreadMethod(ssize_t nread,
- Local<ArrayBuffer> ab,
- size_t offset) {
+MaybeLocal<Value> StreamBase::CallJSOnreadMethod(ssize_t nread,
+ Local<ArrayBuffer> ab,
+ size_t offset,
+ StreamBaseJSChecks checks) {
Environment* env = env_;
DCHECK_EQ(static_cast<int32_t>(nread), nread);
DCHECK_LE(offset, INT32_MAX);
- if (ab.IsEmpty()) {
- DCHECK_EQ(offset, 0);
- DCHECK_LE(nread, 0);
- } else {
- DCHECK_GE(nread, 0);
+ if (checks == DONT_SKIP_NREAD_CHECKS) {
+ if (ab.IsEmpty()) {
+ DCHECK_EQ(offset, 0);
+ DCHECK_LE(nread, 0);
+ } else {
+ DCHECK_GE(nread, 0);
+ }
}
env->stream_base_state()[kReadBytesOrError] = nread;
@@ -317,7 +328,7 @@ void StreamBase::CallJSOnreadMethod(ssize_t nread,
CHECK_NOT_NULL(wrap);
Local<Value> onread = wrap->object()->GetInternalField(kOnReadFunctionField);
CHECK(onread->IsFunction());
- wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
+ return wrap->MakeCallback(onread.As<Function>(), arraysize(argv), argv);
}
@@ -366,6 +377,9 @@ void StreamBase::AddMethods(Environment* env, Local<FunctionTemplate> t) {
env->SetProtoMethod(t, "readStart", JSMethod<&StreamBase::ReadStartJS>);
env->SetProtoMethod(t, "readStop", JSMethod<&StreamBase::ReadStopJS>);
env->SetProtoMethod(t, "shutdown", JSMethod<&StreamBase::Shutdown>);
+ env->SetProtoMethod(t,
+ "useUserBuffer",
+ JSMethod<&StreamBase::UseUserBuffer>);
env->SetProtoMethod(t, "writev", JSMethod<&StreamBase::Writev>);
env->SetProtoMethod(t, "writeBuffer", JSMethod<&StreamBase::WriteBuffer>);
env->SetProtoMethod(
@@ -445,6 +459,7 @@ void StreamResource::ClearError() {
// No-op
}
+
uv_buf_t EmitToJSStreamListener::OnStreamAlloc(size_t suggested_size) {
CHECK_NOT_NULL(stream_);
Environment* env = static_cast<StreamBase*>(stream_)->stream_env();
@@ -472,6 +487,32 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf_) {
}
+uv_buf_t CustomBufferJSListener::OnStreamAlloc(size_t suggested_size) {
+ return buffer_;
+}
+
+
+void CustomBufferJSListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
+ CHECK_NOT_NULL(stream_);
+ CHECK_EQ(buf.base, buffer_.base);
+
+ StreamBase* stream = static_cast<StreamBase*>(stream_);
+ Environment* env = stream->stream_env();
+ HandleScope handle_scope(env->isolate());
+ Context::Scope context_scope(env->context());
+
+ MaybeLocal<Value> ret = stream->CallJSOnreadMethod(nread,
+ Local<ArrayBuffer>(),
+ 0,
+ StreamBase::SKIP_NREAD_CHECKS);
+ Local<Value> next_buf_v;
+ if (ret.ToLocal(&next_buf_v) && !next_buf_v->IsUndefined()) {
+ buffer_.base = Buffer::Data(next_buf_v);
+ buffer_.len = Buffer::Length(next_buf_v);
+ }
+}
+
+
void ReportWritesToJSStreamListener::OnStreamAfterReqFinished(
StreamReq* req_wrap, int status) {
StreamBase* stream = static_cast<StreamBase*>(stream_);
diff --git a/src/stream_base.h b/src/stream_base.h
index 3550233290..3bfdaedb79 100644
--- a/src/stream_base.h
+++ b/src/stream_base.h
@@ -180,6 +180,21 @@ class EmitToJSStreamListener : public ReportWritesToJSStreamListener {
};
+// An alternative listener that uses a custom, user-provided buffer
+// for reading data.
+class CustomBufferJSListener : public ReportWritesToJSStreamListener {
+ public:
+ uv_buf_t OnStreamAlloc(size_t suggested_size) override;
+ void OnStreamRead(ssize_t nread, const uv_buf_t& buf) override;
+ void OnStreamDestroy() override { delete this; }
+
+ explicit CustomBufferJSListener(uv_buf_t buffer) : buffer_(buffer) {}
+
+ private:
+ uv_buf_t buffer_;
+};
+
+
// A generic stream, comparable to JS land’s `Duplex` streams.
// A stream is always controlled through one `StreamListener` instance.
class StreamResource {
@@ -273,9 +288,13 @@ class StreamBase : public StreamResource {
virtual bool IsIPCPipe();
virtual int GetFD();
- void CallJSOnreadMethod(ssize_t nread,
- v8::Local<v8::ArrayBuffer> ab,
- size_t offset = 0);
+ enum StreamBaseJSChecks { DONT_SKIP_NREAD_CHECKS, SKIP_NREAD_CHECKS };
+
+ v8::MaybeLocal<v8::Value> CallJSOnreadMethod(
+ ssize_t nread,
+ v8::Local<v8::ArrayBuffer> ab,
+ size_t offset = 0,
+ StreamBaseJSChecks checks = DONT_SKIP_NREAD_CHECKS);
// This is named `stream_env` to avoid name clashes, because a lot of
// subclasses are also `BaseObject`s.
@@ -323,6 +342,7 @@ class StreamBase : public StreamResource {
int WriteBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
template <enum encoding enc>
int WriteString(const v8::FunctionCallbackInfo<v8::Value>& args);
+ int UseUserBuffer(const v8::FunctionCallbackInfo<v8::Value>& args);
static void GetFD(const v8::FunctionCallbackInfo<v8::Value>& args);
static void GetExternal(const v8::FunctionCallbackInfo<v8::Value>& args);
diff --git a/test/parallel/test-net-onread-static-buffer.js b/test/parallel/test-net-onread-static-buffer.js
new file mode 100644
index 0000000000..ce722f69cd
--- /dev/null
+++ b/test/parallel/test-net-onread-static-buffer.js
@@ -0,0 +1,186 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const net = require('net');
+
+const message = Buffer.from('hello world');
+
+// Test typical usage
+net.createServer(common.mustCall(function(socket) {
+ this.close();
+ socket.end(message);
+})).listen(0, function() {
+ let received = 0;
+ const buffers = [];
+ const sockBuf = Buffer.alloc(8);
+ net.connect({
+ port: this.address().port,
+ onread: {
+ buffer: sockBuf,
+ callback: function(nread, buf) {
+ assert.strictEqual(buf, sockBuf);
+ received += nread;
+ buffers.push(Buffer.from(buf.slice(0, nread)));
+ }
+ }
+ }).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
+ assert.strictEqual(received, message.length);
+ assert.deepStrictEqual(Buffer.concat(buffers), message);
+ }));
+});
+
+// Test Uint8Array support
+net.createServer(common.mustCall(function(socket) {
+ this.close();
+ socket.end(message);
+})).listen(0, function() {
+ let received = 0;
+ let incoming = new Uint8Array(0);
+ const sockBuf = new Uint8Array(8);
+ net.connect({
+ port: this.address().port,
+ onread: {
+ buffer: sockBuf,
+ callback: function(nread, buf) {
+ assert.strictEqual(buf, sockBuf);
+ received += nread;
+ const newIncoming = new Uint8Array(incoming.length + nread);
+ newIncoming.set(incoming);
+ newIncoming.set(buf.slice(0, nread), incoming.length);
+ incoming = newIncoming;
+ }
+ }
+ }).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
+ assert.strictEqual(received, message.length);
+ assert.deepStrictEqual(incoming, new Uint8Array(message));
+ }));
+});
+
+// Test Buffer callback usage
+net.createServer(common.mustCall(function(socket) {
+ this.close();
+ socket.end(message);
+})).listen(0, function() {
+ let received = 0;
+ const incoming = [];
+ const bufPool = [ Buffer.alloc(2), Buffer.alloc(2), Buffer.alloc(2) ];
+ let bufPoolIdx = -1;
+ let bufPoolUsage = 0;
+ net.connect({
+ port: this.address().port,
+ onread: {
+ buffer: () => {
+ ++bufPoolUsage;
+ bufPoolIdx = (bufPoolIdx + 1) % bufPool.length;
+ return bufPool[bufPoolIdx];
+ },
+ callback: function(nread, buf) {
+ assert.strictEqual(buf, bufPool[bufPoolIdx]);
+ received += nread;
+ incoming.push(Buffer.from(buf.slice(0, nread)));
+ }
+ }
+ }).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
+ assert.strictEqual(received, message.length);
+ assert.deepStrictEqual(Buffer.concat(incoming), message);
+ assert.strictEqual(bufPoolUsage, 7);
+ }));
+});
+
+// Test Uint8Array callback support
+net.createServer(common.mustCall(function(socket) {
+ this.close();
+ socket.end(message);
+})).listen(0, function() {
+ let received = 0;
+ let incoming = new Uint8Array(0);
+ const bufPool = [ new Uint8Array(2), new Uint8Array(2), new Uint8Array(2) ];
+ let bufPoolIdx = -1;
+ let bufPoolUsage = 0;
+ net.connect({
+ port: this.address().port,
+ onread: {
+ buffer: () => {
+ ++bufPoolUsage;
+ bufPoolIdx = (bufPoolIdx + 1) % bufPool.length;
+ return bufPool[bufPoolIdx];
+ },
+ callback: function(nread, buf) {
+ assert.strictEqual(buf, bufPool[bufPoolIdx]);
+ received += nread;
+ const newIncoming = new Uint8Array(incoming.length + nread);
+ newIncoming.set(incoming);
+ newIncoming.set(buf.slice(0, nread), incoming.length);
+ incoming = newIncoming;
+ }
+ }
+ }).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
+ assert.strictEqual(received, message.length);
+ assert.deepStrictEqual(incoming, new Uint8Array(message));
+ assert.strictEqual(bufPoolUsage, 7);
+ }));
+});
+
+// Test explicit socket pause
+net.createServer(common.mustCall(function(socket) {
+ this.close();
+ socket.end(message);
+})).listen(0, function() {
+ let received = 0;
+ const buffers = [];
+ const sockBuf = Buffer.alloc(8);
+ let paused = false;
+ net.connect({
+ port: this.address().port,
+ onread: {
+ buffer: sockBuf,
+ callback: function(nread, buf) {
+ assert.strictEqual(paused, false);
+ assert.strictEqual(buf, sockBuf);
+ received += nread;
+ buffers.push(Buffer.from(buf.slice(0, nread)));
+ paused = true;
+ this.pause();
+ setTimeout(() => {
+ paused = false;
+ this.resume();
+ }, 100);
+ }
+ }
+ }).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
+ assert.strictEqual(received, message.length);
+ assert.deepStrictEqual(Buffer.concat(buffers), message);
+ }));
+});
+
+// Test implicit socket pause
+net.createServer(common.mustCall(function(socket) {
+ this.close();
+ socket.end(message);
+})).listen(0, function() {
+ let received = 0;
+ const buffers = [];
+ const sockBuf = Buffer.alloc(8);
+ let paused = false;
+ net.connect({
+ port: this.address().port,
+ onread: {
+ buffer: sockBuf,
+ callback: function(nread, buf) {
+ assert.strictEqual(paused, false);
+ assert.strictEqual(buf, sockBuf);
+ received += nread;
+ buffers.push(Buffer.from(buf.slice(0, nread)));
+ paused = true;
+ setTimeout(() => {
+ paused = false;
+ this.resume();
+ }, 100);
+ return false;
+ }
+ }
+ }).on('data', common.mustNotCall()).on('end', common.mustCall(() => {
+ assert.strictEqual(received, message.length);
+ assert.deepStrictEqual(Buffer.concat(buffers), message);
+ }));
+});