summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/net.js8
-rw-r--r--src/stream_wrap.cc22
-rw-r--r--src/stream_wrap.h4
-rw-r--r--src/tls_wrap.cc10
-rw-r--r--src/tls_wrap.h4
-rw-r--r--test/sequential/test-http-keep-alive-large-write.js80
-rw-r--r--test/sequential/test-https-keep-alive-large-write.js87
7 files changed, 210 insertions, 5 deletions
diff --git a/lib/net.js b/lib/net.js
index 53b9d33f48..5356357fc9 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -397,6 +397,14 @@ Socket.prototype.setTimeout = function(msecs, callback) {
Socket.prototype._onTimeout = function() {
+ // `prevWriteQueueSize !== this._handle.updateWriteQueueSize()` means
+ // there is an active write in progress, so we suppress the timeout.
+ const prevWriteQueueSize = this._handle.writeQueueSize;
+ if (prevWriteQueueSize > 0 &&
+ prevWriteQueueSize !== this._handle.updateWriteQueueSize()) {
+ this._unrefTimer();
+ return;
+ }
debug('_onTimeout');
this.emit('timeout');
};
diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc
index 660702eb35..0107cbad2d 100644
--- a/src/stream_wrap.cc
+++ b/src/stream_wrap.cc
@@ -104,6 +104,7 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env,
void LibuvStreamWrap::AddMethods(Environment* env,
v8::Local<v8::FunctionTemplate> target,
int flags) {
+ env->SetProtoMethod(target, "updateWriteQueueSize", UpdateWriteQueueSize);
env->SetProtoMethod(target, "setBlocking", SetBlocking);
StreamBase::AddMethods<LibuvStreamWrap>(env, target, flags);
}
@@ -144,11 +145,14 @@ bool LibuvStreamWrap::IsIPCPipe() {
}
-void LibuvStreamWrap::UpdateWriteQueueSize() {
+uint32_t LibuvStreamWrap::UpdateWriteQueueSize() {
HandleScope scope(env()->isolate());
- Local<Integer> write_queue_size =
- Integer::NewFromUnsigned(env()->isolate(), stream()->write_queue_size);
- object()->Set(env()->write_queue_size_string(), write_queue_size);
+ uint32_t write_queue_size = stream()->write_queue_size;
+ object()->Set(env()->context(),
+ env()->write_queue_size_string(),
+ Integer::NewFromUnsigned(env()->isolate(),
+ write_queue_size)).FromJust();
+ return write_queue_size;
}
@@ -273,6 +277,16 @@ void LibuvStreamWrap::OnRead(uv_stream_t* handle,
}
+void LibuvStreamWrap::UpdateWriteQueueSize(
+ const FunctionCallbackInfo<Value>& args) {
+ LibuvStreamWrap* wrap;
+ ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
+
+ uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
+ args.GetReturnValue().Set(write_queue_size);
+}
+
+
void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo<Value>& args) {
LibuvStreamWrap* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
diff --git a/src/stream_wrap.h b/src/stream_wrap.h
index d8fbcf709a..43df504e81 100644
--- a/src/stream_wrap.h
+++ b/src/stream_wrap.h
@@ -84,13 +84,15 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase {
}
AsyncWrap* GetAsyncWrap() override;
- void UpdateWriteQueueSize();
+ uint32_t UpdateWriteQueueSize();
static void AddMethods(Environment* env,
v8::Local<v8::FunctionTemplate> target,
int flags = StreamBase::kFlagNone);
private:
+ static void UpdateWriteQueueSize(
+ const v8::FunctionCallbackInfo<v8::Value>& args);
static void SetBlocking(const v8::FunctionCallbackInfo<v8::Value>& args);
// Callbacks for libuv
diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc
index 0da332f16e..63e3494047 100644
--- a/src/tls_wrap.cc
+++ b/src/tls_wrap.cc
@@ -932,6 +932,15 @@ int TLSWrap::SelectSNIContextCallback(SSL* s, int* ad, void* arg) {
#endif // SSL_CTRL_SET_TLSEXT_SERVERNAME_CB
+void TLSWrap::UpdateWriteQueueSize(const FunctionCallbackInfo<Value>& args) {
+ TLSWrap* wrap;
+ ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
+
+ uint32_t write_queue_size = wrap->UpdateWriteQueueSize();
+ args.GetReturnValue().Set(write_queue_size);
+}
+
+
void TLSWrap::Initialize(Local<Object> target,
Local<Value> unused,
Local<Context> context) {
@@ -958,6 +967,7 @@ void TLSWrap::Initialize(Local<Object> target,
env->SetProtoMethod(t, "enableSessionCallbacks", EnableSessionCallbacks);
env->SetProtoMethod(t, "destroySSL", DestroySSL);
env->SetProtoMethod(t, "enableCertCb", EnableCertCb);
+ env->SetProtoMethod(t, "updateWriteQueueSize", UpdateWriteQueueSize);
StreamBase::AddMethods<TLSWrap>(env, t, StreamBase::kFlagHasWritev);
SSLWrap<TLSWrap>::AddMethods(env, t);
diff --git a/src/tls_wrap.h b/src/tls_wrap.h
index 8d75d6fcc9..99d2dc9121 100644
--- a/src/tls_wrap.h
+++ b/src/tls_wrap.h
@@ -188,6 +188,10 @@ class TLSWrap : public AsyncWrap,
// If true - delivered EOF to the js-land, either after `close_notify`, or
// after the `UV_EOF` on socket.
bool eof_;
+
+ private:
+ static void UpdateWriteQueueSize(
+ const v8::FunctionCallbackInfo<v8::Value>& args);
};
} // namespace node
diff --git a/test/sequential/test-http-keep-alive-large-write.js b/test/sequential/test-http-keep-alive-large-write.js
new file mode 100644
index 0000000000..2cdf539e76
--- /dev/null
+++ b/test/sequential/test-http-keep-alive-large-write.js
@@ -0,0 +1,80 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const http = require('http');
+
+// This test assesses whether long-running writes can complete
+// or time out because the socket is not aware that the backing
+// stream is still writing.
+// To simulate a slow client, we write a really large chunk and
+// then proceed through the following cycle:
+// 1) Receive first 'data' event and record currently written size
+// 2) Once we've read up to currently written size recorded above,
+// we pause the stream and wait longer than the server timeout
+// 3) Socket.prototype._onTimeout triggers and should confirm
+// that the backing stream is still active and writing
+// 4) Our timer fires, we resume the socket and start at 1)
+
+const minReadSize = 250000;
+const serverTimeout = common.platformTimeout(500);
+let offsetTimeout = common.platformTimeout(100);
+let serverConnectionHandle;
+let writeSize = 3000000;
+let didReceiveData = false;
+// This represents each cycle's write size, where a cycle consists
+// of `write > read > _onTimeout`
+let currentWriteSize = 0;
+
+const server = http.createServer(common.mustCall((req, res) => {
+ const content = Buffer.alloc(writeSize, 0x44);
+
+ res.writeHead(200, {
+ 'Content-Type': 'application/octet-stream',
+ 'Content-Length': content.length.toString(),
+ 'Vary': 'Accept-Encoding'
+ });
+
+ serverConnectionHandle = res.socket._handle;
+ res.write(content);
+ res.end();
+}));
+server.setTimeout(serverTimeout);
+server.on('timeout', () => {
+ assert.strictEqual(didReceiveData, false, 'Should not timeout');
+});
+
+server.listen(0, common.mustCall(() => {
+ http.get({
+ path: '/',
+ port: server.address().port
+ }, common.mustCall((res) => {
+ const resume = () => res.resume();
+ let receivedBufferLength = 0;
+ let firstReceivedAt;
+ res.on('data', common.mustCallAtLeast((buf) => {
+ if (receivedBufferLength === 0) {
+ currentWriteSize = Math.max(
+ minReadSize,
+ writeSize - serverConnectionHandle.writeQueueSize
+ );
+ didReceiveData = false;
+ firstReceivedAt = Date.now();
+ }
+ receivedBufferLength += buf.length;
+ if (receivedBufferLength >= currentWriteSize) {
+ didReceiveData = true;
+ writeSize = serverConnectionHandle.writeQueueSize;
+ receivedBufferLength = 0;
+ res.pause();
+ setTimeout(
+ resume,
+ serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
+ );
+ offsetTimeout = 0;
+ }
+ }, 1));
+ res.on('end', common.mustCall(() => {
+ server.close();
+ }));
+ }));
+}));
diff --git a/test/sequential/test-https-keep-alive-large-write.js b/test/sequential/test-https-keep-alive-large-write.js
new file mode 100644
index 0000000000..88468dc03f
--- /dev/null
+++ b/test/sequential/test-https-keep-alive-large-write.js
@@ -0,0 +1,87 @@
+'use strict';
+const common = require('../common');
+if (!common.hasCrypto)
+ common.skip('missing crypto');
+const assert = require('assert');
+const fixtures = require('../common/fixtures');
+const https = require('https');
+
+// This test assesses whether long-running writes can complete
+// or time out because the socket is not aware that the backing
+// stream is still writing.
+// To simulate a slow client, we write a really large chunk and
+// then proceed through the following cycle:
+// 1) Receive first 'data' event and record currently written size
+// 2) Once we've read up to currently written size recorded above,
+// we pause the stream and wait longer than the server timeout
+// 3) Socket.prototype._onTimeout triggers and should confirm
+// that the backing stream is still active and writing
+// 4) Our timer fires, we resume the socket and start at 1)
+
+const minReadSize = 250000;
+const serverTimeout = common.platformTimeout(500);
+let offsetTimeout = common.platformTimeout(100);
+let serverConnectionHandle;
+let writeSize = 2000000;
+let didReceiveData = false;
+// This represents each cycle's write size, where a cycle consists
+// of `write > read > _onTimeout`
+let currentWriteSize = 0;
+
+const server = https.createServer({
+ key: fixtures.readKey('agent1-key.pem'),
+ cert: fixtures.readKey('agent1-cert.pem')
+}, common.mustCall((req, res) => {
+ const content = Buffer.alloc(writeSize, 0x44);
+
+ res.writeHead(200, {
+ 'Content-Type': 'application/octet-stream',
+ 'Content-Length': content.length.toString(),
+ 'Vary': 'Accept-Encoding'
+ });
+
+ serverConnectionHandle = res.socket._handle;
+ res.write(content);
+ res.end();
+}));
+server.setTimeout(serverTimeout);
+server.on('timeout', () => {
+ assert.strictEqual(didReceiveData, false, 'Should not timeout');
+});
+
+server.listen(0, common.mustCall(() => {
+ https.get({
+ path: '/',
+ port: server.address().port,
+ rejectUnauthorized: false
+ }, common.mustCall((res) => {
+ const resume = () => res.resume();
+ let receivedBufferLength = 0;
+ let firstReceivedAt;
+ res.on('data', common.mustCallAtLeast((buf) => {
+ if (receivedBufferLength === 0) {
+ currentWriteSize = Math.max(
+ minReadSize,
+ writeSize - serverConnectionHandle.writeQueueSize
+ );
+ didReceiveData = false;
+ firstReceivedAt = Date.now();
+ }
+ receivedBufferLength += buf.length;
+ if (receivedBufferLength >= currentWriteSize) {
+ didReceiveData = true;
+ writeSize = serverConnectionHandle.writeQueueSize;
+ receivedBufferLength = 0;
+ res.pause();
+ setTimeout(
+ resume,
+ serverTimeout + offsetTimeout - (Date.now() - firstReceivedAt)
+ );
+ offsetTimeout = 0;
+ }
+ }, 1));
+ res.on('end', common.mustCall(() => {
+ server.close();
+ }));
+ }));
+}));