summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCalvin Metcalf <cmetcalf@appgeo.com>2017-05-16 11:08:49 -0400
committerMatteo Collina <hello@matteocollina.com>2017-12-15 23:05:45 +0100
commite20af3371bb1f0ee16795528cbcbc674408842cb (patch)
tree0a0426a7c304adcf9b0bfc9c0e3e7c1a762a8fd3
parent31addac8bbcd3e0a8cefba8327cfcc7fd9bdeb68 (diff)
downloadandroid-node-v8-e20af3371bb1f0ee16795528cbcbc674408842cb.tar.gz
android-node-v8-e20af3371bb1f0ee16795528cbcbc674408842cb.tar.bz2
android-node-v8-e20af3371bb1f0ee16795528cbcbc674408842cb.zip
stream: add flow and buffer properties to streams
This adds computed properties to readable and writable streams to allow access to the readable buffer, the writable buffer, and flow state without accessing the readable or writable state. These are the only uses of readable and writable state in the docs so adding these work arounds allows them to be removed from the docs. This also updates net, http_client and http_server to use the new methods instead of manipulating readable and writable state directly. See: https://github.com/nodejs/node/issues/445 PR-URL: https://github.com/nodejs/node/pull/12855 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de> Reviewed-By: James M Snell <jasnell@gmail.com>
-rw-r--r--benchmark/http/upgrade.js53
-rw-r--r--doc/api/stream.md22
-rw-r--r--lib/_http_client.js4
-rw-r--r--lib/_http_server.js4
-rw-r--r--lib/_stream_duplex.js10
-rw-r--r--lib/_stream_readable.js25
-rw-r--r--lib/_stream_writable.js10
-rw-r--r--lib/net.js4
-rw-r--r--test/parallel/test-stream-push-order.js2
-rw-r--r--test/parallel/test-stream-readable-reading-readingMore.js2
-rw-r--r--test/parallel/test-stream2-transform.js2
-rw-r--r--test/parallel/test-stream2-unpipe-leak.js2
-rw-r--r--test/parallel/test-stream3-pause-then-read.js2
13 files changed, 118 insertions, 24 deletions
diff --git a/benchmark/http/upgrade.js b/benchmark/http/upgrade.js
new file mode 100644
index 0000000000..0feaecc8ff
--- /dev/null
+++ b/benchmark/http/upgrade.js
@@ -0,0 +1,53 @@
+'use strict';
+
+const common = require('../common.js');
+const PORT = common.PORT;
+const net = require('net');
+
+const bench = common.createBenchmark(main, {
+ n: [5, 1000]
+});
+
+const reqData = 'GET / HTTP/1.1\r\n' +
+ 'Upgrade: WebSocket\r\n' +
+ 'Connection: Upgrade\r\n' +
+ '\r\n' +
+ 'WjN}|M(6';
+
+const resData = 'HTTP/1.1 101 Web Socket Protocol Handshake\r\n' +
+ 'Upgrade: WebSocket\r\n' +
+ 'Connection: Upgrade\r\n' +
+ '\r\n\r\n';
+
+function main({ n }) {
+ process.env.PORT = PORT;
+ var server = require('../fixtures/simple-http-server.js')
+ .listen(common.PORT)
+ .on('listening', function() {
+ bench.start();
+ doBench(server.address(), n, function() {
+ bench.end(n);
+ server.close();
+ });
+ })
+ .on('upgrade', function(req, socket, upgradeHead) {
+ socket.resume();
+ socket.write(resData);
+ socket.end();
+ });
+}
+
+function doBench(address, count, done) {
+ if (count === 0) {
+ done();
+ return;
+ }
+
+ const conn = net.createConnection(address.port);
+ conn.write(reqData);
+ conn.resume();
+
+ conn.on('end', function() {
+ doBench(address, count - 1, done);
+ });
+}
diff --git a/doc/api/stream.md b/doc/api/stream.md
index f3780ee4c4..dc74df7720 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -63,8 +63,8 @@ object mode is not safe.
<!--type=misc-->
Both [Writable][] and [Readable][] streams will store data in an internal
-buffer that can be retrieved using `writable._writableState.getBuffer()` or
-`readable._readableState.buffer`, respectively.
+buffer that can be retrieved using `writable.writableBuffer` or
+`readable.readableBuffer`, respectively.
The amount of data potentially buffered depends on the `highWaterMark` option
passed into the streams constructor. For normal streams, the `highWaterMark`
@@ -602,22 +602,22 @@ Readable stream implementation.
Specifically, at any given point in time, every Readable is in one of three
possible states:
-* `readable._readableState.flowing = null`
-* `readable._readableState.flowing = false`
-* `readable._readableState.flowing = true`
+* `readable.readableFlowing = null`
+* `readable.readableFlowing = false`
+* `readable.readableFlowing = true`
-When `readable._readableState.flowing` is `null`, no mechanism for consuming the
+When `readable.readableFlowing` is `null`, no mechanism for consuming the
streams data is provided so the stream will not generate its data. While in this
state, attaching a listener for the `'data'` event, calling the `readable.pipe()`
method, or calling the `readable.resume()` method will switch
-`readable._readableState.flowing` to `true`, causing the Readable to begin
+`readable.readableFlowing` to `true`, causing the Readable to begin
actively emitting events as data is generated.
Calling `readable.pause()`, `readable.unpipe()`, or receiving "back pressure"
-will cause the `readable._readableState.flowing` to be set as `false`,
+will cause the `readable.readableFlowing` to be set as `false`,
temporarily halting the flowing of events but *not* halting the generation of
data. While in this state, attaching a listener for the `'data'` event
-would not cause `readable._readableState.flowing` to switch to `true`.
+would not cause `readable.readableFlowing` to switch to `true`.
```js
const { PassThrough, Writable } = require('stream');
@@ -626,14 +626,14 @@ const writable = new Writable();
pass.pipe(writable);
pass.unpipe(writable);
-// flowing is now false
+// readableFlowing is now false
pass.on('data', (chunk) => { console.log(chunk.toString()); });
pass.write('ok'); // will not emit 'data'
pass.resume(); // must be called to make 'data' being emitted
```
-While `readable._readableState.flowing` is `false`, data may be accumulating
+While `readable.readableFlowing` is `false`, data may be accumulating
within the streams internal buffer.
#### Choose One
diff --git a/lib/_http_client.js b/lib/_http_client.js
index bdda708493..d9a2d10ae2 100644
--- a/lib/_http_client.js
+++ b/lib/_http_client.js
@@ -432,9 +432,7 @@ function socketOnData(d) {
socket.removeListener('close', socketCloseListener);
socket.removeListener('error', socketErrorListener);
- // TODO(isaacs): Need a way to reset a stream to fresh state
- // IE, not flowing, and not explicitly paused.
- socket._readableState.flowing = null;
+ socket.readableFlowing = null;
req.emit(eventName, res, socket, bodyHead);
req.emit('close');
diff --git a/lib/_http_server.js b/lib/_http_server.js
index a1cbb63c1d..cab4824501 100644
--- a/lib/_http_server.js
+++ b/lib/_http_server.js
@@ -502,9 +502,7 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) {
debug('SERVER have listener for %s', eventName);
var bodyHead = d.slice(bytesParsed, d.length);
- // TODO(isaacs): Need a way to reset a stream to fresh state
- // IE, not flowing, and not explicitly paused.
- socket._readableState.flowing = null;
+ socket.readableFlowing = null;
server.emit(eventName, req, socket, bodyHead);
} else {
// Got upgrade header or CONNECT method, but have no handler.
diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js
index 05f6493408..e99d246396 100644
--- a/lib/_stream_duplex.js
+++ b/lib/_stream_duplex.js
@@ -74,6 +74,16 @@ Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', {
}
});
+Object.defineProperty(Duplex.prototype, 'writableBuffer', {
+ // making it explicit this property is not enumerable
+ // because otherwise some prototype manipulation in
+ // userland will fail
+ enumerable: false,
+ get: function() {
+ return this._writableState && this._writableState.getBuffer();
+ }
+});
+
// the no-half-open enforcer
function onend() {
// if we allow half-open state, or if the writable side ended,
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 01d886e6ca..2653d3835d 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -925,6 +925,31 @@ Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
}
});
+Object.defineProperty(Readable.prototype, 'readableBuffer', {
+ // making it explicit this property is not enumerable
+ // because otherwise some prototype manipulation in
+ // userland will fail
+ enumerable: false,
+ get: function() {
+ return this._readableState && this._readableState.buffer;
+ }
+});
+
+Object.defineProperty(Readable.prototype, 'readableFlowing', {
+ // making it explicit this property is not enumerable
+ // because otherwise some prototype manipulation in
+ // userland will fail
+ enumerable: false,
+ get: function() {
+ return this._readableState.flowing;
+ },
+ set: function(state) {
+ if (this._readableState) {
+ this._readableState.flowing = state;
+ }
+ }
+});
+
// exposed for testing purposes only.
Readable._fromList = fromList;
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index f5f05486d2..97fbebdd66 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -324,6 +324,16 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
return this;
};
+Object.defineProperty(Writable.prototype, 'writableBuffer', {
+ // making it explicit this property is not enumerable
+ // because otherwise some prototype manipulation in
+ // userland will fail
+ enumerable: false,
+ get: function() {
+ return this._writableState && this._writableState.getBuffer();
+ }
+});
+
function decodeChunk(state, chunk, encoding) {
if (!state.objectMode &&
state.decodeStrings !== false &&
diff --git a/lib/net.js b/lib/net.js
index 5562db6ed6..aa27e70050 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -254,7 +254,7 @@ function Socket(options) {
// stop the handle from reading and pause the stream
this._handle.reading = false;
this._handle.readStop();
- this._readableState.flowing = false;
+ this.readableFlowing = false;
} else if (!options.manualStart) {
this.read(0);
}
@@ -819,7 +819,7 @@ protoGetter('bytesWritten', function bytesWritten() {
if (!state)
return undefined;
- state.getBuffer().forEach(function(el) {
+ this.writableBuffer.forEach(function(el) {
if (el.chunk instanceof Buffer)
bytes += el.chunk.length;
else
diff --git a/test/parallel/test-stream-push-order.js b/test/parallel/test-stream-push-order.js
index be2db9f44a..ce4f336b02 100644
--- a/test/parallel/test-stream-push-order.js
+++ b/test/parallel/test-stream-push-order.js
@@ -47,6 +47,6 @@ s.read(0);
// ACTUALLY [1, 3, 5, 6, 4, 2]
process.on('exit', function() {
- assert.deepStrictEqual(s._readableState.buffer.join(','), '1,2,3,4,5,6');
+ assert.deepStrictEqual(s.readableBuffer.join(','), '1,2,3,4,5,6');
console.log('ok');
});
diff --git a/test/parallel/test-stream-readable-reading-readingMore.js b/test/parallel/test-stream-readable-reading-readingMore.js
index bee3a1c82a..e31d2dd921 100644
--- a/test/parallel/test-stream-readable-reading-readingMore.js
+++ b/test/parallel/test-stream-readable-reading-readingMore.js
@@ -15,7 +15,7 @@ assert.strictEqual(state.readingMore, false);
readable.on('data', common.mustCall((data) => {
// while in a flowing state, should try to read more.
- if (state.flowing)
+ if (readable.readableFlowing)
assert.strictEqual(state.readingMore, true);
// reading as long as we've not ended
diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js
index 819b088e27..16a0523994 100644
--- a/test/parallel/test-stream2-transform.js
+++ b/test/parallel/test-stream2-transform.js
@@ -46,7 +46,7 @@ const Transform = require('_stream_transform');
assert.strictEqual(tx._readableState.length, 10);
assert.strictEqual(transformed, 10);
assert.strictEqual(tx._transformState.writechunk.length, 5);
- assert.deepStrictEqual(tx._writableState.getBuffer().map(function(c) {
+ assert.deepStrictEqual(tx.writableBuffer.map(function(c) {
return c.chunk.length;
}), [6, 7, 8, 9, 10]);
}
diff --git a/test/parallel/test-stream2-unpipe-leak.js b/test/parallel/test-stream2-unpipe-leak.js
index cc331d5821..5c19be061f 100644
--- a/test/parallel/test-stream2-unpipe-leak.js
+++ b/test/parallel/test-stream2-unpipe-leak.js
@@ -66,7 +66,7 @@ assert.strictEqual(dest.listeners('finish').length, 0);
console.error(src._readableState);
process.on('exit', function() {
- src._readableState.buffer.length = 0;
+ src.readableBuffer.length = 0;
console.error(src._readableState);
assert(src._readableState.length >= src.readableHighWaterMark);
console.log('ok');
diff --git a/test/parallel/test-stream3-pause-then-read.js b/test/parallel/test-stream3-pause-then-read.js
index d75fe69708..f7bfadaf9d 100644
--- a/test/parallel/test-stream3-pause-then-read.js
+++ b/test/parallel/test-stream3-pause-then-read.js
@@ -68,7 +68,7 @@ function readn(n, then) {
r.once('readable', read);
else {
assert.strictEqual(c.length, n);
- assert(!r._readableState.flowing);
+ assert(!r.readableFlowing);
then();
}
})();