Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Nagy <ronagy@icloud.com>2021-10-18 13:08:47 +0300
committerRichard Lau <rlau@redhat.com>2021-10-19 16:51:53 +0300
commit2bfa87edbc6dcda6b4128cb9b8af1696feac53eb (patch)
tree79a21f685b327568d7d457c4a2b981a90dae870c
parent8fdabcb9184a438d86a279936290d81b2ac2c13c (diff)
stream: fix fromAsyncGen
Fixes: https://github.com/nodejs/node/issues/40497 PR-URL: https://github.com/nodejs/node/pull/40499 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
-rw-r--r--lib/internal/streams/duplexify.js14
-rw-r--r--test/parallel/test-stream-duplex-from.js28
2 files changed, 37 insertions, 5 deletions
diff --git a/lib/internal/streams/duplexify.js b/lib/internal/streams/duplexify.js
index fea0e508411..7fd5df9f279 100644
--- a/lib/internal/streams/duplexify.js
+++ b/lib/internal/streams/duplexify.js
@@ -209,22 +209,28 @@ function fromAsyncGen(fn) {
const signal = ac.signal;
const value = fn(async function*() {
while (true) {
- const { chunk, done, cb } = await promise;
+ const _promise = promise;
+ promise = null;
+ const { chunk, done, cb } = await _promise;
process.nextTick(cb);
if (done) return;
if (signal.aborted) throw new AbortError();
- yield chunk;
({ promise, resolve } = createDeferredPromise());
+ yield chunk;
}
}(), { signal });
return {
value,
write(chunk, encoding, cb) {
- resolve({ chunk, done: false, cb });
+ const _resolve = resolve;
+ resolve = null;
+ _resolve({ chunk, done: false, cb });
},
final(cb) {
- resolve({ done: true, cb });
+ const _resolve = resolve;
+ resolve = null;
+ _resolve({ done: true, cb });
},
destroy(err, cb) {
ac.abort();
diff --git a/test/parallel/test-stream-duplex-from.js b/test/parallel/test-stream-duplex-from.js
index 265b61dfd06..446768d6eef 100644
--- a/test/parallel/test-stream-duplex-from.js
+++ b/test/parallel/test-stream-duplex-from.js
@@ -2,7 +2,7 @@
const common = require('../common');
const assert = require('assert');
-const { Duplex, Readable, Writable } = require('stream');
+const { Duplex, Readable, Writable, pipeline } = require('stream');
{
const d = Duplex.from({
@@ -118,3 +118,29 @@ const { Duplex, Readable, Writable } = require('stream');
assert.strictEqual(d.readable, false);
}));
}
+
+{
+ // https://github.com/nodejs/node/issues/40497
+ pipeline(
+ ['abc\ndef\nghi'],
+ Duplex.from(async function * (source) {
+ let rest = '';
+ for await (const chunk of source) {
+ const lines = (rest + chunk.toString()).split('\n');
+ rest = lines.pop();
+ for (const line of lines) {
+ yield line;
+ }
+ }
+ yield rest;
+ }),
+ async function * (source) {
+ let ret = '';
+ for await (const x of source) {
+ ret += x;
+ }
+ assert.strictEqual(ret, 'abcdefghi');
+ },
+ common.mustCall(() => {}),
+ );
+}