Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Nagy <ronagy@icloud.com>2020-06-24 00:08:14 +0300
committerRichard Lau <riclau@uk.ibm.com>2020-09-07 19:30:02 +0300
commit4bb40078da9ff51372459ff187b74866d73c3fb2 (patch)
treeb1d998bd68bc37f2d1bd0ff53e509d3538aa9f11
parentffae5f3809c99f8356006fadb5cbe9e57bd260a4 (diff)
stream: simpler and faster Readable async iterator
Reimplement as an async generator instead of a custom iterator class. Backport-PR-URL: https://github.com/nodejs/node/pull/34887 PR-URL: https://github.com/nodejs/node/pull/34035 Refs: https://github.com/nodejs/node/issues/34680 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
-rw-r--r--benchmark/streams/readable-async-iterator.js38
-rw-r--r--lib/_stream_readable.js66
-rw-r--r--lib/internal/streams/async_iterator.js221
-rw-r--r--lib/internal/streams/pipeline.js10
-rw-r--r--node.gyp1
-rw-r--r--test/parallel/test-readline-async-iterators-destroy.js1
-rw-r--r--test/parallel/test-stream-readable-async-iterators.js53
7 files changed, 121 insertions, 269 deletions
diff --git a/benchmark/streams/readable-async-iterator.js b/benchmark/streams/readable-async-iterator.js
new file mode 100644
index 00000000000..dbe335ab98b
--- /dev/null
+++ b/benchmark/streams/readable-async-iterator.js
@@ -0,0 +1,38 @@
+'use strict';
+
+const common = require('../common');
+const Readable = require('stream').Readable;
+
+const bench = common.createBenchmark(main, {
+ n: [1e5],
+ sync: ['yes', 'no'],
+});
+
+async function main({ n, sync }) {
+ sync = sync === 'yes';
+
+ const s = new Readable({
+ objectMode: true,
+ read() {
+ if (sync) {
+ this.push(1);
+ } else {
+ process.nextTick(() => {
+ this.push(1);
+ });
+ }
+ }
+ });
+
+ bench.start();
+
+ let x = 0;
+ for await (const chunk of s) {
+ x += chunk;
+ if (x > n) {
+ break;
+ }
+ }
+
+ bench.end(n);
+}
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index af878619867..a8e104588f1 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -27,6 +27,7 @@ const {
NumberIsNaN,
ObjectDefineProperties,
ObjectSetPrototypeOf,
+ Promise,
Set,
SymbolAsyncIterator,
Symbol
@@ -59,11 +60,11 @@ const kPaused = Symbol('kPaused');
// Lazy loaded to improve the startup performance.
let StringDecoder;
-let createReadableStreamAsyncIterator;
let from;
ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
+function nop() {}
const { errorOrDestroy } = destroyImpl;
@@ -1075,13 +1076,68 @@ Readable.prototype.wrap = function(stream) {
};
Readable.prototype[SymbolAsyncIterator] = function() {
- if (createReadableStreamAsyncIterator === undefined) {
- createReadableStreamAsyncIterator =
- require('internal/streams/async_iterator');
+ let stream = this;
+
+ if (typeof stream.read !== 'function') {
+ // v1 stream
+ const src = stream;
+ stream = new Readable({
+ objectMode: true,
+ destroy(err, callback) {
+ destroyImpl.destroyer(src, err);
+ callback();
+ }
+ }).wrap(src);
}
- return createReadableStreamAsyncIterator(this);
+
+ const iter = createAsyncIterator(stream);
+ iter.stream = stream;
+ return iter;
};
+async function* createAsyncIterator(stream) {
+ let callback = nop;
+
+ function next(resolve) {
+ if (this === stream) {
+ callback();
+ callback = nop;
+ } else {
+ callback = resolve;
+ }
+ }
+
+ stream
+ .on('readable', next)
+ .on('error', next)
+ .on('end', next)
+ .on('close', next);
+
+ try {
+ const state = stream._readableState;
+ while (true) {
+ const chunk = stream.read();
+ if (chunk !== null) {
+ yield chunk;
+ } else if (state.errored) {
+ throw state.errored;
+ } else if (state.ended) {
+ break;
+ } else if (state.closed) {
+ // TODO(ronag): ERR_PREMATURE_CLOSE?
+ break;
+ } else {
+ await new Promise(next);
+ }
+ }
+ } catch (err) {
+ destroyImpl.destroyer(stream, err);
+ throw err;
+ } finally {
+ destroyImpl.destroyer(stream, null);
+ }
+}
+
// Making it explicit these properties are not enumerable
// because otherwise some prototype manipulation in
// userland will fail.
diff --git a/lib/internal/streams/async_iterator.js b/lib/internal/streams/async_iterator.js
deleted file mode 100644
index 87b7f227d59..00000000000
--- a/lib/internal/streams/async_iterator.js
+++ /dev/null
@@ -1,221 +0,0 @@
-'use strict';
-
-const {
- ObjectCreate,
- ObjectGetPrototypeOf,
- ObjectSetPrototypeOf,
- Promise,
- PromiseReject,
- PromiseResolve,
- Symbol,
-} = primordials;
-
-const finished = require('internal/streams/end-of-stream');
-const destroyImpl = require('internal/streams/destroy');
-
-const kLastResolve = Symbol('lastResolve');
-const kLastReject = Symbol('lastReject');
-const kError = Symbol('error');
-const kEnded = Symbol('ended');
-const kLastPromise = Symbol('lastPromise');
-const kHandlePromise = Symbol('handlePromise');
-const kStream = Symbol('stream');
-
-let Readable;
-
-function createIterResult(value, done) {
- return { value, done };
-}
-
-function readAndResolve(iter) {
- const resolve = iter[kLastResolve];
- if (resolve !== null) {
- const data = iter[kStream].read();
- // We defer if data is null. We can be expecting either 'end' or 'error'.
- if (data !== null) {
- iter[kLastPromise] = null;
- iter[kLastResolve] = null;
- iter[kLastReject] = null;
- resolve(createIterResult(data, false));
- }
- }
-}
-
-function onReadable(iter) {
- // We wait for the next tick, because it might
- // emit an error with `process.nextTick()`.
- process.nextTick(readAndResolve, iter);
-}
-
-function wrapForNext(lastPromise, iter) {
- return (resolve, reject) => {
- lastPromise.then(() => {
- if (iter[kEnded]) {
- resolve(createIterResult(undefined, true));
- return;
- }
-
- iter[kHandlePromise](resolve, reject);
- }, reject);
- };
-}
-
-const AsyncIteratorPrototype = ObjectGetPrototypeOf(
- ObjectGetPrototypeOf(async function* () {}).prototype);
-
-function finish(self, err) {
- return new Promise((resolve, reject) => {
- const stream = self[kStream];
-
- finished(stream, (err) => {
- if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
- reject(err);
- } else {
- resolve(createIterResult(undefined, true));
- }
- });
- destroyImpl.destroyer(stream, err);
- });
-}
-
-const ReadableStreamAsyncIteratorPrototype = ObjectSetPrototypeOf({
- get stream() {
- return this[kStream];
- },
-
- next() {
- // If we have detected an error in the meanwhile
- // reject straight away.
- const error = this[kError];
- if (error !== null) {
- return PromiseReject(error);
- }
-
- if (this[kEnded]) {
- return PromiseResolve(createIterResult(undefined, true));
- }
-
- if (this[kStream].destroyed) {
- return new Promise((resolve, reject) => {
- if (this[kError]) {
- reject(this[kError]);
- } else if (this[kEnded]) {
- resolve(createIterResult(undefined, true));
- } else {
- finished(this[kStream], (err) => {
- if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
- reject(err);
- } else {
- resolve(createIterResult(undefined, true));
- }
- });
- }
- });
- }
-
- // If we have multiple next() calls we will wait for the previous Promise to
- // finish. This logic is optimized to support for await loops, where next()
- // is only called once at a time.
- const lastPromise = this[kLastPromise];
- let promise;
-
- if (lastPromise) {
- promise = new Promise(wrapForNext(lastPromise, this));
- } else {
- // Fast path needed to support multiple this.push()
- // without triggering the next() queue.
- const data = this[kStream].read();
- if (data !== null) {
- return PromiseResolve(createIterResult(data, false));
- }
-
- promise = new Promise(this[kHandlePromise]);
- }
-
- this[kLastPromise] = promise;
-
- return promise;
- },
-
- return() {
- return finish(this);
- },
-
- throw(err) {
- return finish(this, err);
- },
-}, AsyncIteratorPrototype);
-
-const createReadableStreamAsyncIterator = (stream) => {
- if (typeof stream.read !== 'function') {
- // v1 stream
-
- if (!Readable) {
- Readable = require('_stream_readable');
- }
-
- const src = stream;
- stream = new Readable({ objectMode: true }).wrap(src);
- finished(stream, (err) => destroyImpl.destroyer(src, err));
- }
-
- const iterator = ObjectCreate(ReadableStreamAsyncIteratorPrototype, {
- [kStream]: { value: stream, writable: true },
- [kLastResolve]: { value: null, writable: true },
- [kLastReject]: { value: null, writable: true },
- [kError]: { value: null, writable: true },
- [kEnded]: {
- value: stream.readableEnded || stream._readableState.endEmitted,
- writable: true
- },
- // The function passed to new Promise is cached so we avoid allocating a new
- // closure at every run.
- [kHandlePromise]: {
- value: (resolve, reject) => {
- const data = iterator[kStream].read();
- if (data) {
- iterator[kLastPromise] = null;
- iterator[kLastResolve] = null;
- iterator[kLastReject] = null;
- resolve(createIterResult(data, false));
- } else {
- iterator[kLastResolve] = resolve;
- iterator[kLastReject] = reject;
- }
- },
- writable: true,
- },
- });
- iterator[kLastPromise] = null;
-
- finished(stream, { writable: false }, (err) => {
- if (err && err.code !== 'ERR_STREAM_PREMATURE_CLOSE') {
- const reject = iterator[kLastReject];
- // Reject if we are waiting for data in the Promise returned by next() and
- // store the error.
- if (reject !== null) {
- iterator[kLastPromise] = null;
- iterator[kLastResolve] = null;
- iterator[kLastReject] = null;
- reject(err);
- }
- iterator[kError] = err;
- return;
- }
-
- const resolve = iterator[kLastResolve];
- if (resolve !== null) {
- iterator[kLastPromise] = null;
- iterator[kLastResolve] = null;
- iterator[kLastReject] = null;
- resolve(createIterResult(undefined, true));
- }
- iterator[kEnded] = true;
- });
-
- stream.on('readable', onReadable.bind(null, iterator));
-
- return iterator;
-};
-
-module.exports = createReadableStreamAsyncIterator;
diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js
index 4786b906f43..aaee0d24992 100644
--- a/lib/internal/streams/pipeline.js
+++ b/lib/internal/streams/pipeline.js
@@ -23,7 +23,7 @@ const {
let EE;
let PassThrough;
-let createReadableStreamAsyncIterator;
+let Readable;
function destroyer(stream, reading, writing, callback) {
callback = once(callback);
@@ -113,11 +113,11 @@ function makeAsyncIterable(val) {
}
async function* fromReadable(val) {
- if (!createReadableStreamAsyncIterator) {
- createReadableStreamAsyncIterator =
- require('internal/streams/async_iterator');
+ if (!Readable) {
+ Readable = require('_stream_readable');
}
- yield* createReadableStreamAsyncIterator(val);
+
+ yield* Readable.prototype[SymbolAsyncIterator].call(val);
}
async function pump(iterable, writable, finish) {
diff --git a/node.gyp b/node.gyp
index 88942393ff3..dd832ffc2ad 100644
--- a/node.gyp
+++ b/node.gyp
@@ -222,7 +222,6 @@
'lib/internal/worker/js_transferable.js',
'lib/internal/watchdog.js',
'lib/internal/streams/lazy_transform.js',
- 'lib/internal/streams/async_iterator.js',
'lib/internal/streams/buffer_list.js',
'lib/internal/streams/duplexpair.js',
'lib/internal/streams/from.js',
diff --git a/test/parallel/test-readline-async-iterators-destroy.js b/test/parallel/test-readline-async-iterators-destroy.js
index e96f831432b..7c14a667062 100644
--- a/test/parallel/test-readline-async-iterators-destroy.js
+++ b/test/parallel/test-readline-async-iterators-destroy.js
@@ -75,6 +75,7 @@ async function testMutualDestroy() {
break;
}
assert.deepStrictEqual(iteratedLines, expectedLines);
+ break;
}
assert.deepStrictEqual(iteratedLines, expectedLines);
diff --git a/test/parallel/test-stream-readable-async-iterators.js b/test/parallel/test-stream-readable-async-iterators.js
index 55d16a1c5d3..604ba3afb47 100644
--- a/test/parallel/test-stream-readable-async-iterators.js
+++ b/test/parallel/test-stream-readable-async-iterators.js
@@ -12,17 +12,6 @@ const assert = require('assert');
async function tests() {
{
- const AsyncIteratorPrototype = Object.getPrototypeOf(
- Object.getPrototypeOf(async function* () {}).prototype);
- const rs = new Readable({
- read() {}
- });
- assert.strictEqual(
- Object.getPrototypeOf(Object.getPrototypeOf(rs[Symbol.asyncIterator]())),
- AsyncIteratorPrototype);
- }
-
- {
// v1 stream
const stream = new Stream();
@@ -53,7 +42,9 @@ async function tests() {
});
const iter = Readable.prototype[Symbol.asyncIterator].call(stream);
- iter.next().catch(common.mustCall((err) => {
+ await iter.next();
+ await iter.next();
+ await iter.next().catch(common.mustCall((err) => {
assert.strictEqual(err.message, 'asd');
}));
}
@@ -189,12 +180,19 @@ async function tests() {
resolved.forEach(common.mustCall(
(item, i) => assert.strictEqual(item.value, 'hello-' + i), max));
- errors.forEach((promise) => {
+ errors.slice(0, 1).forEach((promise) => {
promise.catch(common.mustCall((err) => {
assert.strictEqual(err.message, 'kaboom');
}));
});
+ errors.slice(1).forEach((promise) => {
+ promise.then(common.mustCall(({ done, value }) => {
+ assert.strictEqual(done, true);
+ assert.strictEqual(value, undefined);
+ }));
+ });
+
readable.destroy(new Error('kaboom'));
}
@@ -285,28 +283,6 @@ async function tests() {
}
{
- // Iterator throw.
-
- const readable = new Readable({
- objectMode: true,
- read() {
- this.push('hello');
- }
- });
-
- readable.on('error', common.mustCall((err) => {
- assert.strictEqual(err.message, 'kaboom');
- }));
-
- const it = readable[Symbol.asyncIterator]();
- it.throw(new Error('kaboom')).catch(common.mustCall((err) => {
- assert.strictEqual(err.message, 'kaboom');
- }));
-
- assert.strictEqual(readable.destroyed, true);
- }
-
- {
console.log('destroyed by throw');
const readable = new Readable({
objectMode: true,
@@ -577,12 +553,15 @@ async function tests() {
assert.strictEqual(e, err);
})(), (async () => {
let e;
+ let x;
try {
- await d;
+ x = await d;
} catch (_e) {
e = _e;
}
- assert.strictEqual(e, err);
+ assert.strictEqual(e, undefined);
+ assert.strictEqual(x.done, true);
+ assert.strictEqual(x.value, undefined);
})()]);
}