summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2018-10-23 08:23:02 +0200
committerMichaƫl Zasso <targos@protonmail.com>2018-10-28 15:15:15 +0100
commitf01518edfd83e2235d84485d87621e61f675b4a7 (patch)
tree20a562e8e3d86cce4787fef5847baaced6015a53
parent0a23538e49e27b95ee35b051b6507eca74e2bb20 (diff)
downloadandroid-node-v8-f01518edfd83e2235d84485d87621e61f675b4a7.tar.gz
android-node-v8-f01518edfd83e2235d84485d87621e61f675b4a7.tar.bz2
android-node-v8-f01518edfd83e2235d84485d87621e61f675b4a7.zip
src: improve StreamBase write throughput
Improve performance by transferring information about write status to JS through an `AliasedBuffer`, rather than object properties set from C++. PR-URL: https://github.com/nodejs/node/pull/23843 Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Anatoli Papirovski <apapirovski@mac.com>
-rw-r--r--benchmark/net/net-c2s.js2
-rw-r--r--benchmark/net/net-pipe.js2
-rw-r--r--benchmark/net/net-s2c.js2
-rw-r--r--benchmark/net/net-wrap-js-stream-passthrough.js2
-rw-r--r--lib/internal/child_process.js5
-rw-r--r--lib/internal/stream_base_commons.js22
-rw-r--r--src/env.h2
-rw-r--r--src/stream_base.cc33
-rw-r--r--src/stream_base.h4
-rw-r--r--src/stream_wrap.cc2
-rw-r--r--test/sequential/test-async-wrap-getasyncid.js2
11 files changed, 41 insertions, 37 deletions
diff --git a/benchmark/net/net-c2s.js b/benchmark/net/net-c2s.js
index 4add79a166..dc2a5bc015 100644
--- a/benchmark/net/net-c2s.js
+++ b/benchmark/net/net-c2s.js
@@ -6,7 +6,7 @@ const net = require('net');
const PORT = common.PORT;
const bench = common.createBenchmark(main, {
- len: [102400, 1024 * 1024 * 16],
+ len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'],
dur: [5],
});
diff --git a/benchmark/net/net-pipe.js b/benchmark/net/net-pipe.js
index 3dd3bb78cc..e0b2842fd1 100644
--- a/benchmark/net/net-pipe.js
+++ b/benchmark/net/net-pipe.js
@@ -6,7 +6,7 @@ const net = require('net');
const PORT = common.PORT;
const bench = common.createBenchmark(main, {
- len: [102400, 1024 * 1024 * 16],
+ len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'],
dur: [5],
});
diff --git a/benchmark/net/net-s2c.js b/benchmark/net/net-s2c.js
index 2ddf8fd6c5..6ee5afa663 100644
--- a/benchmark/net/net-s2c.js
+++ b/benchmark/net/net-s2c.js
@@ -5,7 +5,7 @@ const common = require('../common.js');
const PORT = common.PORT;
const bench = common.createBenchmark(main, {
- len: [102400, 1024 * 1024 * 16],
+ len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'],
dur: [5]
});
diff --git a/benchmark/net/net-wrap-js-stream-passthrough.js b/benchmark/net/net-wrap-js-stream-passthrough.js
index 05a66f4e7a..c4d11fa56c 100644
--- a/benchmark/net/net-wrap-js-stream-passthrough.js
+++ b/benchmark/net/net-wrap-js-stream-passthrough.js
@@ -5,7 +5,7 @@ const common = require('../common.js');
const { PassThrough } = require('stream');
const bench = common.createBenchmark(main, {
- len: [102400, 1024 * 1024 * 16],
+ len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'],
dur: [5],
}, {
diff --git a/lib/internal/child_process.js b/lib/internal/child_process.js
index a2478ec69d..ddb7e58d7c 100644
--- a/lib/internal/child_process.js
+++ b/lib/internal/child_process.js
@@ -26,6 +26,7 @@ const {
WriteWrap,
kReadBytesOrError,
kArrayBufferOffset,
+ kLastWriteWasAsync,
streamBaseState
} = internalBinding('stream_wrap');
const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap');
@@ -716,10 +717,10 @@ function setupChannel(target, channel) {
}
var req = new WriteWrap();
- req.async = false;
var string = JSON.stringify(message) + '\n';
var err = channel.writeUtf8String(req, string, handle);
+ var wasAsyncWrite = streamBaseState[kLastWriteWasAsync];
if (err === 0) {
if (handle) {
@@ -729,7 +730,7 @@ function setupChannel(target, channel) {
obj.postSend(message, handle, options, callback, target);
}
- if (req.async) {
+ if (wasAsyncWrite) {
req.oncomplete = function() {
control.unref();
if (typeof callback === 'function')
diff --git a/lib/internal/stream_base_commons.js b/lib/internal/stream_base_commons.js
index 870b5b3e3b..709395fa91 100644
--- a/lib/internal/stream_base_commons.js
+++ b/lib/internal/stream_base_commons.js
@@ -6,6 +6,8 @@ const {
WriteWrap,
kReadBytesOrError,
kArrayBufferOffset,
+ kBytesWritten,
+ kLastWriteWasAsync,
streamBaseState
} = internalBinding('stream_wrap');
const { UV_EOF } = internalBinding('uv');
@@ -20,7 +22,12 @@ function handleWriteReq(req, data, encoding) {
switch (encoding) {
case 'buffer':
- return handle.writeBuffer(req, data);
+ {
+ const ret = handle.writeBuffer(req, data);
+ if (streamBaseState[kLastWriteWasAsync])
+ req.buffer = data;
+ return ret;
+ }
case 'latin1':
case 'binary':
return handle.writeLatin1String(req, data);
@@ -35,7 +42,13 @@ function handleWriteReq(req, data, encoding) {
case 'utf-16le':
return handle.writeUcs2String(req, data);
default:
- return handle.writeBuffer(req, Buffer.from(data, encoding));
+ {
+ const buffer = Buffer.from(data, encoding);
+ const ret = handle.writeBuffer(req, buffer);
+ if (streamBaseState[kLastWriteWasAsync])
+ req.buffer = buffer;
+ return ret;
+ }
}
}
@@ -45,6 +58,8 @@ function createWriteWrap(handle, oncomplete) {
req.handle = handle;
req.oncomplete = oncomplete;
req.async = false;
+ req.bytes = 0;
+ req.buffer = null;
return req;
}
@@ -80,6 +95,9 @@ function writeGeneric(self, req, data, encoding, cb) {
}
function afterWriteDispatched(self, req, err, cb) {
+ req.bytes = streamBaseState[kBytesWritten];
+ req.async = !!streamBaseState[kLastWriteWasAsync];
+
if (err !== 0)
return self.destroy(errnoException(err, 'write', req.error), cb);
diff --git a/src/env.h b/src/env.h
index 82f68f1b11..a7ea3de82f 100644
--- a/src/env.h
+++ b/src/env.h
@@ -128,10 +128,8 @@ constexpr size_t kFsStatsBufferLength = kFsStatsFieldsNumber * 2;
V(address_string, "address") \
V(aliases_string, "aliases") \
V(args_string, "args") \
- V(async, "async") \
V(async_ids_stack_string, "async_ids_stack") \
V(buffer_string, "buffer") \
- V(bytes_string, "bytes") \
V(bytes_parsed_string, "bytesParsed") \
V(bytes_read_string, "bytesRead") \
V(bytes_written_string, "bytesWritten") \
diff --git a/src/stream_base.cc b/src/stream_base.cc
index 57713d5eaf..adb839c3e5 100644
--- a/src/stream_base.cc
+++ b/src/stream_base.cc
@@ -18,13 +18,11 @@ namespace node {
using v8::Array;
using v8::ArrayBuffer;
-using v8::Boolean;
using v8::Context;
using v8::FunctionCallbackInfo;
using v8::HandleScope;
using v8::Integer;
using v8::Local;
-using v8::Number;
using v8::Object;
using v8::String;
using v8::Value;
@@ -56,18 +54,9 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
return Shutdown(req_wrap_obj);
}
-inline void SetWriteResultPropertiesOnWrapObject(
- Environment* env,
- Local<Object> req_wrap_obj,
- const StreamWriteResult& res) {
- req_wrap_obj->Set(
- env->context(),
- env->bytes_string(),
- Number::New(env->isolate(), res.bytes)).FromJust();
- req_wrap_obj->Set(
- env->context(),
- env->async(),
- Boolean::New(env->isolate(), res.async)).FromJust();
+void StreamBase::SetWriteResult(const StreamWriteResult& res) {
+ env_->stream_base_state()[kBytesWritten] = res.bytes;
+ env_->stream_base_state()[kLastWriteWasAsync] = res.async;
}
int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
@@ -160,7 +149,7 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
}
StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
- SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
+ SetWriteResult(res);
if (res.wrap != nullptr && storage_size > 0) {
res.wrap->SetAllocatedStorage(storage.release(), storage_size);
}
@@ -185,10 +174,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
buf.len = Buffer::Length(args[1]);
StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);
-
- if (res.async)
- req_wrap_obj->Set(env->context(), env->buffer_string(), args[1]).FromJust();
- SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
+ SetWriteResult(res);
return res.err;
}
@@ -247,12 +233,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
// Immediate failure or success
if (err != 0 || count == 0) {
- req_wrap_obj->Set(env->context(), env->async(), False(env->isolate()))
- .FromJust();
- req_wrap_obj->Set(env->context(),
- env->bytes_string(),
- Integer::NewFromUnsigned(env->isolate(), data_size))
- .FromJust();
+ SetWriteResult(StreamWriteResult { false, err, nullptr, data_size });
return err;
}
@@ -295,7 +276,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
res.bytes += synchronously_written;
- SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
+ SetWriteResult(res);
if (res.wrap != nullptr) {
res.wrap->SetAllocatedStorage(data.release(), data_size);
}
diff --git a/src/stream_base.h b/src/stream_base.h
index 039009e072..063c8714fd 100644
--- a/src/stream_base.h
+++ b/src/stream_base.h
@@ -332,6 +332,8 @@ class StreamBase : public StreamResource {
enum StreamBaseStateFields {
kReadBytesOrError,
kArrayBufferOffset,
+ kBytesWritten,
+ kLastWriteWasAsync,
kNumStreamBaseStateFields
};
@@ -339,6 +341,8 @@ class StreamBase : public StreamResource {
Environment* env_;
EmitToJSStreamListener default_listener_;
+ void SetWriteResult(const StreamWriteResult& res);
+
friend class WriteWrap;
friend class ShutdownWrap;
friend class Environment; // For kNumStreamBaseStateFields.
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc
index a3c45b940a..dac5ebdeb3 100644
--- a/src/stream_wrap.cc
+++ b/src/stream_wrap.cc
@@ -83,6 +83,8 @@ void LibuvStreamWrap::Initialize(Local<Object> target,
NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
+ NODE_DEFINE_CONSTANT(target, kBytesWritten);
+ NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync);
target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"),
env->stream_base_state().GetJSArray()).FromJust();
}
diff --git a/test/sequential/test-async-wrap-getasyncid.js b/test/sequential/test-async-wrap-getasyncid.js
index 5ff8760daa..851a0b3fbc 100644
--- a/test/sequential/test-async-wrap-getasyncid.js
+++ b/test/sequential/test-async-wrap-getasyncid.js
@@ -239,7 +239,7 @@ if (common.hasCrypto) { // eslint-disable-line node-core/crypto-check
const err = handle.writeLatin1String(wreq, 'hi'.repeat(100000));
if (err)
throw new Error(`write failed: ${getSystemErrorName(err)}`);
- if (!wreq.async) {
+ if (!stream_wrap.streamBaseState[stream_wrap.kLastWriteWasAsync]) {
testUninitialized(wreq, 'WriteWrap');
// Synchronous finish. Write more data until we hit an
// asynchronous write.