diff options
author | Anna Henningsen <anna@addaleax.net> | 2017-10-12 07:17:48 +0200 |
---|---|---|
committer | Anna Henningsen <anna@addaleax.net> | 2017-10-19 18:06:27 +0200 |
commit | 542e94cdce9e3a351af3fca9ef2825f4be88e943 (patch) | |
tree | a5bb6d2a7692b3c551fc4cc2b54af4f44f0ce6a1 /lib/_stream_wrap.js | |
parent | ca12ae6015e6936ecc8cea89222eee42911d1bf1 (diff) | |
download | android-node-v8-542e94cdce9e3a351af3fca9ef2825f4be88e943.tar.gz android-node-v8-542e94cdce9e3a351af3fca9ef2825f4be88e943.tar.bz2 android-node-v8-542e94cdce9e3a351af3fca9ef2825f4be88e943.zip |
lib: move _stream_wrap into internals
This makes a subsequent possible deprecation easier.
PR-URL: https://github.com/nodejs/node/pull/16158
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: Franziska Hinkelmann <franziska.hinkelmann@gmail.com>
Reviewed-By: Tobias Nießen <tniessen@tnie.de>
Diffstat (limited to 'lib/_stream_wrap.js')
-rw-r--r-- | lib/_stream_wrap.js | 222 |
1 files changed, 1 insertions, 221 deletions
diff --git a/lib/_stream_wrap.js b/lib/_stream_wrap.js index eaba8ebfa8..10a0cf57e7 100644 --- a/lib/_stream_wrap.js +++ b/lib/_stream_wrap.js @@ -1,223 +1,3 @@ 'use strict'; -const assert = require('assert'); -const util = require('util'); -const { Socket } = require('net'); -const { JSStream } = process.binding('js_stream'); -const uv = process.binding('uv'); -const debug = util.debuglog('stream_wrap'); -const errors = require('internal/errors'); - -function StreamWrap(stream) { - const handle = new JSStream(); - - this.stream = stream; - - this._list = null; - - const self = this; - handle.close = function(cb) { - debug('close'); - self.doClose(cb); - }; - handle.isAlive = function() { - return self.isAlive(); - }; - handle.isClosing = function() { - return self.isClosing(); - }; - handle.onreadstart = function() { - return self.readStart(); - }; - handle.onreadstop = function() { - return self.readStop(); - }; - handle.onshutdown = function(req) { - return self.doShutdown(req); - }; - handle.onwrite = function(req, bufs) { - return self.doWrite(req, bufs); - }; - - this.stream.pause(); - this.stream.on('error', function onerror(err) { - self.emit('error', err); - }); - this.stream.on('data', function ondata(chunk) { - if (typeof chunk === 'string' || this._readableState.objectMode === true) { - // Make sure that no further `data` events will happen - this.pause(); - this.removeListener('data', ondata); - - self.emit('error', new errors.Error('ERR_STREAM_WRAP')); - return; - } - - debug('data', chunk.length); - if (self._handle) - self._handle.readBuffer(chunk); - }); - this.stream.once('end', function onend() { - debug('end'); - if (self._handle) - self._handle.emitEOF(); - }); - - Socket.call(this, { - handle: handle - }); -} -util.inherits(StreamWrap, Socket); -module.exports = StreamWrap; - -// require('_stream_wrap').StreamWrap -StreamWrap.StreamWrap = StreamWrap; - -StreamWrap.prototype.isAlive = function isAlive() { - return true; -}; - -StreamWrap.prototype.isClosing = function isClosing() { - return !this.readable || !this.writable; -}; - -StreamWrap.prototype.readStart = function readStart() { - this.stream.resume(); - return 0; -}; - -StreamWrap.prototype.readStop = function readStop() { - this.stream.pause(); - return 0; -}; - -StreamWrap.prototype.doShutdown = function doShutdown(req) { - const self = this; - const handle = this._handle; - const item = this._enqueue('shutdown', req); - - this.stream.end(function() { - // Ensure that write was dispatched - setImmediate(function() { - if (!self._dequeue(item)) - return; - - handle.finishShutdown(req, 0); - }); - }); - return 0; -}; - -StreamWrap.prototype.doWrite = function doWrite(req, bufs) { - const self = this; - const handle = self._handle; - - var pending = bufs.length; - - // Queue the request to be able to cancel it - const item = self._enqueue('write', req); - - self.stream.cork(); - for (var n = 0; n < bufs.length; n++) - self.stream.write(bufs[n], done); - self.stream.uncork(); - - function done(err) { - if (!err && --pending !== 0) - return; - - // Ensure that this is called once in case of error - pending = 0; - - let errCode = 0; - if (err) { - const code = uv[`UV_${err.code}`]; - errCode = (err.code && code) ? code : uv.UV_EPIPE; - } - - // Ensure that write was dispatched - setImmediate(function() { - // Do not invoke callback twice - if (!self._dequeue(item)) - return; - - handle.doAfterWrite(req); - handle.finishWrite(req, errCode); - }); - } - - return 0; -}; - -function QueueItem(type, req) { - this.type = type; - this.req = req; - this.prev = this; - this.next = this; -} - -StreamWrap.prototype._enqueue = function _enqueue(type, req) { - const item = new QueueItem(type, req); - if (this._list === null) { - this._list = item; - return item; - } - - item.next = this._list.next; - item.prev = this._list; - item.next.prev = item; - item.prev.next = item; - - return item; -}; - -StreamWrap.prototype._dequeue = function _dequeue(item) { - assert(item instanceof QueueItem); - - var next = item.next; - var prev = item.prev; - - if (next === null && prev === null) - return false; - - item.next = null; - item.prev = null; - - if (next === item) { - prev = null; - next = null; - } else { - prev.next = next; - next.prev = prev; - } - - if (this._list === item) - this._list = next; - - return true; -}; - -StreamWrap.prototype.doClose = function doClose(cb) { - const self = this; - const handle = self._handle; - - setImmediate(function() { - while (self._list !== null) { - const item = self._list; - const req = item.req; - self._dequeue(item); - - const errCode = uv.UV_ECANCELED; - if (item.type === 'write') { - handle.doAfterWrite(req); - handle.finishWrite(req, errCode); - } else if (item.type === 'shutdown') { - handle.finishShutdown(req, errCode); - } - } - - // Should be already set by net.js - assert(self._handle === null); - cb(); - }); -}; +module.exports = require('internal/wrap_js_stream'); |