Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorRobert Nagy <ronagy@icloud.com>2021-06-11 12:15:45 +0300
committerDanielle Adams <adamzdanielle@gmail.com>2021-06-15 05:31:04 +0300
commitc20e28e1a0ee23c9c92b3c8ee27ba8e71977ef25 (patch)
tree8956f07ba7da0a477d0cfc901cb01a140a70728e /lib
parent4c6193fea10e00e75e51cd1b03469cc98e6e2f99 (diff)
stream: fix pipeline pump
Refs: https://github.com/nodejs/node/issues/39005 PR-URL: https://github.com/nodejs/node/pull/39006 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'lib')
-rw-r--r--lib/internal/streams/pipeline.js72
1 files changed, 58 insertions, 14 deletions
diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js
index f5b6fb90e11..202a8cf9810 100644
--- a/lib/internal/streams/pipeline.js
+++ b/lib/internal/streams/pipeline.js
@@ -5,6 +5,7 @@
const {
ArrayIsArray,
+ Promise,
SymbolAsyncIterator,
} = primordials;
@@ -13,21 +14,27 @@ const eos = require('internal/streams/end-of-stream');
const { once } = require('internal/util');
const destroyImpl = require('internal/streams/destroy');
const {
- ERR_INVALID_ARG_TYPE,
- ERR_INVALID_RETURN_VALUE,
- ERR_MISSING_ARGS,
- ERR_STREAM_DESTROYED
-} = require('internal/errors').codes;
+ aggregateTwoErrors,
+ codes: {
+ ERR_INVALID_ARG_TYPE,
+ ERR_INVALID_RETURN_VALUE,
+ ERR_MISSING_ARGS,
+ ERR_STREAM_DESTROYED,
+ ERR_STREAM_PREMATURE_CLOSE,
+ },
+} = require('internal/errors');
const { validateCallback } = require('internal/validators');
+function noop() {}
+
const {
isIterable,
isReadable,
isStream,
} = require('internal/streams/utils');
+const assert = require('internal/assert');
-let EE;
let PassThrough;
let Readable;
@@ -101,25 +108,62 @@ async function* fromReadable(val) {
}
async function pump(iterable, writable, finish) {
- if (!EE) {
- EE = require('events');
- }
let error;
+ let callback = noop;
+ const resume = (err) => {
+ error = aggregateTwoErrors(error, err);
+ const _callback = callback;
+ callback = noop;
+ _callback();
+ };
+ const onClose = () => {
+ resume(new ERR_STREAM_PREMATURE_CLOSE());
+ };
+
+ const waitForDrain = () => new Promise((resolve) => {
+ assert(callback === noop);
+ if (error || writable.destroyed) {
+ resolve();
+ } else {
+ callback = resolve;
+ }
+ });
+
+ writable
+ .on('drain', resume)
+ .on('error', resume)
+ .on('close', onClose);
+
try {
- if (writable.writableNeedDrain === true) {
- await EE.once(writable, 'drain');
+ if (writable.writableNeedDrain) {
+ await waitForDrain();
+ }
+
+ if (error) {
+ return;
}
for await (const chunk of iterable) {
if (!writable.write(chunk)) {
- if (writable.destroyed) return;
- await EE.once(writable, 'drain');
+ await waitForDrain();
+ }
+ if (error) {
+ return;
}
}
+
+ if (error) {
+ return;
+ }
+
writable.end();
} catch (err) {
- error = err;
+ error = aggregateTwoErrors(error, err);
} finally {
+ writable
+ .off('drain', resume)
+ .off('error', resume)
+ .off('close', onClose);
finish(error);
}
}