Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/doc/api
diff options
context:
space:
mode:
authorRobert Nagy <ronagy@icloud.com>2021-06-17 23:25:34 +0300
committerRobert Nagy <ronagy@icloud.com>2021-08-25 18:44:28 +0300
commitc04d621eccfa1a8e65188ca6921ecd1b3b0a934c (patch)
tree802cda8be07e519071452e5f6451ce5c8a89680d /doc/api
parent2acd866990d8e42acc0f82e85693e52fe3adc350 (diff)
stream: add signal support to pipeline generators
Generators in pipeline must be able to be aborted or pipeline can deadlock. PR-URL: https://github.com/nodejs/node/pull/39067 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'doc/api')
-rw-r--r--doc/api/stream.md51
1 files changed, 44 insertions, 7 deletions
diff --git a/doc/api/stream.md b/doc/api/stream.md
index 67cda03d69a..0d93e5492c9 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -1870,16 +1870,14 @@ const { pipeline } = require('stream/promises');
async function run() {
const ac = new AbortController();
- const options = {
- signal: ac.signal,
- };
+ const signal = ac.signal;
setTimeout(() => ac.abort(), 1);
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
- options,
+ { signal },
);
}
@@ -1895,10 +1893,10 @@ const fs = require('fs');
async function run() {
await pipeline(
fs.createReadStream('lowercase.txt'),
- async function* (source) {
+ async function* (source, signal) {
source.setEncoding('utf8'); // Work with strings rather than `Buffer`s.
for await (const chunk of source) {
- yield chunk.toUpperCase();
+ yield await processChunk(chunk, { signal });
}
},
fs.createWriteStream('uppercase.txt')
@@ -1909,6 +1907,28 @@ async function run() {
run().catch(console.error);
```
+Remember to handle the `signal` argument passed into the async generator.
+Especially in the case where the async generator is the source for the
+pipeline (i.e. first argument) or the pipeline will never complete.
+
+```js
+const { pipeline } = require('stream/promises');
+const fs = require('fs');
+
+async function run() {
+ await pipeline(
+ async function * (signal) {
+ await someLongRunningfn({ signal });
+ yield 'asd';
+ },
+ fs.createWriteStream('uppercase.txt')
+ );
+ console.log('Pipeline succeeded.');
+}
+
+run().catch(console.error);
+```
+
`stream.pipeline()` will call `stream.destroy(err)` on all streams except:
* `Readable` streams which have emitted `'end'` or `'close'`.
* `Writable` streams which have emitted `'finish'` or `'close'`.
@@ -3407,13 +3427,20 @@ the `Readable.from()` utility method:
```js
const { Readable } = require('stream');
+const ac = new AbortController();
+const signal = ac.signal;
+
async function * generate() {
yield 'a';
+ await someLongRunningFn({ signal });
yield 'b';
yield 'c';
}
const readable = Readable.from(generate());
+readable.on('close', () => {
+ ac.abort();
+});
readable.on('data', (chunk) => {
console.log(chunk);
@@ -3433,6 +3460,11 @@ const { pipeline: pipelinePromise } = require('stream/promises');
const writable = fs.createWriteStream('./file');
+const ac = new AbortController();
+const signal = ac.signal;
+
+const iterator = createIterator({ signal });
+
// Callback Pattern
pipeline(iterator, writable, (err, value) => {
if (err) {
@@ -3440,6 +3472,8 @@ pipeline(iterator, writable, (err, value) => {
} else {
console.log(value, 'value returned');
}
+}).on('close', () => {
+ ac.abort();
});
// Promise Pattern
@@ -3447,7 +3481,10 @@ pipelinePromise(iterator, writable)
.then((value) => {
console.log(value, 'value returned');
})
- .catch(console.error);
+ .catch((err) => {
+ console.error(err);
+ ac.abort();
+ });
```
<!--type=misc-->