diff options
Diffstat (limited to 'doc/api')
-rw-r--r-- | doc/api/stream.md | 51 |
1 files changed, 44 insertions, 7 deletions
diff --git a/doc/api/stream.md b/doc/api/stream.md index 67cda03d69a..0d93e5492c9 100644 --- a/doc/api/stream.md +++ b/doc/api/stream.md @@ -1870,16 +1870,14 @@ const { pipeline } = require('stream/promises'); async function run() { const ac = new AbortController(); - const options = { - signal: ac.signal, - }; + const signal = ac.signal; setTimeout(() => ac.abort(), 1); await pipeline( fs.createReadStream('archive.tar'), zlib.createGzip(), fs.createWriteStream('archive.tar.gz'), - options, + { signal }, ); } @@ -1895,10 +1893,10 @@ const fs = require('fs'); async function run() { await pipeline( fs.createReadStream('lowercase.txt'), - async function* (source) { + async function* (source, signal) { source.setEncoding('utf8'); // Work with strings rather than `Buffer`s. for await (const chunk of source) { - yield chunk.toUpperCase(); + yield await processChunk(chunk, { signal }); } }, fs.createWriteStream('uppercase.txt') @@ -1909,6 +1907,28 @@ async function run() { run().catch(console.error); ``` +Remember to handle the `signal` argument passed into the async generator. +Especially in the case where the async generator is the source for the +pipeline (i.e. first argument) or the pipeline will never complete. + +```js +const { pipeline } = require('stream/promises'); +const fs = require('fs'); + +async function run() { + await pipeline( + async function * (signal) { + await someLongRunningfn({ signal }); + yield 'asd'; + }, + fs.createWriteStream('uppercase.txt') + ); + console.log('Pipeline succeeded.'); +} + +run().catch(console.error); +``` + `stream.pipeline()` will call `stream.destroy(err)` on all streams except: * `Readable` streams which have emitted `'end'` or `'close'`. * `Writable` streams which have emitted `'finish'` or `'close'`. @@ -3407,13 +3427,20 @@ the `Readable.from()` utility method: ```js const { Readable } = require('stream'); +const ac = new AbortController(); +const signal = ac.signal; + async function * generate() { yield 'a'; + await someLongRunningFn({ signal }); yield 'b'; yield 'c'; } const readable = Readable.from(generate()); +readable.on('close', () => { + ac.abort(); +}); readable.on('data', (chunk) => { console.log(chunk); @@ -3433,6 +3460,11 @@ const { pipeline: pipelinePromise } = require('stream/promises'); const writable = fs.createWriteStream('./file'); +const ac = new AbortController(); +const signal = ac.signal; + +const iterator = createIterator({ signal }); + // Callback Pattern pipeline(iterator, writable, (err, value) => { if (err) { @@ -3440,6 +3472,8 @@ pipeline(iterator, writable, (err, value) => { } else { console.log(value, 'value returned'); } +}).on('close', () => { + ac.abort(); }); // Promise Pattern @@ -3447,7 +3481,10 @@ pipelinePromise(iterator, writable) .then((value) => { console.log(value, 'value returned'); }) - .catch(console.error); + .catch((err) => { + console.error(err); + ac.abort(); + }); ``` <!--type=misc--> |