summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--benchmark/http/upgrade.js53
-rw-r--r--doc/api/stream.md22
-rw-r--r--lib/_http_client.js4
-rw-r--r--lib/_http_server.js4
-rw-r--r--lib/_stream_duplex.js10
-rw-r--r--lib/_stream_readable.js25
-rw-r--r--lib/_stream_writable.js10
-rw-r--r--lib/net.js4
-rw-r--r--test/parallel/test-stream-push-order.js2
-rw-r--r--test/parallel/test-stream-readable-reading-readingMore.js2
-rw-r--r--test/parallel/test-stream2-transform.js2
-rw-r--r--test/parallel/test-stream2-unpipe-leak.js2
-rw-r--r--test/parallel/test-stream3-pause-then-read.js2
13 files changed, 118 insertions, 24 deletions
diff --git a/benchmark/http/upgrade.js b/benchmark/http/upgrade.js
new file mode 100644
index 0000000000..0feaecc8ff
--- /dev/null
+++ b/benchmark/http/upgrade.js
@@ -0,0 +1,53 @@
+'use strict';
+
+const common = require('../common.js');
+const PORT = common.PORT;
+const net = require('net');
+
+const bench = common.createBenchmark(main, {
+ n: [5, 1000]
+});
+
+const reqData = 'GET / HTTP/1.1\r\n' +
+ 'Upgrade: WebSocket\r\n' +
+ 'Connection: Upgrade\r\n' +
+ '\r\n' +
+ 'WjN}|M(6';
+
+const resData = 'HTTP/1.1 101 Web Socket Protocol Handshake\r\n' +
+ 'Upgrade: WebSocket\r\n' +
+ 'Connection: Upgrade\r\n' +
+ '\r\n\r\n';
+
+function main({ n }) {
+ process.env.PORT = PORT;
+ var server = require('../fixtures/simple-http-server.js')
+ .listen(common.PORT)
+ .on('listening', function() {
+ bench.start();
+ doBench(server.address(), n, function() {
+ bench.end(n);
+ server.close();
+ });
+ })
+ .on('upgrade', function(req, socket, upgradeHead) {
+ socket.resume();
+ socket.write(resData);
+ socket.end();
+ });
+}
+
+function doBench(address, count, done) {
+ if (count === 0) {
+ done();
+ return;
+ }
+
+ const conn = net.createConnection(address.port);
+ conn.write(reqData);
+ conn.resume();
+
+ conn.on('end', function() {
+ doBench(address, count - 1, done);
+ });
+}
diff --git a/doc/api/stream.md b/doc/api/stream.md
index f3780ee4c4..dc74df7720 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -63,8 +63,8 @@ object mode is not safe.
<!--type=misc-->
Both [Writable][] and [Readable][] streams will store data in an internal
-buffer that can be retrieved using `writable._writableState.getBuffer()` or
-`readable._readableState.buffer`, respectively.
+buffer that can be retrieved using `writable.writableBuffer` or
+`readable.readableBuffer`, respectively.
The amount of data potentially buffered depends on the `highWaterMark` option
passed into the streams constructor. For normal streams, the `highWaterMark`
@@ -602,22 +602,22 @@ Readable stream implementation.
Specifically, at any given point in time, every Readable is in one of three
possible states:
-* `readable._readableState.flowing = null`
-* `readable._readableState.flowing = false`
-* `readable._readableState.flowing = true`
+* `readable.readableFlowing = null`
+* `readable.readableFlowing = false`
+* `readable.readableFlowing = true`
-When `readable._readableState.flowing` is `null`, no mechanism for consuming the
+When `readable.readableFlowing` is `null`, no mechanism for consuming the
streams data is provided so the stream will not generate its data. While in this
state, attaching a listener for the `'data'` event, calling the `readable.pipe()`
method, or calling the `readable.resume()` method will switch
-`readable._readableState.flowing` to `true`, causing the Readable to begin
+`readable.readableFlowing` to `true`, causing the Readable to begin
actively emitting events as data is generated.
Calling `readable.pause()`, `readable.unpipe()`, or receiving "back pressure"
-will cause the `readable._readableState.flowing` to be set as `false`,
+will cause the `readable.readableFlowing` to be set as `false`,
temporarily halting the flowing of events but *not* halting the generation of
data. While in this state, attaching a listener for the `'data'` event
-would not cause `readable._readableState.flowing` to switch to `true`.
+would not cause `readable.readableFlowing` to switch to `true`.
```js
const { PassThrough, Writable } = require('stream');
@@ -626,14 +626,14 @@ const writable = new Writable();
pass.pipe(writable);
pass.unpipe(writable);
-// flowing is now false
+// readableFlowing is now false
pass.on('data', (chunk) => { console.log(chunk.toString()); });
pass.write('ok'); // will not emit 'data'
pass.resume(); // must be called to make 'data' being emitted
```
-While `readable._readableState.flowing` is `false`, data may be accumulating
+While `readable.readableFlowing` is `false`, data may be accumulating
within the streams internal buffer.
#### Choose One
diff --git a/lib/_http_client.js b/lib/_http_client.js
index bdda708493..d9a2d10ae2 100644
--- a/lib/_http_client.js
+++ b/lib/_http_client.js
@@ -432,9 +432,7 @@ function socketOnData(d) {
socket.removeListener('close', socketCloseListener);
socket.removeListener('error', socketErrorListener);
- // TODO(isaacs): Need a way to reset a stream to fresh state
- // IE, not flowing, and not explicitly paused.
- socket._readableState.flowing = null;
+ socket.readableFlowing = null;
req.emit(eventName, res, socket, bodyHead);
req.emit('close');
diff --git a/lib/_http_server.js b/lib/_http_server.js
index a1cbb63c1d..cab4824501 100644
--- a/lib/_http_server.js
+++ b/lib/_http_server.js
@@ -502,9 +502,7 @@ function onParserExecuteCommon(server, socket, parser, state, ret, d) {
debug('SERVER have listener for %s', eventName);
var bodyHead = d.slice(bytesParsed, d.length);
- // TODO(isaacs): Need a way to reset a stream to fresh state
- // IE, not flowing, and not explicitly paused.
- socket._readableState.flowing = null;
+ socket.readableFlowing = null;
server.emit(eventName, req, socket, bodyHead);
} else {
// Got upgrade header or CONNECT method, but have no handler.
diff --git a/lib/_stream_duplex.js b/lib/_stream_duplex.js
index 05f6493408..e99d246396 100644
--- a/lib/_stream_duplex.js
+++ b/lib/_stream_duplex.js
@@ -74,6 +74,16 @@ Object.defineProperty(Duplex.prototype, 'writableHighWaterMark', {
}
});
+Object.defineProperty(Duplex.prototype, 'writableBuffer', {
+ // making it explicit this property is not enumerable
+ // because otherwise some prototype manipulation in
+ // userland will fail
+ enumerable: false,
+ get: function() {
+ return this._writableState && this._writableState.getBuffer();
+ }
+});
+
// the no-half-open enforcer
function onend() {
// if we allow half-open state, or if the writable side ended,
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 01d886e6ca..2653d3835d 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -925,6 +925,31 @@ Object.defineProperty(Readable.prototype, 'readableHighWaterMark', {
}
});
+Object.defineProperty(Readable.prototype, 'readableBuffer', {
+ // making it explicit this property is not enumerable
+ // because otherwise some prototype manipulation in
+ // userland will fail
+ enumerable: false,
+ get: function() {
+ return this._readableState && this._readableState.buffer;
+ }
+});
+
+Object.defineProperty(Readable.prototype, 'readableFlowing', {
+ // making it explicit this property is not enumerable
+ // because otherwise some prototype manipulation in
+ // userland will fail
+ enumerable: false,
+ get: function() {
+ return this._readableState.flowing;
+ },
+ set: function(state) {
+ if (this._readableState) {
+ this._readableState.flowing = state;
+ }
+ }
+});
+
// exposed for testing purposes only.
Readable._fromList = fromList;
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index f5f05486d2..97fbebdd66 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -324,6 +324,16 @@ Writable.prototype.setDefaultEncoding = function setDefaultEncoding(encoding) {
return this;
};
+Object.defineProperty(Writable.prototype, 'writableBuffer', {
+ // making it explicit this property is not enumerable
+ // because otherwise some prototype manipulation in
+ // userland will fail
+ enumerable: false,
+ get: function() {
+ return this._writableState && this._writableState.getBuffer();
+ }
+});
+
function decodeChunk(state, chunk, encoding) {
if (!state.objectMode &&
state.decodeStrings !== false &&
diff --git a/lib/net.js b/lib/net.js
index 5562db6ed6..aa27e70050 100644
--- a/lib/net.js
+++ b/lib/net.js
@@ -254,7 +254,7 @@ function Socket(options) {
// stop the handle from reading and pause the stream
this._handle.reading = false;
this._handle.readStop();
- this._readableState.flowing = false;
+ this.readableFlowing = false;
} else if (!options.manualStart) {
this.read(0);
}
@@ -819,7 +819,7 @@ protoGetter('bytesWritten', function bytesWritten() {
if (!state)
return undefined;
- state.getBuffer().forEach(function(el) {
+ this.writableBuffer.forEach(function(el) {
if (el.chunk instanceof Buffer)
bytes += el.chunk.length;
else
diff --git a/test/parallel/test-stream-push-order.js b/test/parallel/test-stream-push-order.js
index be2db9f44a..ce4f336b02 100644
--- a/test/parallel/test-stream-push-order.js
+++ b/test/parallel/test-stream-push-order.js
@@ -47,6 +47,6 @@ s.read(0);
// ACTUALLY [1, 3, 5, 6, 4, 2]
process.on('exit', function() {
- assert.deepStrictEqual(s._readableState.buffer.join(','), '1,2,3,4,5,6');
+ assert.deepStrictEqual(s.readableBuffer.join(','), '1,2,3,4,5,6');
console.log('ok');
});
diff --git a/test/parallel/test-stream-readable-reading-readingMore.js b/test/parallel/test-stream-readable-reading-readingMore.js
index bee3a1c82a..e31d2dd921 100644
--- a/test/parallel/test-stream-readable-reading-readingMore.js
+++ b/test/parallel/test-stream-readable-reading-readingMore.js
@@ -15,7 +15,7 @@ assert.strictEqual(state.readingMore, false);
readable.on('data', common.mustCall((data) => {
// while in a flowing state, should try to read more.
- if (state.flowing)
+ if (readable.readableFlowing)
assert.strictEqual(state.readingMore, true);
// reading as long as we've not ended
diff --git a/test/parallel/test-stream2-transform.js b/test/parallel/test-stream2-transform.js
index 819b088e27..16a0523994 100644
--- a/test/parallel/test-stream2-transform.js
+++ b/test/parallel/test-stream2-transform.js
@@ -46,7 +46,7 @@ const Transform = require('_stream_transform');
assert.strictEqual(tx._readableState.length, 10);
assert.strictEqual(transformed, 10);
assert.strictEqual(tx._transformState.writechunk.length, 5);
- assert.deepStrictEqual(tx._writableState.getBuffer().map(function(c) {
+ assert.deepStrictEqual(tx.writableBuffer.map(function(c) {
return c.chunk.length;
}), [6, 7, 8, 9, 10]);
}
diff --git a/test/parallel/test-stream2-unpipe-leak.js b/test/parallel/test-stream2-unpipe-leak.js
index cc331d5821..5c19be061f 100644
--- a/test/parallel/test-stream2-unpipe-leak.js
+++ b/test/parallel/test-stream2-unpipe-leak.js
@@ -66,7 +66,7 @@ assert.strictEqual(dest.listeners('finish').length, 0);
console.error(src._readableState);
process.on('exit', function() {
- src._readableState.buffer.length = 0;
+ src.readableBuffer.length = 0;
console.error(src._readableState);
assert(src._readableState.length >= src.readableHighWaterMark);
console.log('ok');
diff --git a/test/parallel/test-stream3-pause-then-read.js b/test/parallel/test-stream3-pause-then-read.js
index d75fe69708..f7bfadaf9d 100644
--- a/test/parallel/test-stream3-pause-then-read.js
+++ b/test/parallel/test-stream3-pause-then-read.js
@@ -68,7 +68,7 @@ function readn(n, then) {
r.once('readable', read);
else {
assert.strictEqual(c.length, n);
- assert(!r._readableState.flowing);
+ assert(!r.readableFlowing);
then();
}
})();