Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristopher Luke <zaide@zaidesthings.com>2017-03-16 08:53:35 +0300
committerEvan Lucas <evanlucas@me.com>2017-05-03 04:41:22 +0300
commitf5a702e763b3b044178b1ce4fedc5ee1b9ffe993 (patch)
tree1a1df94d75f5b87246dc8141785985b9ad11b544
parent592db37e2fd6bbbbe4cbe32ce7e8f68c3cfc4160 (diff)
stream: Fixes missing 'unpipe' event
Currently when the destination emits an 'error', 'finish' or 'close' event the pipe calls unpipe to emit 'unpipe' and trigger the clean up of all it's listeners. When the source emits an 'end' event without {end: false} it calls end() on the destination leading it to emit a 'close', this will again lead to the pipe calling unpipe. However the source emitting an 'end' event along side {end: false} is the only time the cleanup gets ran directly without unpipe being called. This fixes that so the 'unpipe' event does get emitted and cleanup in turn gets ran by that event. Fixes: https://github.com/nodejs/node/issues/11837 PR-URL: https://github.com/nodejs/node/pull/11876 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
-rw-r--r--lib/_stream_readable.js4
-rw-r--r--test/parallel/test-stream-unpipe-event.js87
2 files changed, 89 insertions, 2 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 8f11e5db2ab..69845a3f900 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -494,7 +494,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest !== process.stdout &&
dest !== process.stderr;
- var endFn = doEnd ? onend : cleanup;
+ var endFn = doEnd ? onend : unpipe;
if (state.endEmitted)
process.nextTick(endFn);
else
@@ -530,7 +530,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest.removeListener('error', onerror);
dest.removeListener('unpipe', onunpipe);
src.removeListener('end', onend);
- src.removeListener('end', cleanup);
+ src.removeListener('end', unpipe);
src.removeListener('data', ondata);
cleanedUp = true;
diff --git a/test/parallel/test-stream-unpipe-event.js b/test/parallel/test-stream-unpipe-event.js
new file mode 100644
index 00000000000..a9e634a11a3
--- /dev/null
+++ b/test/parallel/test-stream-unpipe-event.js
@@ -0,0 +1,87 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const {Writable, Readable} = require('stream');
+class NullWriteable extends Writable {
+ _write(chunk, encoding, callback) {
+ return callback();
+ }
+}
+class QuickEndReadable extends Readable {
+ _read() {
+ this.push(null);
+ }
+}
+class NeverEndReadable extends Readable {
+ _read() {}
+}
+
+function noop() {}
+
+{
+ const dest = new NullWriteable();
+ const src = new QuickEndReadable();
+ dest.on('pipe', common.mustCall(noop));
+ dest.on('unpipe', common.mustCall(noop));
+ src.pipe(dest);
+ setImmediate(() => {
+ assert.strictEqual(src._readableState.pipesCount, 0);
+ });
+}
+
+{
+ const dest = new NullWriteable();
+ const src = new NeverEndReadable();
+ dest.on('pipe', common.mustCall(noop));
+ dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted'));
+ src.pipe(dest);
+ setImmediate(() => {
+ assert.strictEqual(src._readableState.pipesCount, 1);
+ });
+}
+
+{
+ const dest = new NullWriteable();
+ const src = new NeverEndReadable();
+ dest.on('pipe', common.mustCall(noop));
+ dest.on('unpipe', common.mustCall(noop));
+ src.pipe(dest);
+ src.unpipe(dest);
+ setImmediate(() => {
+ assert.strictEqual(src._readableState.pipesCount, 0);
+ });
+}
+
+{
+ const dest = new NullWriteable();
+ const src = new QuickEndReadable();
+ dest.on('pipe', common.mustCall(noop));
+ dest.on('unpipe', common.mustCall(noop));
+ src.pipe(dest, {end: false});
+ setImmediate(() => {
+ assert.strictEqual(src._readableState.pipesCount, 0);
+ });
+}
+
+{
+ const dest = new NullWriteable();
+ const src = new NeverEndReadable();
+ dest.on('pipe', common.mustCall(noop));
+ dest.on('unpipe', common.mustNotCall('unpipe should not have been emitted'));
+ src.pipe(dest, {end: false});
+ setImmediate(() => {
+ assert.strictEqual(src._readableState.pipesCount, 1);
+ });
+}
+
+{
+ const dest = new NullWriteable();
+ const src = new NeverEndReadable();
+ dest.on('pipe', common.mustCall(noop));
+ dest.on('unpipe', common.mustCall(noop));
+ src.pipe(dest, {end: false});
+ src.unpipe(dest);
+ setImmediate(() => {
+ assert.strictEqual(src._readableState.pipesCount, 0);
+ });
+}