Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/api/stream.md54
-rw-r--r--lib/_http_client.js12
-rw-r--r--lib/internal/streams/add-abort-signal.js41
-rw-r--r--lib/internal/streams/readable.js1
-rw-r--r--lib/stream.js1
-rw-r--r--node.gyp1
-rw-r--r--test/parallel/test-bootstrap-modules.js1
-rw-r--r--test/parallel/test-stream-pipeline.js32
-rw-r--r--test/parallel/test-stream-readable-destroy.js37
-rw-r--r--test/parallel/test-stream-writable-destroy.js16
10 files changed, 183 insertions, 13 deletions
diff --git a/doc/api/stream.md b/doc/api/stream.md
index 6a22446743e..529bf2fa015 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -45,8 +45,8 @@ There are four fundamental stream types within Node.js:
is written and read (for example, [`zlib.createDeflate()`][]).
Additionally, this module includes the utility functions
-[`stream.pipeline()`][], [`stream.finished()`][] and
-[`stream.Readable.from()`][].
+[`stream.pipeline()`][], [`stream.finished()`][], [`stream.Readable.from()`][]
+and [`stream.addAbortSignal()`][].
### Streams Promises API
<!-- YAML
@@ -1799,6 +1799,55 @@ Calling `Readable.from(string)` or `Readable.from(buffer)` will not have
the strings or buffers be iterated to match the other streams semantics
for performance reasons.
+### `stream.addAbortSignal(signal, stream)`
+<!-- YAML
+added: REPLACEME
+-->
+* `signal` {AbortSignal} A signal representing possible cancellation
+* `stream` {Stream} a stream to attach a signal to
+
+Attaches an AbortSignal to a readable or writeable stream. This lets code
+control stream destruction using an `AbortController`.
+
+Calling `abort` on the `AbortController` corresponding to the passed
+`AbortSignal` will behave the same way as calling `.destroy(new AbortError())`
+on the stream.
+
+```js
+const fs = require('fs');
+
+const controller = new AbortController();
+const read = addAbortSignal(
+ controller.signal,
+ fs.createReadStream(('object.json'))
+);
+// Later, abort the operation closing the stream
+controller.abort();
+```
+
+Or using an `AbortSignal` with a readable stream as an async iterable:
+
+```js
+const controller = new AbortController();
+setTimeout(() => controller.abort(), 10_000); // set a timeout
+const stream = addAbortSignal(
+ controller.signal,
+ fs.createReadStream(('object.json'))
+);
+(async () => {
+ try {
+ for await (const chunk of stream) {
+ await process(chunk);
+ }
+ } catch (e) {
+ if (e.name === 'AbortError') {
+ // The operation was cancelled
+ } else {
+ throw e;
+ }
+ }
+})();
+```
## API for stream implementers
<!--type=misc-->
@@ -3123,6 +3172,7 @@ contain multi-byte characters.
[`stream.finished()`]: #stream_stream_finished_stream_options_callback
[`stream.pipe()`]: #stream_readable_pipe_destination_options
[`stream.pipeline()`]: #stream_stream_pipeline_source_transforms_destination_callback
+[`stream.addAbortSignal()`]: #stream_stream_addabortsignal_signal_stream
[`stream.uncork()`]: #stream_writable_uncork
[`stream.unpipe()`]: #stream_readable_unpipe_destination
[`stream.wrap()`]: #stream_readable_wrap_stream
diff --git a/lib/_http_client.js b/lib/_http_client.js
index 75d36f19e6c..6fb5dd65cb3 100644
--- a/lib/_http_client.js
+++ b/lib/_http_client.js
@@ -51,7 +51,7 @@ const { Buffer } = require('buffer');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
const { URL, urlToOptions, searchParamsSymbol } = require('internal/url');
const { kOutHeaders, kNeedDrain } = require('internal/http');
-const { AbortError, connResetException, codes } = require('internal/errors');
+const { connResetException, codes } = require('internal/errors');
const {
ERR_HTTP_HEADERS_SENT,
ERR_INVALID_ARG_TYPE,
@@ -61,7 +61,6 @@ const {
} = codes;
const {
validateInteger,
- validateAbortSignal,
} = require('internal/validators');
const { getTimerDuration } = require('internal/timers');
const {
@@ -69,6 +68,8 @@ const {
DTRACE_HTTP_CLIENT_RESPONSE
} = require('internal/dtrace');
+const { addAbortSignal } = require('stream');
+
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/;
const kError = Symbol('kError');
@@ -174,12 +175,7 @@ function ClientRequest(input, options, cb) {
const signal = options.signal;
if (signal) {
- validateAbortSignal(signal, 'options.signal');
- const listener = (e) => this.destroy(new AbortError());
- signal.addEventListener('abort', listener);
- this.once('close', () => {
- signal.removeEventListener('abort', listener);
- });
+ addAbortSignal(signal, this);
}
let method = options.method;
const methodIsString = (typeof method === 'string');
diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js
new file mode 100644
index 00000000000..27fefe96d21
--- /dev/null
+++ b/lib/internal/streams/add-abort-signal.js
@@ -0,0 +1,41 @@
+'use strict';
+
+const {
+ AbortError,
+ codes,
+} = require('internal/errors');
+
+const eos = require('internal/streams/end-of-stream');
+const { ERR_INVALID_ARG_TYPE } = codes;
+
+// This method is inlined here for readable-stream
+// https://github.com/nodejs/node/pull/36061#discussion_r533718029
+const validateAbortSignal = (signal, name) => {
+ if (signal !== undefined &&
+ (signal === null ||
+ typeof signal !== 'object' ||
+ !('aborted' in signal))) {
+ throw new ERR_INVALID_ARG_TYPE(name, 'AbortSignal', signal);
+ }
+};
+
+function isStream(obj) {
+ return !!(obj && typeof obj.pipe === 'function');
+}
+
+module.exports = function addAbortSignal(signal, stream) {
+ validateAbortSignal(signal, 'signal');
+ if (!isStream(stream)) {
+ throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
+ }
+ const onAbort = () => {
+ stream.destroy(new AbortError());
+ };
+ if (signal.aborted) {
+ onAbort();
+ } else {
+ signal.addEventListener('abort', onAbort);
+ eos(stream, () => signal.removeEventListener('abort', onAbort));
+ }
+ return stream;
+};
diff --git a/lib/internal/streams/readable.js b/lib/internal/streams/readable.js
index 93153908fef..105caa1e151 100644
--- a/lib/internal/streams/readable.js
+++ b/lib/internal/streams/readable.js
@@ -50,6 +50,7 @@ const {
getHighWaterMark,
getDefaultHighWaterMark
} = require('internal/streams/state');
+
const {
ERR_INVALID_ARG_TYPE,
ERR_STREAM_PUSH_AFTER_EOF,
diff --git a/lib/stream.js b/lib/stream.js
index 11f5ca997fe..fb6cf416cde 100644
--- a/lib/stream.js
+++ b/lib/stream.js
@@ -43,6 +43,7 @@ Stream.Duplex = require('internal/streams/duplex');
Stream.Transform = require('internal/streams/transform');
Stream.PassThrough = require('internal/streams/passthrough');
Stream.pipeline = pipeline;
+Stream.addAbortSignal = require('internal/streams/add-abort-signal');
Stream.finished = eos;
function lazyLoadPromises() {
diff --git a/node.gyp b/node.gyp
index 643c8a43c6f..865a7de9317 100644
--- a/node.gyp
+++ b/node.gyp
@@ -245,6 +245,7 @@
'lib/internal/worker/js_transferable.js',
'lib/internal/watchdog.js',
'lib/internal/streams/lazy_transform.js',
+ 'lib/internal/streams/add-abort-signal.js',
'lib/internal/streams/buffer_list.js',
'lib/internal/streams/duplexpair.js',
'lib/internal/streams/from.js',
diff --git a/test/parallel/test-bootstrap-modules.js b/test/parallel/test-bootstrap-modules.js
index a6f2c15c196..0887b8a4836 100644
--- a/test/parallel/test-bootstrap-modules.js
+++ b/test/parallel/test-bootstrap-modules.js
@@ -78,6 +78,7 @@ const expectedModules = new Set([
'NativeModule internal/process/warning',
'NativeModule internal/querystring',
'NativeModule internal/source_map/source_map_cache',
+ 'NativeModule internal/streams/add-abort-signal',
'NativeModule internal/streams/buffer_list',
'NativeModule internal/streams/destroy',
'NativeModule internal/streams/duplex',
diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js
index 23887122fc7..5b12531f92e 100644
--- a/test/parallel/test-stream-pipeline.js
+++ b/test/parallel/test-stream-pipeline.js
@@ -8,7 +8,8 @@ const {
Transform,
pipeline,
PassThrough,
- Duplex
+ Duplex,
+ addAbortSignal,
} = require('stream');
const assert = require('assert');
const http = require('http');
@@ -1261,3 +1262,32 @@ const net = require('net');
() => common.mustNotCall(),
);
}
+
+
+{
+ const ac = new AbortController();
+ const r = Readable.from(async function* () {
+ for (let i = 0; i < 10; i++) {
+ await Promise.resolve();
+ yield String(i);
+ if (i === 5) {
+ ac.abort();
+ }
+ }
+ }());
+ let res = '';
+ const w = new Writable({
+ write(chunk, encoding, callback) {
+ res += chunk;
+ callback();
+ }
+ });
+ const cb = common.mustCall((err) => {
+ assert.strictEqual(err.name, 'AbortError');
+ assert.strictEqual(res, '012345');
+ assert.strictEqual(w.destroyed, true);
+ assert.strictEqual(r.destroyed, true);
+ assert.strictEqual(pipelined.destroyed, true);
+ });
+ const pipelined = addAbortSignal(ac.signal, pipeline([r, w], cb));
+}
diff --git a/test/parallel/test-stream-readable-destroy.js b/test/parallel/test-stream-readable-destroy.js
index 8ab78ec8cce..1e765f36977 100644
--- a/test/parallel/test-stream-readable-destroy.js
+++ b/test/parallel/test-stream-readable-destroy.js
@@ -1,7 +1,7 @@
'use strict';
const common = require('../common');
-const { Readable } = require('stream');
+const { Readable, addAbortSignal } = require('stream');
const assert = require('assert');
{
@@ -268,3 +268,38 @@ const assert = require('assert');
}));
read.resume();
}
+
+{
+ const controller = new AbortController();
+ const read = addAbortSignal(controller.signal, new Readable({
+ read() {
+ this.push('asd');
+ },
+ }));
+
+ read.on('error', common.mustCall((e) => {
+ assert.strictEqual(e.name, 'AbortError');
+ }));
+ controller.abort();
+ read.on('data', common.mustNotCall());
+}
+
+{
+ const controller = new AbortController();
+ const read = addAbortSignal(controller.signal, new Readable({
+ objectMode: true,
+ read() {
+ return false;
+ }
+ }));
+ read.push('asd');
+
+ read.on('error', common.mustCall((e) => {
+ assert.strictEqual(e.name, 'AbortError');
+ }));
+ assert.rejects((async () => {
+ /* eslint-disable-next-line no-unused-vars */
+ for await (const chunk of read) {}
+ })(), /AbortError/);
+ setTimeout(() => controller.abort(), 0);
+}
diff --git a/test/parallel/test-stream-writable-destroy.js b/test/parallel/test-stream-writable-destroy.js
index dca01724f73..0fcaf892897 100644
--- a/test/parallel/test-stream-writable-destroy.js
+++ b/test/parallel/test-stream-writable-destroy.js
@@ -1,7 +1,7 @@
'use strict';
const common = require('../common');
-const { Writable } = require('stream');
+const { Writable, addAbortSignal } = require('stream');
const assert = require('assert');
{
@@ -417,3 +417,17 @@ const assert = require('assert');
}));
write.write('asd');
}
+
+{
+ const ac = new AbortController();
+ const write = addAbortSignal(ac.signal, new Writable({
+ write(chunk, enc, cb) { cb(); }
+ }));
+
+ write.on('error', common.mustCall((e) => {
+ assert.strictEqual(e.name, 'AbortError');
+ assert.strictEqual(write.destroyed, true);
+ }));
+ write.write('asd');
+ ac.abort();
+}