Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorRobert Nagy <ronagy@icloud.com>2021-05-02 19:17:18 +0300
committerDaniele Belardi <dwon.dnl@gmail.com>2021-06-15 20:43:49 +0300
commitf4609bdf3fabdf441da6af17c2022565f4e18f9d (patch)
tree82bc067adc7902be0b866abbb85ff7c4ae547f65 /lib
parentc0becbc1bdebf0038dbf6683e347db09011d3004 (diff)
stream: bypass legacy destroy for pipeline and async iteration
PR-URL: https://github.com/nodejs/node/pull/38505 Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'lib')
-rw-r--r--lib/_http_client.js2
-rw-r--r--lib/_http_incoming.js10
-rw-r--r--lib/_http_server.js12
-rw-r--r--lib/internal/streams/destroy.js59
-rw-r--r--lib/stream.js2
5 files changed, 73 insertions, 12 deletions
diff --git a/lib/_http_client.js b/lib/_http_client.js
index f3b88b62730..fde7fde86bb 100644
--- a/lib/_http_client.js
+++ b/lib/_http_client.js
@@ -53,6 +53,7 @@ const {
prepareError,
} = require('_http_common');
const { OutgoingMessage } = require('_http_outgoing');
+const { kDestroy } = require('internal/streams/destroy');
const Agent = require('_http_agent');
const { Buffer } = require('buffer');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
@@ -609,6 +610,7 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
req.res = res;
res.req = req;
+ res[kDestroy] = null;
// Add our listener first, so that we guarantee socket cleanup
res.on('end', responseOnEnd);
diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js
index e0f1354e6c9..3961b583de9 100644
--- a/lib/_http_incoming.js
+++ b/lib/_http_incoming.js
@@ -31,6 +31,7 @@ const {
} = primordials;
const { Readable, finished } = require('stream');
+const { kDestroy } = require('internal/streams/destroy');
const kHeaders = Symbol('kHeaders');
const kHeadersCount = Symbol('kHeadersCount');
@@ -188,13 +189,18 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
this.socket.destroy(err);
const cleanup = finished(this.socket, (e) => {
cleanup();
- onError(this, e || err, cb);
+ process.nextTick(onError, this, e || err, cb);
});
} else {
- onError(this, err, cb);
+ process.nextTick(onError, this, err, cb);
}
};
+IncomingMessage.prototype[kDestroy] = function(err) {
+ this.socket = null;
+ this.destroy(err);
+};
+
IncomingMessage.prototype._addHeaderLines = _addHeaderLines;
function _addHeaderLines(headers, n) {
if (headers && headers.length) {
diff --git a/lib/_http_server.js b/lib/_http_server.js
index 97df58a007d..946394f9add 100644
--- a/lib/_http_server.js
+++ b/lib/_http_server.js
@@ -231,9 +231,7 @@ function onServerResponseClose() {
// where the ServerResponse object has already been deconstructed.
// Fortunately, that requires only a single if check. :-)
if (this._httpMessage) {
- this._httpMessage.destroyed = true;
- this._httpMessage._closed = true;
- this._httpMessage.emit('close');
+ emitCloseNT(this._httpMessage);
}
}
@@ -837,9 +835,11 @@ function resOnFinish(req, res, socket, state, server) {
}
function emitCloseNT(self) {
- self.destroyed = true;
- self._closed = true;
- self.emit('close');
+ if (!self.destroyed) {
+ self.destroyed = true;
+ self._closed = true;
+ self.emit('close');
+ }
}
// The following callback is issued after the headers have been read on a
diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js
index a2892c67a0f..4238e12074d 100644
--- a/lib/internal/streams/destroy.js
+++ b/lib/internal/streams/destroy.js
@@ -5,6 +5,7 @@ const {
codes: {
ERR_MULTIPLE_CALLBACK,
},
+ AbortError,
} = require('internal/errors');
const {
Symbol,
@@ -363,15 +364,65 @@ function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function';
}
+const kDestroyed = Symbol('kDestroyed');
+
+function emitCloseLegacy(stream) {
+ stream.emit('close');
+}
+
+function emitErrorCloseLegacy(stream, err) {
+ stream.emit('error', err);
+ process.nextTick(emitCloseLegacy, stream);
+}
+
+function isDestroyed(stream) {
+ return stream.destroyed || stream[kDestroyed];
+}
+
+function isReadable(stream) {
+ return stream.readable && !stream.readableEnded && !isDestroyed(stream);
+}
+
+function isWritable(stream) {
+ return stream.writable && !stream.writableEnded && !isDestroyed(stream);
+}
+
// Normalize destroy for legacy.
function destroyer(stream, err) {
- if (isRequest(stream)) return stream.abort();
- if (isRequest(stream.req)) return stream.req.abort();
- if (typeof stream.destroy === 'function') return stream.destroy(err);
- if (typeof stream.close === 'function') return stream.close();
+ if (isDestroyed(stream)) {
+ return;
+ }
+
+ if (!err && (isReadable(stream) || isWritable(stream))) {
+ err = new AbortError();
+ }
+
+ // TODO: Remove isRequest branches.
+ if (typeof stream[kDestroy] === 'function') {
+ stream[kDestroy](err);
+ } else if (isRequest(stream)) {
+ stream.abort();
+ } else if (isRequest(stream.req)) {
+ stream.req.abort();
+ } else if (typeof stream.destroy === 'function') {
+ stream.destroy(err);
+ } else if (typeof stream.close === 'function') {
+ // TODO: Don't lose err?
+ stream.close();
+ } else if (err) {
+ process.nextTick(emitErrorCloseLegacy, stream);
+ } else {
+ process.nextTick(emitCloseLegacy, stream);
+ }
+
+ if (!stream.destroyed) {
+ stream[kDestroyed] = true;
+ }
}
module.exports = {
+ kDestroy,
+ isDestroyed,
construct,
destroyer,
destroy,
diff --git a/lib/stream.js b/lib/stream.js
index 85adda81b32..16a2370232e 100644
--- a/lib/stream.js
+++ b/lib/stream.js
@@ -30,6 +30,7 @@ const {
} = require('internal/util');
const pipeline = require('internal/streams/pipeline');
+const { destroyer } = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');
const internalBuffer = require('internal/buffer');
@@ -45,6 +46,7 @@ Stream.pipeline = pipeline;
const { addAbortSignal } = require('internal/streams/add-abort-signal');
Stream.addAbortSignal = addAbortSignal;
Stream.finished = eos;
+Stream.destroy = destroyer;
ObjectDefineProperty(Stream, 'promises', {
configurable: true,