summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnna Henningsen <anna@addaleax.net>2017-12-29 16:17:12 +0100
committerAnna Henningsen <anna@addaleax.net>2018-01-07 21:21:32 +0100
commitb171adc4d12e96f7b72155ea54a78dd1c8e5823f (patch)
tree270ff46d16f7e3e1c534ddcf1fb674b12e654108
parent8a86d9c1cf35fe4f892d483e3673083f5d8f42cf (diff)
downloadandroid-node-v8-b171adc4d12e96f7b72155ea54a78dd1c8e5823f.tar.gz
android-node-v8-b171adc4d12e96f7b72155ea54a78dd1c8e5823f.tar.bz2
android-node-v8-b171adc4d12e96f7b72155ea54a78dd1c8e5823f.zip
lib: remove queue implementation from JSStreamWrap
The streams implementation generally ensures that only one write() call is active at a time. `JSStreamWrap` instances still kept queue of write reqeuests in spite of that; refactor it away. Also, fold `isAlive()` into a constant function on the native side. PR-URL: https://github.com/nodejs/node/pull/17918 Reviewed-By: Ben Noordhuis <info@bnoordhuis.nl> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Tobias Nießen <tniessen@tnie.de> Reviewed-By: Minwoo Jung <minwoo@nodesource.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com> Reviewed-By: Tiancheng "Timothy" Gu <timothygu99@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
-rw-r--r--lib/internal/wrap_js_stream.js159
-rw-r--r--src/env.h1
-rw-r--r--src/js_stream.cc8
3 files changed, 71 insertions, 97 deletions
diff --git a/lib/internal/wrap_js_stream.js b/lib/internal/wrap_js_stream.js
index 611095655b..1c494e57e1 100644
--- a/lib/internal/wrap_js_stream.js
+++ b/lib/internal/wrap_js_stream.js
@@ -8,6 +8,15 @@ const uv = process.binding('uv');
const debug = util.debuglog('stream_wrap');
const errors = require('internal/errors');
+const kCurrentWriteRequest = Symbol('kCurrentWriteRequest');
+const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest');
+
+function isClosing() { return this.owner.isClosing(); }
+function onreadstart() { return this.owner.readStart(); }
+function onreadstop() { return this.owner.readStop(); }
+function onshutdown(req) { return this.owner.doShutdown(req); }
+function onwrite(req, bufs) { return this.owner.doWrite(req, bufs); }
+
/* This class serves as a wrapper for when the C++ side of Node wants access
* to a standard JS stream. For example, TLS or HTTP do not operate on network
* resources conceptually, although that is the common case and what we are
@@ -27,12 +36,13 @@ class JSStreamWrap extends Socket {
debug('close');
this.doClose(cb);
};
- handle.isAlive = () => this.isAlive();
- handle.isClosing = () => this.isClosing();
- handle.onreadstart = () => this.readStart();
- handle.onreadstop = () => this.readStop();
- handle.onshutdown = (req) => this.doShutdown(req);
- handle.onwrite = (req, bufs) => this.doWrite(req, bufs);
+ // Inside of the following functions, `this` refers to the handle
+ // and `this.owner` refers to this JSStreamWrap instance.
+ handle.isClosing = isClosing;
+ handle.onreadstart = onreadstart;
+ handle.onreadstop = onreadstop;
+ handle.onshutdown = onshutdown;
+ handle.onwrite = onwrite;
stream.pause();
stream.on('error', (err) => this.emit('error', err));
@@ -60,7 +70,10 @@ class JSStreamWrap extends Socket {
super({ handle, manualStart: true });
this.stream = stream;
- this._list = null;
+ this[kCurrentWriteRequest] = null;
+ this[kCurrentShutdownRequest] = null;
+
+ // Start reading.
this.read(0);
}
@@ -69,10 +82,6 @@ class JSStreamWrap extends Socket {
return JSStreamWrap;
}
- isAlive() {
- return true;
- }
-
isClosing() {
return !this.readable || !this.writable;
}
@@ -88,33 +97,56 @@ class JSStreamWrap extends Socket {
}
doShutdown(req) {
+ assert.strictEqual(this[kCurrentShutdownRequest], null);
+ this[kCurrentShutdownRequest] = req;
+
+ // TODO(addaleax): It might be nice if we could get into a state where
+ // DoShutdown() is not called on streams while a write is still pending.
+ //
+ // Currently, the only part of the code base where that happens is the
+ // TLS implementation, which calls both DoWrite() and DoShutdown() on the
+ // underlying network stream inside of its own DoShutdown() method.
+ // Working around that on the native side is not quite trivial (yet?),
+ // so for now that is supported here.
+
+ if (this[kCurrentWriteRequest] !== null)
+ return this.on('drain', () => this.doShutdown(req));
+ assert.strictEqual(this[kCurrentWriteRequest], null);
+
const handle = this._handle;
- const item = this._enqueue('shutdown', req);
this.stream.end(() => {
// Ensure that write was dispatched
setImmediate(() => {
- if (!this._dequeue(item))
- return;
-
- handle.finishShutdown(req, 0);
+ this.finishShutdown(handle, 0);
});
});
return 0;
}
+ // handle === this._handle except when called from doClose().
+ finishShutdown(handle, errCode) {
+ // The shutdown request might already have been cancelled.
+ if (this[kCurrentShutdownRequest] === null)
+ return;
+ const req = this[kCurrentShutdownRequest];
+ this[kCurrentShutdownRequest] = null;
+ handle.finishShutdown(req, errCode);
+ }
+
doWrite(req, bufs) {
- const self = this;
- const handle = this._handle;
+ assert.strictEqual(this[kCurrentWriteRequest], null);
+ assert.strictEqual(this[kCurrentShutdownRequest], null);
+ this[kCurrentWriteRequest] = req;
- var pending = bufs.length;
+ const handle = this._handle;
+ const self = this;
- // Queue the request to be able to cancel it
- const item = this._enqueue('write', req);
+ let pending = bufs.length;
this.stream.cork();
- for (var n = 0; n < bufs.length; n++)
- this.stream.write(bufs[n], done);
+ for (var i = 0; i < bufs.length; ++i)
+ this.stream.write(bufs[i], done);
this.stream.uncork();
function done(err) {
@@ -126,93 +158,42 @@ class JSStreamWrap extends Socket {
let errCode = 0;
if (err) {
- const code = uv[`UV_${err.code}`];
- errCode = (err.code && code) ? code : uv.UV_EPIPE;
+ errCode = uv[`UV_${err.code}`] || uv.UV_EPIPE;
}
// Ensure that write was dispatched
- setImmediate(function() {
- // Do not invoke callback twice
- if (!self._dequeue(item))
- return;
-
- handle.finishWrite(req, errCode);
+ setImmediate(() => {
+ self.finishWrite(handle, errCode);
});
}
return 0;
}
- _enqueue(type, req) {
- const item = new QueueItem(type, req);
- if (this._list === null) {
- this._list = item;
- return item;
- }
-
- item.next = this._list.next;
- item.prev = this._list;
- item.next.prev = item;
- item.prev.next = item;
-
- return item;
- }
-
- _dequeue(item) {
- assert(item instanceof QueueItem);
-
- var next = item.next;
- var prev = item.prev;
-
- if (next === null && prev === null)
- return false;
-
- item.next = null;
- item.prev = null;
-
- if (next === item) {
- prev = null;
- next = null;
- } else {
- prev.next = next;
- next.prev = prev;
- }
-
- if (this._list === item)
- this._list = next;
+ // handle === this._handle except when called from doClose().
+ finishWrite(handle, errCode) {
+ // The write request might already have been cancelled.
+ if (this[kCurrentWriteRequest] === null)
+ return;
+ const req = this[kCurrentWriteRequest];
+ this[kCurrentWriteRequest] = null;
- return true;
+ handle.finishWrite(req, errCode);
}
doClose(cb) {
const handle = this._handle;
setImmediate(() => {
- while (this._list !== null) {
- const item = this._list;
- const req = item.req;
- this._dequeue(item);
-
- const errCode = uv.UV_ECANCELED;
- if (item.type === 'write') {
- handle.finishWrite(req, errCode);
- } else if (item.type === 'shutdown') {
- handle.finishShutdown(req, errCode);
- }
- }
-
// Should be already set by net.js
assert.strictEqual(this._handle, null);
+
+ this.finishWrite(handle, uv.UV_ECANCELED);
+ this.finishShutdown(handle, uv.UV_ECANCELED);
+
cb();
});
}
}
-function QueueItem(type, req) {
- this.type = type;
- this.req = req;
- this.prev = this;
- this.next = this;
-}
-
module.exports = JSStreamWrap;
diff --git a/src/env.h b/src/env.h
index 88c0023707..9411444616 100644
--- a/src/env.h
+++ b/src/env.h
@@ -164,7 +164,6 @@ class ModuleWrap;
V(internal_string, "internal") \
V(ipv4_string, "IPv4") \
V(ipv6_string, "IPv6") \
- V(isalive_string, "isAlive") \
V(isclosing_string, "isClosing") \
V(issuer_string, "issuer") \
V(issuercert_string, "issuerCertificate") \
diff --git a/src/js_stream.cc b/src/js_stream.cc
index c4e32feeba..dba6d1a52b 100644
--- a/src/js_stream.cc
+++ b/src/js_stream.cc
@@ -80,13 +80,7 @@ AsyncWrap* JSStream::GetAsyncWrap() {
bool JSStream::IsAlive() {
- HandleScope scope(env()->isolate());
- Context::Scope context_scope(env()->context());
- v8::Local<v8::Value> fn = object()->Get(env()->isalive_string());
- if (!fn->IsFunction())
- return false;
- return MakeCallback(fn.As<v8::Function>(), 0, nullptr)
- .ToLocalChecked()->IsTrue();
+ return true;
}