From f01518edfd83e2235d84485d87621e61f675b4a7 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Tue, 23 Oct 2018 08:23:02 +0200 Subject: src: improve StreamBase write throughput Improve performance by transferring information about write status to JS through an `AliasedBuffer`, rather than object properties set from C++. PR-URL: https://github.com/nodejs/node/pull/23843 Reviewed-By: James M Snell Reviewed-By: Anatoli Papirovski --- benchmark/net/net-c2s.js | 2 +- benchmark/net/net-pipe.js | 2 +- benchmark/net/net-s2c.js | 2 +- benchmark/net/net-wrap-js-stream-passthrough.js | 2 +- lib/internal/child_process.js | 5 ++-- lib/internal/stream_base_commons.js | 22 +++++++++++++++-- src/env.h | 2 -- src/stream_base.cc | 33 ++++++------------------- src/stream_base.h | 4 +++ src/stream_wrap.cc | 2 ++ test/sequential/test-async-wrap-getasyncid.js | 2 +- 11 files changed, 41 insertions(+), 37 deletions(-) diff --git a/benchmark/net/net-c2s.js b/benchmark/net/net-c2s.js index 4add79a166..dc2a5bc015 100644 --- a/benchmark/net/net-c2s.js +++ b/benchmark/net/net-c2s.js @@ -6,7 +6,7 @@ const net = require('net'); const PORT = common.PORT; const bench = common.createBenchmark(main, { - len: [102400, 1024 * 1024 * 16], + len: [64, 102400, 1024 * 1024 * 16], type: ['utf', 'asc', 'buf'], dur: [5], }); diff --git a/benchmark/net/net-pipe.js b/benchmark/net/net-pipe.js index 3dd3bb78cc..e0b2842fd1 100644 --- a/benchmark/net/net-pipe.js +++ b/benchmark/net/net-pipe.js @@ -6,7 +6,7 @@ const net = require('net'); const PORT = common.PORT; const bench = common.createBenchmark(main, { - len: [102400, 1024 * 1024 * 16], + len: [64, 102400, 1024 * 1024 * 16], type: ['utf', 'asc', 'buf'], dur: [5], }); diff --git a/benchmark/net/net-s2c.js b/benchmark/net/net-s2c.js index 2ddf8fd6c5..6ee5afa663 100644 --- a/benchmark/net/net-s2c.js +++ b/benchmark/net/net-s2c.js @@ -5,7 +5,7 @@ const common = require('../common.js'); const PORT = common.PORT; const bench = common.createBenchmark(main, { - len: [102400, 1024 * 1024 * 16], + len: [64, 102400, 1024 * 1024 * 16], type: ['utf', 'asc', 'buf'], dur: [5] }); diff --git a/benchmark/net/net-wrap-js-stream-passthrough.js b/benchmark/net/net-wrap-js-stream-passthrough.js index 05a66f4e7a..c4d11fa56c 100644 --- a/benchmark/net/net-wrap-js-stream-passthrough.js +++ b/benchmark/net/net-wrap-js-stream-passthrough.js @@ -5,7 +5,7 @@ const common = require('../common.js'); const { PassThrough } = require('stream'); const bench = common.createBenchmark(main, { - len: [102400, 1024 * 1024 * 16], + len: [64, 102400, 1024 * 1024 * 16], type: ['utf', 'asc', 'buf'], dur: [5], }, { diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js index a2478ec69d..ddb7e58d7c 100644 --- a/lib/internal/child_process.js +++ b/lib/internal/child_process.js @@ -26,6 +26,7 @@ const { WriteWrap, kReadBytesOrError, kArrayBufferOffset, + kLastWriteWasAsync, streamBaseState } = internalBinding('stream_wrap'); const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap'); @@ -716,10 +717,10 @@ function setupChannel(target, channel) { } var req = new WriteWrap(); - req.async = false; var string = JSON.stringify(message) + '\n'; var err = channel.writeUtf8String(req, string, handle); + var wasAsyncWrite = streamBaseState[kLastWriteWasAsync]; if (err === 0) { if (handle) { @@ -729,7 +730,7 @@ function setupChannel(target, channel) { obj.postSend(message, handle, options, callback, target); } - if (req.async) { + if (wasAsyncWrite) { req.oncomplete = function() { control.unref(); if (typeof callback === 'function') diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js index 870b5b3e3b..709395fa91 100644 --- a/lib/internal/stream_base_commons.js +++ b/lib/internal/stream_base_commons.js @@ -6,6 +6,8 @@ const { WriteWrap, kReadBytesOrError, kArrayBufferOffset, + kBytesWritten, + kLastWriteWasAsync, streamBaseState } = internalBinding('stream_wrap'); const { UV_EOF } = internalBinding('uv'); @@ -20,7 +22,12 @@ function handleWriteReq(req, data, encoding) { switch (encoding) { case 'buffer': - return handle.writeBuffer(req, data); + { + const ret = handle.writeBuffer(req, data); + if (streamBaseState[kLastWriteWasAsync]) + req.buffer = data; + return ret; + } case 'latin1': case 'binary': return handle.writeLatin1String(req, data); @@ -35,7 +42,13 @@ function handleWriteReq(req, data, encoding) { case 'utf-16le': return handle.writeUcs2String(req, data); default: - return handle.writeBuffer(req, Buffer.from(data, encoding)); + { + const buffer = Buffer.from(data, encoding); + const ret = handle.writeBuffer(req, buffer); + if (streamBaseState[kLastWriteWasAsync]) + req.buffer = buffer; + return ret; + } } } @@ -45,6 +58,8 @@ function createWriteWrap(handle, oncomplete) { req.handle = handle; req.oncomplete = oncomplete; req.async = false; + req.bytes = 0; + req.buffer = null; return req; } @@ -80,6 +95,9 @@ function writeGeneric(self, req, data, encoding, cb) { } function afterWriteDispatched(self, req, err, cb) { + req.bytes = streamBaseState[kBytesWritten]; + req.async = !!streamBaseState[kLastWriteWasAsync]; + if (err !== 0) return self.destroy(errnoException(err, 'write', req.error), cb); diff --git a/src/env.h b/src/env.h index 82f68f1b11..a7ea3de82f 100644 --- a/src/env.h +++ b/src/env.h @@ -128,10 +128,8 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2; V(address_string, "address") \ V(aliases_string, "aliases") \ V(args_string, "args") \ - V(async, "async") \ V(async_ids_stack_string, "async_ids_stack") \ V(buffer_string, "buffer") \ - V(bytes_string, "bytes") \ V(bytes_parsed_string, "bytesParsed") \ V(bytes_read_string, "bytesRead") \ V(bytes_written_string, "bytesWritten") \ diff --git a/src/stream_base.cc b/src/stream_base.cc index 57713d5eaf..adb839c3e5 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -18,13 +18,11 @@ namespace node { using v8::Array; using v8::ArrayBuffer; -using v8::Boolean; using v8::Context; using v8::FunctionCallbackInfo; using v8::HandleScope; using v8::Integer; using v8::Local; -using v8::Number; using v8::Object; using v8::String; using v8::Value; @@ -56,18 +54,9 @@ int StreamBase::Shutdown(const FunctionCallbackInfo& args) { return Shutdown(req_wrap_obj); } -inline void SetWriteResultPropertiesOnWrapObject( - Environment* env, - Local req_wrap_obj, - const StreamWriteResult& res) { - req_wrap_obj->Set( - env->context(), - env->bytes_string(), - Number::New(env->isolate(), res.bytes)).FromJust(); - req_wrap_obj->Set( - env->context(), - env->async(), - Boolean::New(env->isolate(), res.async)).FromJust(); +void StreamBase::SetWriteResult(const StreamWriteResult& res) { + env_->stream_base_state()[kBytesWritten] = res.bytes; + env_->stream_base_state()[kLastWriteWasAsync] = res.async; } int StreamBase::Writev(const FunctionCallbackInfo& args) { @@ -160,7 +149,7 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { } StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj); - SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res); + SetWriteResult(res); if (res.wrap != nullptr && storage_size > 0) { res.wrap->SetAllocatedStorage(storage.release(), storage_size); } @@ -185,10 +174,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo& args) { buf.len = Buffer::Length(args[1]); StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj); - - if (res.async) - req_wrap_obj->Set(env->context(), env->buffer_string(), args[1]).FromJust(); - SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res); + SetWriteResult(res); return res.err; } @@ -247,12 +233,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { // Immediate failure or success if (err != 0 || count == 0) { - req_wrap_obj->Set(env->context(), env->async(), False(env->isolate())) - .FromJust(); - req_wrap_obj->Set(env->context(), - env->bytes_string(), - Integer::NewFromUnsigned(env->isolate(), data_size)) - .FromJust(); + SetWriteResult(StreamWriteResult { false, err, nullptr, data_size }); return err; } @@ -295,7 +276,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj); res.bytes += synchronously_written; - SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res); + SetWriteResult(res); if (res.wrap != nullptr) { res.wrap->SetAllocatedStorage(data.release(), data_size); } diff --git a/src/stream_base.h b/src/stream_base.h index 039009e072..063c8714fd 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -332,6 +332,8 @@ class StreamBase : public StreamResource { enum StreamBaseStateFields { kReadBytesOrError, kArrayBufferOffset, + kBytesWritten, + kLastWriteWasAsync, kNumStreamBaseStateFields }; @@ -339,6 +341,8 @@ class StreamBase : public StreamResource { Environment* env_; EmitToJSStreamListener default_listener_; + void SetWriteResult(const StreamWriteResult& res); + friend class WriteWrap; friend class ShutdownWrap; friend class Environment; // For kNumStreamBaseStateFields. diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index a3c45b940a..dac5ebdeb3 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -83,6 +83,8 @@ void LibuvStreamWrap::Initialize(Local target, NODE_DEFINE_CONSTANT(target, kReadBytesOrError); NODE_DEFINE_CONSTANT(target, kArrayBufferOffset); + NODE_DEFINE_CONSTANT(target, kBytesWritten); + NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync); target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"), env->stream_base_state().GetJSArray()).FromJust(); } diff --git a/test/sequential/test-async-wrap-getasyncid.js b/test/sequential/test-async-wrap-getasyncid.js index 5ff8760daa..851a0b3fbc 100644 --- a/test/sequential/test-async-wrap-getasyncid.js +++ b/test/sequential/test-async-wrap-getasyncid.js @@ -239,7 +239,7 @@ if (common.hasCrypto) { // eslint-disable-line node-core/crypto-check const err = handle.writeLatin1String(wreq, 'hi'.repeat(100000)); if (err) throw new Error(`write failed: ${getSystemErrorName(err)}`); - if (!wreq.async) { + if (!stream_wrap.streamBaseState[stream_wrap.kLastWriteWasAsync]) { testUninitialized(wreq, 'WriteWrap'); // Synchronous finish. Write more data until we hit an // asynchronous write. -- cgit v1.2.3