Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorayanamist <contact@ayanamist.com>2014-01-03 15:37:16 +0400
committerFedor Indutny <fedor.indutny@gmail.com>2014-01-05 19:44:45 +0400
commitb922b5e90d2c14dd332b95827c2533e083df7e55 (patch)
treeb3bb6326380159798bf9af5a2f4cc790ad91b8e7
parentaa56d9d35405a56b8478c0a83106ffa4ada903db (diff)
stream: writes may return false but forget to emit drain
If a write is above the highWaterMark, _write still manages to fully send it synchronously, _writableState.length will be adjusted down to 0 synchronously with the write returning false, but 'drain' will not be emitted until process.nextTick. If another small write which is below highWaterMark is issued before process.nextTick happens, _writableState.needDrain will be reset to false, and the drain event will never be fired. So we should check needDrain before setting it up, which prevents it from inproperly resetting to false.
-rw-r--r--lib/_stream_writable.js4
-rw-r--r--test/simple/test-stream-big-packet.js73
2 files changed, 76 insertions, 1 deletions
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index 601f5b713f9..403cb7b4773 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -203,7 +203,9 @@ function writeOrBuffer(stream, state, chunk, encoding, cb) {
state.length += len;
var ret = state.length < state.highWaterMark;
- state.needDrain = !ret;
+ // we must ensure that previous needDrain will not be reset to false.
+ if (!ret)
+ state.needDrain = true;
if (state.writing)
state.buffer.push(new WriteReq(chunk, encoding, cb));
diff --git a/test/simple/test-stream-big-packet.js b/test/simple/test-stream-big-packet.js
new file mode 100644
index 00000000000..9ec29ca0dcc
--- /dev/null
+++ b/test/simple/test-stream-big-packet.js
@@ -0,0 +1,73 @@
+// Copyright Joyent, Inc. and other Node contributors.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a
+// copy of this software and associated documentation files (the
+// "Software"), to deal in the Software without restriction, including
+// without limitation the rights to use, copy, modify, merge, publish,
+// distribute, sublicense, and/or sell copies of the Software, and to permit
+// persons to whom the Software is furnished to do so, subject to the
+// following conditions:
+//
+// The above copyright notice and this permission notice shall be included
+// in all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
+// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
+// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
+// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
+// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
+// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
+// USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+var common = require('../common');
+var assert = require('assert');
+var util = require('util');
+var stream = require('stream');
+
+var passed = false;
+
+function PassThrough () {
+ stream.Transform.call(this);
+};
+util.inherits(PassThrough, stream.Transform);
+PassThrough.prototype._transform = function (chunk, encoding, done) {
+ this.push(chunk);
+ done();
+};
+
+function TestStream () {
+ stream.Transform.call(this);
+};
+util.inherits(TestStream, stream.Transform);
+TestStream.prototype._transform = function (chunk, encoding, done) {
+ if (!passed) {
+ // Char 'a' only exists in the last write
+ passed = chunk.toString().indexOf('a') >= 0;
+ }
+ done();
+};
+
+var s1 = new PassThrough();
+var s2 = new PassThrough();
+var s3 = new TestStream();
+s1.pipe(s3);
+// Don't let s2 auto close which may close s3
+s2.pipe(s3, {end: false});
+
+// We must write a buffer larger than highWaterMark
+var big = new Buffer(s1._writableState.highWaterMark + 1);
+big.fill('x');
+
+// Since big is larger than highWaterMark, it will be buffered internally.
+assert(!s1.write(big));
+// 'tiny' is small enough to pass through internal buffer.
+assert(s2.write('tiny'));
+
+// Write some small data in next IO loop, which will never be written to s3
+// Because 'drain' event is not emitted from s1 and s1 is still paused
+setImmediate(s1.write.bind(s1), 'later');
+
+// Assert after two IO loops when all operations have been done.
+process.on('exit', function () {
+ assert(passed, 'Large buffer is not handled properly by Writable Stream');
+});