summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--benchmark/net/tcp-raw-c2s.js8
-rw-r--r--benchmark/net/tcp-raw-pipe.js16
-rw-r--r--benchmark/net/tcp-raw-s2c.js8
-rw-r--r--lib/internal/child_process.js15
-rw-r--r--lib/internal/http2/core.js9
-rw-r--r--lib/internal/stream_base_commons.js14
-rw-r--r--src/env-inl.h5
-rw-r--r--src/env.cc1
-rw-r--r--src/env.h3
-rw-r--r--src/node_http2.cc5
-rw-r--r--src/stream_base.cc33
-rw-r--r--src/stream_base.h12
-rw-r--r--src/stream_wrap.cc5
-rw-r--r--test/parallel/test-net-end-close.js6
-rw-r--r--test/parallel/test-process-wrap.js5
-rw-r--r--test/parallel/test-tcp-wrap-listen.js14
16 files changed, 115 insertions, 44 deletions
diff --git a/benchmark/net/tcp-raw-c2s.js b/benchmark/net/tcp-raw-c2s.js
index 1f10ae7c83..116cf57a23 100644
--- a/benchmark/net/tcp-raw-c2s.js
+++ b/benchmark/net/tcp-raw-c2s.js
@@ -46,15 +46,15 @@ function main({ dur, len, type }) {
process.exit(0);
}, dur * 1000);
- clientHandle.onread = function(nread, buffer) {
+ clientHandle.onread = function(buffer) {
// we're not expecting to ever get an EOF from the client.
// just lots of data forever.
- if (nread < 0)
- fail(nread, 'read');
+ if (!buffer)
+ fail('read');
// don't slice the buffer. the point of this is to isolate, not
// simulate real traffic.
- bytes += buffer.length;
+ bytes += buffer.byteLength;
};
clientHandle.readStart();
diff --git a/benchmark/net/tcp-raw-pipe.js b/benchmark/net/tcp-raw-pipe.js
index 16dc6955c4..7144c237af 100644
--- a/benchmark/net/tcp-raw-pipe.js
+++ b/benchmark/net/tcp-raw-pipe.js
@@ -43,15 +43,15 @@ function main({ dur, len, type }) {
if (err)
fail(err, 'connect');
- clientHandle.onread = function(nread, buffer) {
+ clientHandle.onread = function(buffer) {
// we're not expecting to ever get an EOF from the client.
// just lots of data forever.
- if (nread < 0)
- fail(nread, 'read');
+ if (!buffer)
+ fail('read');
const writeReq = new WriteWrap();
writeReq.async = false;
- err = clientHandle.writeBuffer(writeReq, buffer);
+ err = clientHandle.writeBuffer(writeReq, Buffer.from(buffer));
if (err)
fail(err, 'write');
@@ -89,11 +89,11 @@ function main({ dur, len, type }) {
if (err)
fail(err, 'connect');
- clientHandle.onread = function(nread, buffer) {
- if (nread < 0)
- fail(nread, 'read');
+ clientHandle.onread = function(buffer) {
+ if (!buffer)
+ fail('read');
- bytes += buffer.length;
+ bytes += buffer.byteLength;
};
connectReq.oncomplete = function(err) {
diff --git a/benchmark/net/tcp-raw-s2c.js b/benchmark/net/tcp-raw-s2c.js
index 1700d23890..fbb7d2520c 100644
--- a/benchmark/net/tcp-raw-s2c.js
+++ b/benchmark/net/tcp-raw-s2c.js
@@ -109,15 +109,15 @@ function main({ dur, len, type }) {
connectReq.oncomplete = function() {
var bytes = 0;
- clientHandle.onread = function(nread, buffer) {
+ clientHandle.onread = function(buffer) {
// we're not expecting to ever get an EOF from the client.
// just lots of data forever.
- if (nread < 0)
- fail(nread, 'read');
+ if (!buffer)
+ fail('read');
// don't slice the buffer. the point of this is to isolate, not
// simulate real traffic.
- bytes += buffer.length;
+ bytes += buffer.byteLength;
};
clientHandle.readStart();
diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js
index e6cdde56c1..74d69de0dc 100644
--- a/lib/internal/child_process.js
+++ b/lib/internal/child_process.js
@@ -22,7 +22,12 @@ const util = require('util');
const assert = require('assert');
const { Process } = internalBinding('process_wrap');
-const { WriteWrap } = internalBinding('stream_wrap');
+const {
+ WriteWrap,
+ kReadBytesOrError,
+ kArrayBufferOffset,
+ streamBaseState
+} = internalBinding('stream_wrap');
const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap');
const { TCP } = internalBinding('tcp_wrap');
const { TTY } = internalBinding('tty_wrap');
@@ -486,11 +491,13 @@ function setupChannel(target, channel) {
var pendingHandle = null;
channel.buffering = false;
channel.pendingHandle = null;
- channel.onread = function(nread, pool) {
+ channel.onread = function(arrayBuffer) {
const recvHandle = channel.pendingHandle;
channel.pendingHandle = null;
- // TODO(bnoordhuis) Check that nread > 0.
- if (pool) {
+ if (arrayBuffer) {
+ const nread = streamBaseState[kReadBytesOrError];
+ const offset = streamBaseState[kArrayBufferOffset];
+ const pool = new Uint8Array(arrayBuffer, offset, nread);
if (recvHandle)
pendingHandle = recvHandle;
diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js
index 4e9ab05af5..ded26644c5 100644
--- a/lib/internal/http2/core.js
+++ b/lib/internal/http2/core.js
@@ -120,7 +120,11 @@ const { isArrayBufferView } = require('internal/util/types');
const { FileHandle } = process.binding('fs');
const binding = internalBinding('http2');
-const { ShutdownWrap } = internalBinding('stream_wrap');
+const {
+ ShutdownWrap,
+ kReadBytesOrError,
+ streamBaseState
+} = internalBinding('stream_wrap');
const { UV_EOF } = internalBinding('uv');
const { StreamPipe } = internalBinding('stream_pipe');
@@ -2043,7 +2047,8 @@ function onFileUnpipe() {
// This is only called once the pipe has returned back control, so
// it only has to handle errors and End-of-File.
-function onPipedFileHandleRead(err) {
+function onPipedFileHandleRead() {
+ const err = streamBaseState[kReadBytesOrError];
if (err < 0 && err !== UV_EOF) {
this.stream.close(NGHTTP2_INTERNAL_ERROR);
}
diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js
index 8da15983f1..870b5b3e3b 100644
--- a/lib/internal/stream_base_commons.js
+++ b/lib/internal/stream_base_commons.js
@@ -1,7 +1,13 @@
'use strict';
const { Buffer } = require('buffer');
-const { WriteWrap } = internalBinding('stream_wrap');
+const { FastBuffer } = require('internal/buffer');
+const {
+ WriteWrap,
+ kReadBytesOrError,
+ kArrayBufferOffset,
+ streamBaseState
+} = internalBinding('stream_wrap');
const { UV_EOF } = internalBinding('uv');
const { errnoException } = require('internal/errors');
const { owner_symbol } = require('internal/async_hooks').symbols;
@@ -84,13 +90,17 @@ function afterWriteDispatched(self, req, err, cb) {
}
}
-function onStreamRead(nread, buf) {
+function onStreamRead(arrayBuffer) {
+ const nread = streamBaseState[kReadBytesOrError];
+
const handle = this;
const stream = this[owner_symbol];
stream[kUpdateTimer]();
if (nread > 0 && !stream.destroyed) {
+ const offset = streamBaseState[kArrayBufferOffset];
+ const buf = new FastBuffer(arrayBuffer, offset, nread);
if (!stream.push(buf)) {
handle.reading = false;
if (!stream.destroyed) {
diff --git a/src/env-inl.h b/src/env-inl.h
index 6ace0bf825..9d369d492c 100644
--- a/src/env-inl.h
+++ b/src/env-inl.h
@@ -446,6 +446,11 @@ Environment::trace_category_state() {
return trace_category_state_;
}
+inline AliasedBuffer<int32_t, v8::Int32Array>&
+Environment::stream_base_state() {
+ return stream_base_state_;
+}
+
inline uint32_t Environment::get_next_module_id() {
return module_id_counter_++;
}
diff --git a/src/env.cc b/src/env.cc
index f77bcbf169..9d8fd967a4 100644
--- a/src/env.cc
+++ b/src/env.cc
@@ -158,6 +158,7 @@ Environment::Environment(IsolateData* isolate_data,
makecallback_cntr_(0),
should_abort_on_uncaught_toggle_(isolate_, 1),
trace_category_state_(isolate_, kTraceCategoryCount),
+ stream_base_state_(isolate_, StreamBase::kNumStreamBaseStateFields),
http_parser_buffer_(nullptr),
fs_stats_field_array_(isolate_, kFsStatsFieldsLength * 2),
fs_stats_field_bigint_array_(isolate_, kFsStatsFieldsLength * 2),
diff --git a/src/env.h b/src/env.h
index 3daa48f9cb..a85058f895 100644
--- a/src/env.h
+++ b/src/env.h
@@ -668,6 +668,7 @@ class Environment {
should_abort_on_uncaught_toggle();
inline AliasedBuffer<uint8_t, v8::Uint8Array>& trace_category_state();
+ inline AliasedBuffer<int32_t, v8::Int32Array>& stream_base_state();
// The necessary API for async_hooks.
inline double new_async_id();
@@ -951,6 +952,8 @@ class Environment {
AliasedBuffer<uint8_t, v8::Uint8Array> trace_category_state_;
std::unique_ptr<TrackingTraceStateObserver> trace_state_observer_;
+ AliasedBuffer<int32_t, v8::Int32Array> stream_base_state_;
+
std::unique_ptr<performance::performance_state> performance_state_;
std::unordered_map<std::string, uint64_t> performance_marks_;
diff --git a/src/node_http2.cc b/src/node_http2.cc
index 633d2389c7..ce5523a9d2 100644
--- a/src/node_http2.cc
+++ b/src/node_http2.cc
@@ -1256,10 +1256,7 @@ void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
CHECK_LE(offset, session->stream_buf_.len);
CHECK_LE(offset + buf.len, session->stream_buf_.len);
- Local<Object> buffer =
- Buffer::New(env, session->stream_buf_ab_, offset, nread).ToLocalChecked();
-
- stream->CallJSOnreadMethod(nread, buffer);
+ stream->CallJSOnreadMethod(nread, session->stream_buf_ab_, offset);
}
diff --git a/src/stream_base.cc b/src/stream_base.cc
index c6cce9c2d0..57713d5eaf 100644
--- a/src/stream_base.cc
+++ b/src/stream_base.cc
@@ -17,6 +17,7 @@
namespace node {
using v8::Array;
+using v8::ArrayBuffer;
using v8::Boolean;
using v8::Context;
using v8::FunctionCallbackInfo;
@@ -303,17 +304,29 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
}
-void StreamBase::CallJSOnreadMethod(ssize_t nread, Local<Object> buf) {
+void StreamBase::CallJSOnreadMethod(ssize_t nread,
+ Local<ArrayBuffer> ab,
+ size_t offset) {
Environment* env = env_;
+#ifdef DEBUG
+ CHECK_EQ(static_cast<int32_t>(nread), nread);
+ CHECK_EQ(static_cast<int32_t>(offset), offset);
+
+ if (ab.IsEmpty()) {
+ CHECK_EQ(offset, 0);
+ CHECK_LE(nread, 0);
+ } else {
+ CHECK_GE(nread, 0);
+ }
+#endif
+ env->stream_base_state()[kReadBytesOrError] = nread;
+ env->stream_base_state()[kArrayBufferOffset] = offset;
+
Local<Value> argv[] = {
- Integer::New(env->isolate(), nread),
- buf
+ ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>()
};
- if (argv[1].IsEmpty())
- argv[1] = Undefined(env->isolate());
-
AsyncWrap* wrap = GetAsyncWrap();
CHECK_NOT_NULL(wrap);
wrap->MakeCallback(env->onread_string(), arraysize(argv), argv);
@@ -366,14 +379,18 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
if (nread <= 0) {
free(buf.base);
if (nread < 0)
- stream->CallJSOnreadMethod(nread, Local<Object>());
+ stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>());
return;
}
CHECK_LE(static_cast<size_t>(nread), buf.len);
char* base = Realloc(buf.base, nread);
- Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked();
+ Local<ArrayBuffer> obj = ArrayBuffer::New(
+ env->isolate(),
+ base,
+ nread,
+ v8::ArrayBufferCreationMode::kInternalized); // Transfer ownership to V8.
stream->CallJSOnreadMethod(nread, obj);
}
diff --git a/src/stream_base.h b/src/stream_base.h
index d8e6df960f..039009e072 100644
--- a/src/stream_base.h
+++ b/src/stream_base.h
@@ -264,7 +264,9 @@ class StreamBase : public StreamResource {
virtual bool IsIPCPipe();
virtual int GetFD();
- void CallJSOnreadMethod(ssize_t nread, v8::Local<v8::Object> buf);
+ void CallJSOnreadMethod(ssize_t nread,
+ v8::Local<v8::ArrayBuffer> ab,
+ size_t offset = 0);
// This is named `stream_env` to avoid name clashes, because a lot of
// subclasses are also `BaseObject`s.
@@ -326,12 +328,20 @@ class StreamBase : public StreamResource {
const v8::FunctionCallbackInfo<v8::Value>& args)>
static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args);
+ // Internal, used only in StreamBase methods + env.cc.
+ enum StreamBaseStateFields {
+ kReadBytesOrError,
+ kArrayBufferOffset,
+ kNumStreamBaseStateFields
+ };
+
private:
Environment* env_;
EmitToJSStreamListener default_listener_;
friend class WriteWrap;
friend class ShutdownWrap;
+ friend class Environment; // For kNumStreamBaseStateFields.
};
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc
index 9ccace435c..a3c45b940a 100644
--- a/src/stream_wrap.cc
+++ b/src/stream_wrap.cc
@@ -80,6 +80,11 @@ void LibuvStreamWrap::Initialize(Local<Object> target,
target->Set(writeWrapString,
ww->GetFunction(env->context()).ToLocalChecked());
env->set_write_wrap_template(ww->InstanceTemplate());
+
+ NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
+ NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
+ target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"),
+ env->stream_base_state().GetJSArray()).FromJust();
}
diff --git a/test/parallel/test-net-end-close.js b/test/parallel/test-net-end-close.js
index c0705da9d0..b488f16510 100644
--- a/test/parallel/test-net-end-close.js
+++ b/test/parallel/test-net-end-close.js
@@ -6,11 +6,15 @@ const net = require('net');
const { internalBinding } = require('internal/test/binding');
const { UV_EOF } = internalBinding('uv');
+const { streamBaseState, kReadBytesOrError } = internalBinding('stream_wrap');
const s = new net.Socket({
handle: {
readStart: function() {
- setImmediate(() => this.onread(UV_EOF, null));
+ setImmediate(() => {
+ streamBaseState[kReadBytesOrError] = UV_EOF;
+ this.onread();
+ });
},
close: (cb) => setImmediate(cb)
},
diff --git a/test/parallel/test-process-wrap.js b/test/parallel/test-process-wrap.js
index eccdeb5d07..ef9075e915 100644
--- a/test/parallel/test-process-wrap.js
+++ b/test/parallel/test-process-wrap.js
@@ -44,11 +44,10 @@ p.onexit = function(exitCode, signal) {
processExited = true;
};
-pipe.onread = function(err, b, off, len) {
+pipe.onread = function(arrayBuffer) {
assert.ok(processExited);
- if (b) {
+ if (arrayBuffer) {
gotPipeData = true;
- console.log('read %d', len);
} else {
gotPipeEOF = true;
pipe.close();
diff --git a/test/parallel/test-tcp-wrap-listen.js b/test/parallel/test-tcp-wrap-listen.js
index 9ecdf60f8c..72981b683c 100644
--- a/test/parallel/test-tcp-wrap-listen.js
+++ b/test/parallel/test-tcp-wrap-listen.js
@@ -5,7 +5,12 @@ const assert = require('assert');
const { internalBinding } = require('internal/test/binding');
const { TCP, constants: TCPConstants } = internalBinding('tcp_wrap');
-const { WriteWrap } = internalBinding('stream_wrap');
+const {
+ WriteWrap,
+ kReadBytesOrError,
+ kArrayBufferOffset,
+ streamBaseState
+} = internalBinding('stream_wrap');
const server = new TCP(TCPConstants.SOCKET);
@@ -30,8 +35,11 @@ server.onconnection = (err, client) => {
client.readStart();
client.pendingWrites = [];
- client.onread = common.mustCall((err, buffer) => {
- if (buffer) {
+ client.onread = common.mustCall((arrayBuffer) => {
+ if (arrayBuffer) {
+ const offset = streamBaseState[kArrayBufferOffset];
+ const nread = streamBaseState[kReadBytesOrError];
+ const buffer = Buffer.from(arrayBuffer, offset, nread);
assert.ok(buffer.length > 0);
assert.strictEqual(client.writeQueueSize, 0);