diff options
Diffstat (limited to 'lib/internal/js_stream_socket.js')
-rw-r--r-- | lib/internal/js_stream_socket.js | 227 |
1 files changed, 227 insertions, 0 deletions
diff --git a/lib/internal/js_stream_socket.js b/lib/internal/js_stream_socket.js new file mode 100644 index 0000000000..8343b6c264 --- /dev/null +++ b/lib/internal/js_stream_socket.js @@ -0,0 +1,227 @@ +'use strict'; + +const assert = require('assert'); +const util = require('util'); +const { Socket } = require('net'); +const { JSStream } = internalBinding('js_stream'); +const uv = internalBinding('uv'); +const debug = util.debuglog('stream_socket'); +const { owner_symbol } = require('internal/async_hooks').symbols; +const { ERR_STREAM_WRAP } = require('internal/errors').codes; + +const kCurrentWriteRequest = Symbol('kCurrentWriteRequest'); +const kCurrentShutdownRequest = Symbol('kCurrentShutdownRequest'); +const kPendingShutdownRequest = Symbol('kPendingShutdownRequest'); + +function isClosing() { return this[owner_symbol].isClosing(); } +function onreadstart() { return this[owner_symbol].readStart(); } +function onreadstop() { return this[owner_symbol].readStop(); } +function onshutdown(req) { return this[owner_symbol].doShutdown(req); } +function onwrite(req, bufs) { return this[owner_symbol].doWrite(req, bufs); } + +/* This class serves as a wrapper for when the C++ side of Node wants access + * to a standard JS stream. For example, TLS or HTTP do not operate on network + * resources conceptually, although that is the common case and what we are + * optimizing for; in theory, they are completely composable and can work with + * any stream resource they see. + * + * For the common case, i.e. a TLS socket wrapping around a net.Socket, we + * can skip going through the JS layer and let TLS access the raw C++ handle + * of a net.Socket. The flipside of this is that, to maintain composability, + * we need a way to create "fake" net.Socket instances that call back into a + * "real" JavaScript stream. JSStreamSocket is exactly this. + */ +class JSStreamSocket extends Socket { + constructor(stream) { + const handle = new JSStream(); + handle.close = (cb) => { + debug('close'); + this.doClose(cb); + }; + // Inside of the following functions, `this` refers to the handle + // and `this[owner_symbol]` refers to this JSStreamSocket instance. + handle.isClosing = isClosing; + handle.onreadstart = onreadstart; + handle.onreadstop = onreadstop; + handle.onshutdown = onshutdown; + handle.onwrite = onwrite; + + stream.pause(); + stream.on('error', (err) => this.emit('error', err)); + const ondata = (chunk) => { + if (typeof chunk === 'string' || + stream._readableState.objectMode === true) { + // Make sure that no further `data` events will happen. + stream.pause(); + stream.removeListener('data', ondata); + + this.emit('error', new ERR_STREAM_WRAP()); + return; + } + + debug('data', chunk.length); + if (this._handle) + this._handle.readBuffer(chunk); + }; + stream.on('data', ondata); + stream.once('end', () => { + debug('end'); + if (this._handle) + this._handle.emitEOF(); + }); + // Some `Stream` don't pass `hasError` parameters when closed. + stream.once('close', () => { + // Errors emitted from `stream` have also been emitted to this instance + // so that we don't pass errors to `destroy()` again. + this.destroy(); + }); + + super({ handle, manualStart: true }); + this.stream = stream; + this[kCurrentWriteRequest] = null; + this[kCurrentShutdownRequest] = null; + this[kPendingShutdownRequest] = null; + this.readable = stream.readable; + this.writable = stream.writable; + + // Start reading. + this.read(0); + } + + // Allow legacy requires in the test suite to keep working: + // const { StreamWrap } = require('internal/js_stream_socket') + static get StreamWrap() { + return JSStreamSocket; + } + + isClosing() { + return !this.readable || !this.writable; + } + + readStart() { + this.stream.resume(); + return 0; + } + + readStop() { + this.stream.pause(); + return 0; + } + + doShutdown(req) { + // TODO(addaleax): It might be nice if we could get into a state where + // DoShutdown() is not called on streams while a write is still pending. + // + // Currently, the only part of the code base where that happens is the + // TLS implementation, which calls both DoWrite() and DoShutdown() on the + // underlying network stream inside of its own DoShutdown() method. + // Working around that on the native side is not quite trivial (yet?), + // so for now that is supported here. + + if (this[kCurrentWriteRequest] !== null) { + this[kPendingShutdownRequest] = req; + return 0; + } + assert.strictEqual(this[kCurrentWriteRequest], null); + assert.strictEqual(this[kCurrentShutdownRequest], null); + this[kCurrentShutdownRequest] = req; + + const handle = this._handle; + + setImmediate(() => { + // Ensure that write is dispatched asynchronously. + this.stream.end(() => { + this.finishShutdown(handle, 0); + }); + }); + return 0; + } + + // handle === this._handle except when called from doClose(). + finishShutdown(handle, errCode) { + // The shutdown request might already have been cancelled. + if (this[kCurrentShutdownRequest] === null) + return; + const req = this[kCurrentShutdownRequest]; + this[kCurrentShutdownRequest] = null; + handle.finishShutdown(req, errCode); + } + + doWrite(req, bufs) { + assert.strictEqual(this[kCurrentWriteRequest], null); + assert.strictEqual(this[kCurrentShutdownRequest], null); + + const handle = this._handle; + const self = this; + + let pending = bufs.length; + + this.stream.cork(); + for (var i = 0; i < bufs.length; ++i) + this.stream.write(bufs[i], done); + this.stream.uncork(); + + // Only set the request here, because the `write()` calls could throw. + this[kCurrentWriteRequest] = req; + + function done(err) { + if (!err && --pending !== 0) + return; + + // Ensure that this is called once in case of error + pending = 0; + + let errCode = 0; + if (err) { + errCode = uv[`UV_${err.code}`] || uv.UV_EPIPE; + } + + // Ensure that write was dispatched + setImmediate(() => { + self.finishWrite(handle, errCode); + }); + } + + return 0; + } + + // handle === this._handle except when called from doClose(). + finishWrite(handle, errCode) { + // The write request might already have been cancelled. + if (this[kCurrentWriteRequest] === null) + return; + const req = this[kCurrentWriteRequest]; + this[kCurrentWriteRequest] = null; + + handle.finishWrite(req, errCode); + if (this[kPendingShutdownRequest]) { + const req = this[kPendingShutdownRequest]; + this[kPendingShutdownRequest] = null; + this.doShutdown(req); + } + } + + doClose(cb) { + const handle = this._handle; + + // When sockets of the "net" module destroyed, they will call + // `this._handle.close()` which will also emit EOF if not emitted before. + // This feature makes sockets on the other side emit "end" and "close" + // even though we haven't called `end()`. As `stream` are likely to be + // instances of `net.Socket`, calling `stream.destroy()` manually will + // avoid issues that don't properly close wrapped connections. + this.stream.destroy(); + + setImmediate(() => { + // Should be already set by net.js + assert.strictEqual(this._handle, null); + + this.finishWrite(handle, uv.UV_ECANCELED); + this.finishShutdown(handle, uv.UV_ECANCELED); + + cb(); + }); + } +} + +module.exports = JSStreamSocket; |