From b171adc4d12e96f7b72155ea54a78dd1c8e5823f Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Fri, 29 Dec 2017 16:17:12 +0100 Subject: lib: remove queue implementation from JSStreamWrap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The streams implementation generally ensures that only one write() call is active at a time. `JSStreamWrap` instances still kept queue of write reqeuests in spite of that; refactor it away. Also, fold `isAlive()` into a constant function on the native side. PR-URL: https://github.com/nodejs/node/pull/17918 Reviewed-By: Ben Noordhuis Reviewed-By: James M Snell Reviewed-By: Tobias Nießen Reviewed-By: Minwoo Jung Reviewed-By: Colin Ihrig Reviewed-By: Tiancheng "Timothy" Gu Reviewed-By: Matteo Collina --- lib/internal/wrap_js_stream.js | 159 ++++++++++++++++++----------------------- src/env.h | 1 - src/js_stream.cc | 8 +-- 3 files changed, 71 insertions(+), 97 deletions(-) diff --git a/lib/internal/wrap_js_stream.js b/lib/internal/wrap_js_stream.js index 611095655b..1c494e57e1 100644 --- a/lib/internal/wrap_js_stream.js +++ b/lib/internal/wrap_js_stream.js @@ -8,6 +8,15 @@ const uv = process.binding('uv'); const debug = util.debuglog('stream_wrap'); const errors = require('internal/errors'); +const kCurrentWriteRequest = Symbol('kCurrentWriteRequest'); +const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest'); + +function isClosing() { return this.owner.isClosing(); } +function onreadstart() { return this.owner.readStart(); } +function onreadstop() { return this.owner.readStop(); } +function onshutdown(req) { return this.owner.doShutdown(req); } +function onwrite(req, bufs) { return this.owner.doWrite(req, bufs); } + /* This class serves as a wrapper for when the C++ side of Node wants access * to a standard JS stream. For example, TLS or HTTP do not operate on network * resources conceptually, although that is the common case and what we are @@ -27,12 +36,13 @@ class JSStreamWrap extends Socket { debug('close'); this.doClose(cb); }; - handle.isAlive = () => this.isAlive(); - handle.isClosing = () => this.isClosing(); - handle.onreadstart = () => this.readStart(); - handle.onreadstop = () => this.readStop(); - handle.onshutdown = (req) => this.doShutdown(req); - handle.onwrite = (req, bufs) => this.doWrite(req, bufs); + // Inside of the following functions, `this` refers to the handle + // and `this.owner` refers to this JSStreamWrap instance. + handle.isClosing = isClosing; + handle.onreadstart = onreadstart; + handle.onreadstop = onreadstop; + handle.onshutdown = onshutdown; + handle.onwrite = onwrite; stream.pause(); stream.on('error', (err) => this.emit('error', err)); @@ -60,7 +70,10 @@ class JSStreamWrap extends Socket { super({ handle, manualStart: true }); this.stream = stream; - this._list = null; + this[kCurrentWriteRequest] = null; + this[kCurrentShutdownRequest] = null; + + // Start reading. this.read(0); } @@ -69,10 +82,6 @@ class JSStreamWrap extends Socket { return JSStreamWrap; } - isAlive() { - return true; - } - isClosing() { return !this.readable || !this.writable; } @@ -88,33 +97,56 @@ class JSStreamWrap extends Socket { } doShutdown(req) { + assert.strictEqual(this[kCurrentShutdownRequest], null); + this[kCurrentShutdownRequest] = req; + + // TODO(addaleax): It might be nice if we could get into a state where + // DoShutdown() is not called on streams while a write is still pending. + // + // Currently, the only part of the code base where that happens is the + // TLS implementation, which calls both DoWrite() and DoShutdown() on the + // underlying network stream inside of its own DoShutdown() method. + // Working around that on the native side is not quite trivial (yet?), + // so for now that is supported here. + + if (this[kCurrentWriteRequest] !== null) + return this.on('drain', () => this.doShutdown(req)); + assert.strictEqual(this[kCurrentWriteRequest], null); + const handle = this._handle; - const item = this._enqueue('shutdown', req); this.stream.end(() => { // Ensure that write was dispatched setImmediate(() => { - if (!this._dequeue(item)) - return; - - handle.finishShutdown(req, 0); + this.finishShutdown(handle, 0); }); }); return 0; } + // handle === this._handle except when called from doClose(). + finishShutdown(handle, errCode) { + // The shutdown request might already have been cancelled. + if (this[kCurrentShutdownRequest] === null) + return; + const req = this[kCurrentShutdownRequest]; + this[kCurrentShutdownRequest] = null; + handle.finishShutdown(req, errCode); + } + doWrite(req, bufs) { - const self = this; - const handle = this._handle; + assert.strictEqual(this[kCurrentWriteRequest], null); + assert.strictEqual(this[kCurrentShutdownRequest], null); + this[kCurrentWriteRequest] = req; - var pending = bufs.length; + const handle = this._handle; + const self = this; - // Queue the request to be able to cancel it - const item = this._enqueue('write', req); + let pending = bufs.length; this.stream.cork(); - for (var n = 0; n < bufs.length; n++) - this.stream.write(bufs[n], done); + for (var i = 0; i < bufs.length; ++i) + this.stream.write(bufs[i], done); this.stream.uncork(); function done(err) { @@ -126,93 +158,42 @@ class JSStreamWrap extends Socket { let errCode = 0; if (err) { - const code = uv[`UV_${err.code}`]; - errCode = (err.code && code) ? code : uv.UV_EPIPE; + errCode = uv[`UV_${err.code}`] || uv.UV_EPIPE; } // Ensure that write was dispatched - setImmediate(function() { - // Do not invoke callback twice - if (!self._dequeue(item)) - return; - - handle.finishWrite(req, errCode); + setImmediate(() => { + self.finishWrite(handle, errCode); }); } return 0; } - _enqueue(type, req) { - const item = new QueueItem(type, req); - if (this._list === null) { - this._list = item; - return item; - } - - item.next = this._list.next; - item.prev = this._list; - item.next.prev = item; - item.prev.next = item; - - return item; - } - - _dequeue(item) { - assert(item instanceof QueueItem); - - var next = item.next; - var prev = item.prev; - - if (next === null && prev === null) - return false; - - item.next = null; - item.prev = null; - - if (next === item) { - prev = null; - next = null; - } else { - prev.next = next; - next.prev = prev; - } - - if (this._list === item) - this._list = next; + // handle === this._handle except when called from doClose(). + finishWrite(handle, errCode) { + // The write request might already have been cancelled. + if (this[kCurrentWriteRequest] === null) + return; + const req = this[kCurrentWriteRequest]; + this[kCurrentWriteRequest] = null; - return true; + handle.finishWrite(req, errCode); } doClose(cb) { const handle = this._handle; setImmediate(() => { - while (this._list !== null) { - const item = this._list; - const req = item.req; - this._dequeue(item); - - const errCode = uv.UV_ECANCELED; - if (item.type === 'write') { - handle.finishWrite(req, errCode); - } else if (item.type === 'shutdown') { - handle.finishShutdown(req, errCode); - } - } - // Should be already set by net.js assert.strictEqual(this._handle, null); + + this.finishWrite(handle, uv.UV_ECANCELED); + this.finishShutdown(handle, uv.UV_ECANCELED); + cb(); }); } } -function QueueItem(type, req) { - this.type = type; - this.req = req; - this.prev = this; - this.next = this; -} - module.exports = JSStreamWrap; diff --git a/src/env.h b/src/env.h index 88c0023707..9411444616 100644 --- a/src/env.h +++ b/src/env.h @@ -164,7 +164,6 @@ class ModuleWrap; V(internal_string, "internal") \ V(ipv4_string, "IPv4") \ V(ipv6_string, "IPv6") \ - V(isalive_string, "isAlive") \ V(isclosing_string, "isClosing") \ V(issuer_string, "issuer") \ V(issuercert_string, "issuerCertificate") \ diff --git a/src/js_stream.cc b/src/js_stream.cc index c4e32feeba..dba6d1a52b 100644 --- a/src/js_stream.cc +++ b/src/js_stream.cc @@ -80,13 +80,7 @@ AsyncWrap* JSStream::GetAsyncWrap() { bool JSStream::IsAlive() { - HandleScope scope(env()->isolate()); - Context::Scope context_scope(env()->context()); - v8::Local fn = object()->Get(env()->isalive_string()); - if (!fn->IsFunction()) - return false; - return MakeCallback(fn.As(), 0, nullptr) - .ToLocalChecked()->IsTrue(); + return true; } -- cgit v1.2.3