aboutsummaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/_http_common.js6
-rw-r--r--lib/_http_incoming.js2
-rw-r--r--lib/_http_server.js32
3 files changed, 35 insertions, 5 deletions
diff --git a/lib/_http_common.js b/lib/_http_common.js
index 297ac8f063..548f349616 100644
--- a/lib/_http_common.js
+++ b/lib/_http_common.js
@@ -157,10 +157,8 @@ function parserOnMessageComplete() {
stream.push(null);
}
- if (parser.socket.readable) {
- // force to read the next incoming message
- readStart(parser.socket);
- }
+ // force to read the next incoming message
+ readStart(parser.socket);
}
diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js
index cc93bfc9f1..2f66400247 100644
--- a/lib/_http_incoming.js
+++ b/lib/_http_incoming.js
@@ -23,7 +23,7 @@ var util = require('util');
var Stream = require('stream');
function readStart(socket) {
- if (socket)
+ if (socket && !socket._paused && socket.readable)
socket.resume();
}
exports.readStart = readStart;
diff --git a/lib/_http_server.js b/lib/_http_server.js
index 15ef5a6387..5a82c94dc8 100644
--- a/lib/_http_server.js
+++ b/lib/_http_server.js
@@ -351,6 +351,7 @@ function connectionListener(socket) {
}
function socketOnData(d) {
+ assert(!socket._paused);
debug('SERVER socketOnData %d', d.length);
var ret = parser.execute(d);
if (ret instanceof Error) {
@@ -382,6 +383,12 @@ function connectionListener(socket) {
socket.destroy();
}
}
+
+ if (socket._paused) {
+ // onIncoming paused the socket, we should pause the parser as well
+ debug('pause parser');
+ socket.parser.pause();
+ }
}
function socketOnEnd() {
@@ -411,9 +418,34 @@ function connectionListener(socket) {
// new message. In this callback we setup the response object and pass it
// to the user.
+ socket._paused = false;
+ function socketOnDrain() {
+ // If we previously paused, then start reading again.
+ if (socket._paused) {
+ socket._paused = false;
+ socket.parser.resume();
+ socket.resume();
+ }
+ }
+ socket.on('drain', socketOnDrain);
+
function parserOnIncoming(req, shouldKeepAlive) {
incoming.push(req);
+ // If the writable end isn't consuming, then stop reading
+ // so that we don't become overwhelmed by a flood of
+ // pipelined requests that may never be resolved.
+ if (!socket._paused) {
+ var needPause = socket._writableState.needDrain;
+ if (needPause) {
+ socket._paused = true;
+ // We also need to pause the parser, but don't do that until after
+ // the call to execute, because we may still be processing the last
+ // chunk.
+ socket.pause();
+ }
+ }
+
var res = new ServerResponse(req);
res.shouldKeepAlive = shouldKeepAlive;