diff options
author | Mathias Buus <mathiasbuus@gmail.com> | 2018-08-21 21:05:12 +0300 |
---|---|---|
committer | Mathias Buus <mathiasbuus@gmail.com> | 2018-10-30 17:17:40 +0300 |
commit | f24b070cb7fb04df6249fab5264df2146e2b6cac (patch) | |
tree | 46438985fb9676a4e5b951a00f8a30693f25c280 /lib | |
parent | cd1193d9ed83c37a431a19ae33bbf5e25ec15d65 (diff) |
stream: add auto-destroy mode
PR-URL: https://github.com/nodejs/node/pull/22795
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Jeremiah Senkpiel <fishrock123@rocketmail.com>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/_stream_readable.js | 23 | ||||
-rw-r--r-- | lib/_stream_writable.js | 26 | ||||
-rw-r--r-- | lib/internal/streams/destroy.js | 20 |
3 files changed, 57 insertions, 12 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 488d10a10b5..2a2122e0e55 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -46,6 +46,7 @@ let createReadableStreamAsyncIterator; util.inherits(Readable, Stream); +const { errorOrDestroy } = destroyImpl; const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume']; function prependListener(emitter, event, fn) { @@ -117,6 +118,9 @@ function ReadableState(options, stream, isDuplex) { // Should close be emitted on destroy. Defaults to true. this.emitClose = options.emitClose !== false; + // Should .destroy() be called after 'end' (and potentially 'finish') + this.autoDestroy = !!options.autoDestroy; + // has it been destroyed this.destroyed = false; @@ -235,7 +239,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { if (!skipChunkCheck) er = chunkInvalid(state, chunk); if (er) { - stream.emit('error', er); + errorOrDestroy(stream, er); } else if (state.objectMode || chunk && chunk.length > 0) { if (typeof chunk !== 'string' && !state.objectMode && @@ -245,11 +249,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { if (addToFront) { if (state.endEmitted) - stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); + errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT()); else addChunk(stream, state, chunk, true); } else if (state.ended) { - stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF()); + errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); } else if (state.destroyed) { return false; } else { @@ -581,7 +585,7 @@ function maybeReadMore_(stream, state) { // for virtual (non-string, non-buffer) streams, "length" is somewhat // arbitrary, and perhaps not very meaningful. Readable.prototype._read = function(n) { - this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()')); + errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()')); }; Readable.prototype.pipe = function(dest, pipeOpts) { @@ -687,7 +691,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) { unpipe(); dest.removeListener('error', onerror); if (EE.listenerCount(dest, 'error') === 0) - dest.emit('error', er); + errorOrDestroy(dest, er); } // Make sure our error handler is attached before userland ones. @@ -1092,5 +1096,14 @@ function endReadableNT(state, stream) { state.endEmitted = true; stream.readable = false; stream.emit('end'); + + if (state.autoDestroy) { + // In case of duplex streams we need a way to detect + // if the writable side is ready for autoDestroy as well + const wState = stream._writableState; + if (!wState || (wState.autoDestroy && wState.finished)) { + stream.destroy(); + } + } } } diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js index 3bad957912b..160179cd0e8 100644 --- a/lib/_stream_writable.js +++ b/lib/_stream_writable.js @@ -45,6 +45,8 @@ const { ERR_UNKNOWN_ENCODING } = require('internal/errors').codes; +const { errorOrDestroy } = destroyImpl; + util.inherits(Writable, Stream); function nop() {} @@ -147,6 +149,9 @@ function WritableState(options, stream, isDuplex) { // Should close be emitted on destroy. Defaults to true. this.emitClose = options.emitClose !== false; + // Should .destroy() be called after 'finish' (and potentially 'end') + this.autoDestroy = !!options.autoDestroy; + // count buffered requests this.bufferedRequestCount = 0; @@ -235,14 +240,14 @@ function Writable(options) { // Otherwise people can pipe Writable streams, which is just wrong. Writable.prototype.pipe = function() { - this.emit('error', new ERR_STREAM_CANNOT_PIPE()); + errorOrDestroy(this, new ERR_STREAM_CANNOT_PIPE()); }; function writeAfterEnd(stream, cb) { var er = new ERR_STREAM_WRITE_AFTER_END(); // TODO: defer error events consistently everywhere, not just the cb - stream.emit('error', er); + errorOrDestroy(stream, er); process.nextTick(cb, er); } @@ -258,7 +263,7 @@ function validChunk(stream, state, chunk, cb) { er = new ERR_INVALID_ARG_TYPE('chunk', ['string', 'Buffer'], chunk); } if (er) { - stream.emit('error', er); + errorOrDestroy(stream, er); process.nextTick(cb, er); return false; } @@ -422,13 +427,13 @@ function onwriteError(stream, state, sync, er, cb) { // after error process.nextTick(finishMaybe, stream, state); stream._writableState.errorEmitted = true; - stream.emit('error', er); + errorOrDestroy(stream, er); } else { // the caller expect this to happen before if // it is async cb(er); stream._writableState.errorEmitted = true; - stream.emit('error', er); + errorOrDestroy(stream, er); // this can emit finish, but finish must // always follow error finishMaybe(stream, state); @@ -612,7 +617,7 @@ function callFinal(stream, state) { stream._final((err) => { state.pendingcb--; if (err) { - stream.emit('error', err); + errorOrDestroy(stream, err); } state.prefinished = true; stream.emit('prefinish'); @@ -639,6 +644,15 @@ function finishMaybe(stream, state) { if (state.pendingcb === 0) { state.finished = true; stream.emit('finish'); + + if (state.autoDestroy) { + // In case of duplex streams we need a way to detect + // if the readable side is ready for autoDestroy as well + const rState = stream._readableState; + if (!rState || (rState.autoDestroy && rState.endEmitted)) { + stream.destroy(); + } + } } } return need; diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js index 3a0383cc3ce..ce9d2545e45 100644 --- a/lib/internal/streams/destroy.js +++ b/lib/internal/streams/destroy.js @@ -82,7 +82,25 @@ function emitErrorNT(self, err) { self.emit('error', err); } +function errorOrDestroy(stream, err) { + // We have tests that rely on errors being emitted + // in the same tick, so changing this is semver major. + // For now when you opt-in to autoDestroy we allow + // the error to be emitted nextTick. In a future + // semver major update we should change the default to this. + + const rState = stream._readableState; + const wState = stream._writableState; + + if ((rState && rState.autoDestroy) || (wState && wState.autoDestroy)) + stream.destroy(err); + else + stream.emit('error', err); +} + + module.exports = { destroy, - undestroy + undestroy, + errorOrDestroy }; |