diff options
Diffstat (limited to 'node_modules/readable-stream/lib/_stream_readable.js')
-rw-r--r-- | node_modules/readable-stream/lib/_stream_readable.js | 71 |
1 files changed, 54 insertions, 17 deletions
diff --git a/node_modules/readable-stream/lib/_stream_readable.js b/node_modules/readable-stream/lib/_stream_readable.js index 33f478d7e..192d45148 100644 --- a/node_modules/readable-stream/lib/_stream_readable.js +++ b/node_modules/readable-stream/lib/_stream_readable.js @@ -80,17 +80,16 @@ var _require$codes = require('../errors').codes, ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE, ERR_STREAM_PUSH_AFTER_EOF = _require$codes.ERR_STREAM_PUSH_AFTER_EOF, ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED, - ERR_STREAM_UNSHIFT_AFTER_END_EVENT = _require$codes.ERR_STREAM_UNSHIFT_AFTER_END_EVENT; - -var _require2 = require('../experimentalWarning'), - emitExperimentalWarning = _require2.emitExperimentalWarning; // Lazy loaded to improve the startup performance. + ERR_STREAM_UNSHIFT_AFTER_END_EVENT = _require$codes.ERR_STREAM_UNSHIFT_AFTER_END_EVENT; // Lazy loaded to improve the startup performance. var StringDecoder; var createReadableStreamAsyncIterator; +var from; require('inherits')(Readable, Stream); +var errorOrDestroy = destroyImpl.errorOrDestroy; var kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume']; function prependListener(emitter, event, fn) { @@ -144,7 +143,9 @@ function ReadableState(options, stream, isDuplex) { this.resumeScheduled = false; this.paused = true; // Should close be emitted on destroy. Defaults to true. - this.emitClose = options.emitClose !== false; // has it been destroyed + this.emitClose = options.emitClose !== false; // Should .destroy() be called after 'end' (and potentially 'finish') + + this.autoDestroy = !!options.autoDestroy; // has it been destroyed this.destroyed = false; // Crypto is kind of old and crusty. Historically, its default string // encoding is 'binary' so we have to make this configurable. @@ -257,16 +258,16 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) { if (!skipChunkCheck) er = chunkInvalid(state, chunk); if (er) { - stream.emit('error', er); + errorOrDestroy(stream, er); } else if (state.objectMode || chunk && chunk.length > 0) { if (typeof chunk !== 'string' && !state.objectMode && Object.getPrototypeOf(chunk) !== Buffer.prototype) { chunk = _uint8ArrayToBuffer(chunk); } if (addToFront) { - if (state.endEmitted) stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());else addChunk(stream, state, chunk, true); + if (state.endEmitted) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());else addChunk(stream, state, chunk, true); } else if (state.ended) { - stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF()); + errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF()); } else if (state.destroyed) { return false; } else { @@ -322,17 +323,32 @@ Readable.prototype.isPaused = function () { Readable.prototype.setEncoding = function (enc) { if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder; - this._readableState.decoder = new StringDecoder(enc); // if setEncoding(null), decoder.encoding equals utf8 + var decoder = new StringDecoder(enc); + this._readableState.decoder = decoder; // If setEncoding(null), decoder.encoding equals utf8 + + this._readableState.encoding = this._readableState.decoder.encoding; // Iterate over current buffer to convert already stored Buffers: + + var p = this._readableState.buffer.head; + var content = ''; + + while (p !== null) { + content += decoder.write(p.data); + p = p.next; + } + + this._readableState.buffer.clear(); - this._readableState.encoding = this._readableState.decoder.encoding; + if (content !== '') this._readableState.buffer.push(content); + this._readableState.length = content.length; return this; -}; // Don't raise the hwm > 8MB +}; // Don't raise the hwm > 1GB -var MAX_HWM = 0x800000; +var MAX_HWM = 0x40000000; function computeNewHighWaterMark(n) { if (n >= MAX_HWM) { + // TODO(ronag): Throw ERR_VALUE_OUT_OF_RANGE. n = MAX_HWM; } else { // Get the next highest power of 2 to prevent increasing hwm excessively in @@ -449,7 +465,7 @@ Readable.prototype.read = function (n) { if (n > 0) ret = fromList(n, state);else ret = null; if (ret === null) { - state.needReadable = true; + state.needReadable = state.length <= state.highWaterMark; n = 0; } else { state.length -= n; @@ -469,6 +485,7 @@ Readable.prototype.read = function (n) { }; function onEofChunk(stream, state) { + debug('onEofChunk'); if (state.ended) return; if (state.decoder) { @@ -503,6 +520,7 @@ function onEofChunk(stream, state) { function emitReadable(stream) { var state = stream._readableState; + debug('emitReadable', state.needReadable, state.emittedReadable); state.needReadable = false; if (!state.emittedReadable) { @@ -518,6 +536,7 @@ function emitReadable_(stream) { if (!state.destroyed && (state.length || state.ended)) { stream.emit('readable'); + state.emittedReadable = false; } // The stream needs another readable event if // 1. It is not flowing, as the flow mechanism will take // care of it. @@ -583,7 +602,7 @@ function maybeReadMore_(stream, state) { Readable.prototype._read = function (n) { - this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()')); + errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()')); }; Readable.prototype.pipe = function (dest, pipeOpts) { @@ -682,7 +701,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) { debug('onerror', er); unpipe(); dest.removeListener('error', onerror); - if (EElistenerCount(dest, 'error') === 0) dest.emit('error', er); + if (EElistenerCount(dest, 'error') === 0) errorOrDestroy(dest, er); } // Make sure our error handler is attached before userland ones. @@ -986,8 +1005,6 @@ Readable.prototype.wrap = function (stream) { if (typeof Symbol === 'function') { Readable.prototype[Symbol.asyncIterator] = function () { - emitExperimentalWarning('Readable[Symbol.asyncIterator]'); - if (createReadableStreamAsyncIterator === undefined) { createReadableStreamAsyncIterator = require('./internal/streams/async_iterator'); } @@ -1075,9 +1092,29 @@ function endReadableNT(state, stream) { state.endEmitted = true; stream.readable = false; stream.emit('end'); + + if (state.autoDestroy) { + // In case of duplex streams we need a way to detect + // if the writable side is ready for autoDestroy as well + var wState = stream._writableState; + + if (!wState || wState.autoDestroy && wState.finished) { + stream.destroy(); + } + } } } +if (typeof Symbol === 'function') { + Readable.from = function (iterable, opts) { + if (from === undefined) { + from = require('./internal/streams/from'); + } + + return from(Readable, iterable, opts); + }; +} + function indexOf(xs, x) { for (var i = 0, l = xs.length; i < l; i++) { if (xs[i] === x) return i; |