diff options
author | Anna Henningsen <anna@addaleax.net> | 2018-10-21 08:34:00 +0200 |
---|---|---|
committer | Anna Henningsen <anna@addaleax.net> | 2018-10-24 09:57:42 +0200 |
commit | 1365f657b51d31044cca54c3152d3940a4bd9dc3 (patch) | |
tree | 4772c2b484555de0a0368b35ebf889ff23d154c2 | |
parent | bb79e768e5ab150f2075780734005783d53eb3ca (diff) | |
download | android-node-v8-1365f657b51d31044cca54c3152d3940a4bd9dc3.tar.gz android-node-v8-1365f657b51d31044cca54c3152d3940a4bd9dc3.tar.bz2 android-node-v8-1365f657b51d31044cca54c3152d3940a4bd9dc3.zip |
src: improve StreamBase read throughput
Improve performance by providing JS with the raw ingridients
for the read data, i.e. an `ArrayBuffer` + offset + length
fields, instead of creating `Buffer` instances in C++ land.
PR-URL: https://github.com/nodejs/node/pull/23797
Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
-rw-r--r-- | benchmark/net/tcp-raw-c2s.js | 8 | ||||
-rw-r--r-- | benchmark/net/tcp-raw-pipe.js | 16 | ||||
-rw-r--r-- | benchmark/net/tcp-raw-s2c.js | 8 | ||||
-rw-r--r-- | lib/internal/child_process.js | 15 | ||||
-rw-r--r-- | lib/internal/http2/core.js | 9 | ||||
-rw-r--r-- | lib/internal/stream_base_commons.js | 14 | ||||
-rw-r--r-- | src/env-inl.h | 5 | ||||
-rw-r--r-- | src/env.cc | 1 | ||||
-rw-r--r-- | src/env.h | 3 | ||||
-rw-r--r-- | src/node_http2.cc | 5 | ||||
-rw-r--r-- | src/stream_base.cc | 33 | ||||
-rw-r--r-- | src/stream_base.h | 12 | ||||
-rw-r--r-- | src/stream_wrap.cc | 5 | ||||
-rw-r--r-- | test/parallel/test-net-end-close.js | 6 | ||||
-rw-r--r-- | test/parallel/test-process-wrap.js | 5 | ||||
-rw-r--r-- | test/parallel/test-tcp-wrap-listen.js | 14 |
16 files changed, 115 insertions, 44 deletions
diff --git a/benchmark/net/tcp-raw-c2s.js b/benchmark/net/tcp-raw-c2s.js index 1f10ae7c83..116cf57a23 100644 --- a/benchmark/net/tcp-raw-c2s.js +++ b/benchmark/net/tcp-raw-c2s.js @@ -46,15 +46,15 @@ function main({ dur, len, type }) { process.exit(0); }, dur * 1000); - clientHandle.onread = function(nread, buffer) { + clientHandle.onread = function(buffer) { // we're not expecting to ever get an EOF from the client. // just lots of data forever. - if (nread < 0) - fail(nread, 'read'); + if (!buffer) + fail('read'); // don't slice the buffer. the point of this is to isolate, not // simulate real traffic. - bytes += buffer.length; + bytes += buffer.byteLength; }; clientHandle.readStart(); diff --git a/benchmark/net/tcp-raw-pipe.js b/benchmark/net/tcp-raw-pipe.js index 16dc6955c4..7144c237af 100644 --- a/benchmark/net/tcp-raw-pipe.js +++ b/benchmark/net/tcp-raw-pipe.js @@ -43,15 +43,15 @@ function main({ dur, len, type }) { if (err) fail(err, 'connect'); - clientHandle.onread = function(nread, buffer) { + clientHandle.onread = function(buffer) { // we're not expecting to ever get an EOF from the client. // just lots of data forever. - if (nread < 0) - fail(nread, 'read'); + if (!buffer) + fail('read'); const writeReq = new WriteWrap(); writeReq.async = false; - err = clientHandle.writeBuffer(writeReq, buffer); + err = clientHandle.writeBuffer(writeReq, Buffer.from(buffer)); if (err) fail(err, 'write'); @@ -89,11 +89,11 @@ function main({ dur, len, type }) { if (err) fail(err, 'connect'); - clientHandle.onread = function(nread, buffer) { - if (nread < 0) - fail(nread, 'read'); + clientHandle.onread = function(buffer) { + if (!buffer) + fail('read'); - bytes += buffer.length; + bytes += buffer.byteLength; }; connectReq.oncomplete = function(err) { diff --git a/benchmark/net/tcp-raw-s2c.js b/benchmark/net/tcp-raw-s2c.js index 1700d23890..fbb7d2520c 100644 --- a/benchmark/net/tcp-raw-s2c.js +++ b/benchmark/net/tcp-raw-s2c.js @@ -109,15 +109,15 @@ function main({ dur, len, type }) { connectReq.oncomplete = function() { var bytes = 0; - clientHandle.onread = function(nread, buffer) { + clientHandle.onread = function(buffer) { // we're not expecting to ever get an EOF from the client. // just lots of data forever. - if (nread < 0) - fail(nread, 'read'); + if (!buffer) + fail('read'); // don't slice the buffer. the point of this is to isolate, not // simulate real traffic. - bytes += buffer.length; + bytes += buffer.byteLength; }; clientHandle.readStart(); diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js index e6cdde56c1..74d69de0dc 100644 --- a/lib/internal/child_process.js +++ b/lib/internal/child_process.js @@ -22,7 +22,12 @@ const util = require('util'); const assert = require('assert'); const { Process } = internalBinding('process_wrap'); -const { WriteWrap } = internalBinding('stream_wrap'); +const { + WriteWrap, + kReadBytesOrError, + kArrayBufferOffset, + streamBaseState +} = internalBinding('stream_wrap'); const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap'); const { TCP } = internalBinding('tcp_wrap'); const { TTY } = internalBinding('tty_wrap'); @@ -486,11 +491,13 @@ function setupChannel(target, channel) { var pendingHandle = null; channel.buffering = false; channel.pendingHandle = null; - channel.onread = function(nread, pool) { + channel.onread = function(arrayBuffer) { const recvHandle = channel.pendingHandle; channel.pendingHandle = null; - // TODO(bnoordhuis) Check that nread > 0. - if (pool) { + if (arrayBuffer) { + const nread = streamBaseState[kReadBytesOrError]; + const offset = streamBaseState[kArrayBufferOffset]; + const pool = new Uint8Array(arrayBuffer, offset, nread); if (recvHandle) pendingHandle = recvHandle; diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 4e9ab05af5..ded26644c5 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -120,7 +120,11 @@ const { isArrayBufferView } = require('internal/util/types'); const { FileHandle } = process.binding('fs'); const binding = internalBinding('http2'); -const { ShutdownWrap } = internalBinding('stream_wrap'); +const { + ShutdownWrap, + kReadBytesOrError, + streamBaseState +} = internalBinding('stream_wrap'); const { UV_EOF } = internalBinding('uv'); const { StreamPipe } = internalBinding('stream_pipe'); @@ -2043,7 +2047,8 @@ function onFileUnpipe() { // This is only called once the pipe has returned back control, so // it only has to handle errors and End-of-File. -function onPipedFileHandleRead(err) { +function onPipedFileHandleRead() { + const err = streamBaseState[kReadBytesOrError]; if (err < 0 && err !== UV_EOF) { this.stream.close(NGHTTP2_INTERNAL_ERROR); } diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js index 8da15983f1..870b5b3e3b 100644 --- a/lib/internal/stream_base_commons.js +++ b/lib/internal/stream_base_commons.js @@ -1,7 +1,13 @@ 'use strict'; const { Buffer } = require('buffer'); -const { WriteWrap } = internalBinding('stream_wrap'); +const { FastBuffer } = require('internal/buffer'); +const { + WriteWrap, + kReadBytesOrError, + kArrayBufferOffset, + streamBaseState +} = internalBinding('stream_wrap'); const { UV_EOF } = internalBinding('uv'); const { errnoException } = require('internal/errors'); const { owner_symbol } = require('internal/async_hooks').symbols; @@ -84,13 +90,17 @@ function afterWriteDispatched(self, req, err, cb) { } } -function onStreamRead(nread, buf) { +function onStreamRead(arrayBuffer) { + const nread = streamBaseState[kReadBytesOrError]; + const handle = this; const stream = this[owner_symbol]; stream[kUpdateTimer](); if (nread > 0 && !stream.destroyed) { + const offset = streamBaseState[kArrayBufferOffset]; + const buf = new FastBuffer(arrayBuffer, offset, nread); if (!stream.push(buf)) { handle.reading = false; if (!stream.destroyed) { diff --git a/src/env-inl.h b/src/env-inl.h index 6ace0bf825..9d369d492c 100644 --- a/src/env-inl.h +++ b/src/env-inl.h @@ -446,6 +446,11 @@ Environment::trace_category_state() { return trace_category_state_; } +inline AliasedBuffer<int32_t, v8::Int32Array>& +Environment::stream_base_state() { + return stream_base_state_; +} + inline uint32_t Environment::get_next_module_id() { return module_id_counter_++; } diff --git a/src/env.cc b/src/env.cc index f77bcbf169..9d8fd967a4 100644 --- a/src/env.cc +++ b/src/env.cc @@ -158,6 +158,7 @@ Environment::Environment(IsolateData* isolate_data, makecallback_cntr_(0), should_abort_on_uncaught_toggle_(isolate_, 1), trace_category_state_(isolate_, kTraceCategoryCount), + stream_base_state_(isolate_, StreamBase::kNumStreamBaseStateFields), http_parser_buffer_(nullptr), fs_stats_field_array_(isolate_, kFsStatsFieldsLength * 2), fs_stats_field_bigint_array_(isolate_, kFsStatsFieldsLength * 2), @@ -668,6 +668,7 @@ class Environment { should_abort_on_uncaught_toggle(); inline AliasedBuffer<uint8_t, v8::Uint8Array>& trace_category_state(); + inline AliasedBuffer<int32_t, v8::Int32Array>& stream_base_state(); // The necessary API for async_hooks. inline double new_async_id(); @@ -951,6 +952,8 @@ class Environment { AliasedBuffer<uint8_t, v8::Uint8Array> trace_category_state_; std::unique_ptr<TrackingTraceStateObserver> trace_state_observer_; + AliasedBuffer<int32_t, v8::Int32Array> stream_base_state_; + std::unique_ptr<performance::performance_state> performance_state_; std::unordered_map<std::string, uint64_t> performance_marks_; diff --git a/src/node_http2.cc b/src/node_http2.cc index 633d2389c7..ce5523a9d2 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -1256,10 +1256,7 @@ void Http2StreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { CHECK_LE(offset, session->stream_buf_.len); CHECK_LE(offset + buf.len, session->stream_buf_.len); - Local<Object> buffer = - Buffer::New(env, session->stream_buf_ab_, offset, nread).ToLocalChecked(); - - stream->CallJSOnreadMethod(nread, buffer); + stream->CallJSOnreadMethod(nread, session->stream_buf_ab_, offset); } diff --git a/src/stream_base.cc b/src/stream_base.cc index c6cce9c2d0..57713d5eaf 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -17,6 +17,7 @@ namespace node { using v8::Array; +using v8::ArrayBuffer; using v8::Boolean; using v8::Context; using v8::FunctionCallbackInfo; @@ -303,17 +304,29 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) { } -void StreamBase::CallJSOnreadMethod(ssize_t nread, Local<Object> buf) { +void StreamBase::CallJSOnreadMethod(ssize_t nread, + Local<ArrayBuffer> ab, + size_t offset) { Environment* env = env_; +#ifdef DEBUG + CHECK_EQ(static_cast<int32_t>(nread), nread); + CHECK_EQ(static_cast<int32_t>(offset), offset); + + if (ab.IsEmpty()) { + CHECK_EQ(offset, 0); + CHECK_LE(nread, 0); + } else { + CHECK_GE(nread, 0); + } +#endif + env->stream_base_state()[kReadBytesOrError] = nread; + env->stream_base_state()[kArrayBufferOffset] = offset; + Local<Value> argv[] = { - Integer::New(env->isolate(), nread), - buf + ab.IsEmpty() ? Undefined(env->isolate()).As<Value>() : ab.As<Value>() }; - if (argv[1].IsEmpty()) - argv[1] = Undefined(env->isolate()); - AsyncWrap* wrap = GetAsyncWrap(); CHECK_NOT_NULL(wrap); wrap->MakeCallback(env->onread_string(), arraysize(argv), argv); @@ -366,14 +379,18 @@ void EmitToJSStreamListener::OnStreamRead(ssize_t nread, const uv_buf_t& buf) { if (nread <= 0) { free(buf.base); if (nread < 0) - stream->CallJSOnreadMethod(nread, Local<Object>()); + stream->CallJSOnreadMethod(nread, Local<ArrayBuffer>()); return; } CHECK_LE(static_cast<size_t>(nread), buf.len); char* base = Realloc(buf.base, nread); - Local<Object> obj = Buffer::New(env, base, nread).ToLocalChecked(); + Local<ArrayBuffer> obj = ArrayBuffer::New( + env->isolate(), + base, + nread, + v8::ArrayBufferCreationMode::kInternalized); // Transfer ownership to V8. stream->CallJSOnreadMethod(nread, obj); } diff --git a/src/stream_base.h b/src/stream_base.h index d8e6df960f..039009e072 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -264,7 +264,9 @@ class StreamBase : public StreamResource { virtual bool IsIPCPipe(); virtual int GetFD(); - void CallJSOnreadMethod(ssize_t nread, v8::Local<v8::Object> buf); + void CallJSOnreadMethod(ssize_t nread, + v8::Local<v8::ArrayBuffer> ab, + size_t offset = 0); // This is named `stream_env` to avoid name clashes, because a lot of // subclasses are also `BaseObject`s. @@ -326,12 +328,20 @@ class StreamBase : public StreamResource { const v8::FunctionCallbackInfo<v8::Value>& args)> static void JSMethod(const v8::FunctionCallbackInfo<v8::Value>& args); + // Internal, used only in StreamBase methods + env.cc. + enum StreamBaseStateFields { + kReadBytesOrError, + kArrayBufferOffset, + kNumStreamBaseStateFields + }; + private: Environment* env_; EmitToJSStreamListener default_listener_; friend class WriteWrap; friend class ShutdownWrap; + friend class Environment; // For kNumStreamBaseStateFields. }; diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 9ccace435c..a3c45b940a 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -80,6 +80,11 @@ void LibuvStreamWrap::Initialize(Local<Object> target, target->Set(writeWrapString, ww->GetFunction(env->context()).ToLocalChecked()); env->set_write_wrap_template(ww->InstanceTemplate()); + + NODE_DEFINE_CONSTANT(target, kReadBytesOrError); + NODE_DEFINE_CONSTANT(target, kArrayBufferOffset); + target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"), + env->stream_base_state().GetJSArray()).FromJust(); } diff --git a/test/parallel/test-net-end-close.js b/test/parallel/test-net-end-close.js index c0705da9d0..b488f16510 100644 --- a/test/parallel/test-net-end-close.js +++ b/test/parallel/test-net-end-close.js @@ -6,11 +6,15 @@ const net = require('net'); const { internalBinding } = require('internal/test/binding'); const { UV_EOF } = internalBinding('uv'); +const { streamBaseState, kReadBytesOrError } = internalBinding('stream_wrap'); const s = new net.Socket({ handle: { readStart: function() { - setImmediate(() => this.onread(UV_EOF, null)); + setImmediate(() => { + streamBaseState[kReadBytesOrError] = UV_EOF; + this.onread(); + }); }, close: (cb) => setImmediate(cb) }, diff --git a/test/parallel/test-process-wrap.js b/test/parallel/test-process-wrap.js index eccdeb5d07..ef9075e915 100644 --- a/test/parallel/test-process-wrap.js +++ b/test/parallel/test-process-wrap.js @@ -44,11 +44,10 @@ p.onexit = function(exitCode, signal) { processExited = true; }; -pipe.onread = function(err, b, off, len) { +pipe.onread = function(arrayBuffer) { assert.ok(processExited); - if (b) { + if (arrayBuffer) { gotPipeData = true; - console.log('read %d', len); } else { gotPipeEOF = true; pipe.close(); diff --git a/test/parallel/test-tcp-wrap-listen.js b/test/parallel/test-tcp-wrap-listen.js index 9ecdf60f8c..72981b683c 100644 --- a/test/parallel/test-tcp-wrap-listen.js +++ b/test/parallel/test-tcp-wrap-listen.js @@ -5,7 +5,12 @@ const assert = require('assert'); const { internalBinding } = require('internal/test/binding'); const { TCP, constants: TCPConstants } = internalBinding('tcp_wrap'); -const { WriteWrap } = internalBinding('stream_wrap'); +const { + WriteWrap, + kReadBytesOrError, + kArrayBufferOffset, + streamBaseState +} = internalBinding('stream_wrap'); const server = new TCP(TCPConstants.SOCKET); @@ -30,8 +35,11 @@ server.onconnection = (err, client) => { client.readStart(); client.pendingWrites = []; - client.onread = common.mustCall((err, buffer) => { - if (buffer) { + client.onread = common.mustCall((arrayBuffer) => { + if (arrayBuffer) { + const offset = streamBaseState[kArrayBufferOffset]; + const nread = streamBaseState[kReadBytesOrError]; + const buffer = Buffer.from(arrayBuffer, offset, nread); assert.ok(buffer.length > 0); assert.strictEqual(client.writeQueueSize, 0); |