Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/npm/cli.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/readable-stream/lib/_stream_readable.js')
-rw-r--r--node_modules/readable-stream/lib/_stream_readable.js71
1 files changed, 54 insertions, 17 deletions
diff --git a/node_modules/readable-stream/lib/_stream_readable.js b/node_modules/readable-stream/lib/_stream_readable.js
index 33f478d7e..192d45148 100644
--- a/node_modules/readable-stream/lib/_stream_readable.js
+++ b/node_modules/readable-stream/lib/_stream_readable.js
@@ -80,17 +80,16 @@ var _require$codes = require('../errors').codes,
ERR_INVALID_ARG_TYPE = _require$codes.ERR_INVALID_ARG_TYPE,
ERR_STREAM_PUSH_AFTER_EOF = _require$codes.ERR_STREAM_PUSH_AFTER_EOF,
ERR_METHOD_NOT_IMPLEMENTED = _require$codes.ERR_METHOD_NOT_IMPLEMENTED,
- ERR_STREAM_UNSHIFT_AFTER_END_EVENT = _require$codes.ERR_STREAM_UNSHIFT_AFTER_END_EVENT;
-
-var _require2 = require('../experimentalWarning'),
- emitExperimentalWarning = _require2.emitExperimentalWarning; // Lazy loaded to improve the startup performance.
+ ERR_STREAM_UNSHIFT_AFTER_END_EVENT = _require$codes.ERR_STREAM_UNSHIFT_AFTER_END_EVENT; // Lazy loaded to improve the startup performance.
var StringDecoder;
var createReadableStreamAsyncIterator;
+var from;
require('inherits')(Readable, Stream);
+var errorOrDestroy = destroyImpl.errorOrDestroy;
var kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];
function prependListener(emitter, event, fn) {
@@ -144,7 +143,9 @@ function ReadableState(options, stream, isDuplex) {
this.resumeScheduled = false;
this.paused = true; // Should close be emitted on destroy. Defaults to true.
- this.emitClose = options.emitClose !== false; // has it been destroyed
+ this.emitClose = options.emitClose !== false; // Should .destroy() be called after 'end' (and potentially 'finish')
+
+ this.autoDestroy = !!options.autoDestroy; // has it been destroyed
this.destroyed = false; // Crypto is kind of old and crusty. Historically, its default string
// encoding is 'binary' so we have to make this configurable.
@@ -257,16 +258,16 @@ function readableAddChunk(stream, chunk, encoding, addToFront, skipChunkCheck) {
if (!skipChunkCheck) er = chunkInvalid(state, chunk);
if (er) {
- stream.emit('error', er);
+ errorOrDestroy(stream, er);
} else if (state.objectMode || chunk && chunk.length > 0) {
if (typeof chunk !== 'string' && !state.objectMode && Object.getPrototypeOf(chunk) !== Buffer.prototype) {
chunk = _uint8ArrayToBuffer(chunk);
}
if (addToFront) {
- if (state.endEmitted) stream.emit('error', new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());else addChunk(stream, state, chunk, true);
+ if (state.endEmitted) errorOrDestroy(stream, new ERR_STREAM_UNSHIFT_AFTER_END_EVENT());else addChunk(stream, state, chunk, true);
} else if (state.ended) {
- stream.emit('error', new ERR_STREAM_PUSH_AFTER_EOF());
+ errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
} else if (state.destroyed) {
return false;
} else {
@@ -322,17 +323,32 @@ Readable.prototype.isPaused = function () {
Readable.prototype.setEncoding = function (enc) {
if (!StringDecoder) StringDecoder = require('string_decoder/').StringDecoder;
- this._readableState.decoder = new StringDecoder(enc); // if setEncoding(null), decoder.encoding equals utf8
+ var decoder = new StringDecoder(enc);
+ this._readableState.decoder = decoder; // If setEncoding(null), decoder.encoding equals utf8
+
+ this._readableState.encoding = this._readableState.decoder.encoding; // Iterate over current buffer to convert already stored Buffers:
+
+ var p = this._readableState.buffer.head;
+ var content = '';
+
+ while (p !== null) {
+ content += decoder.write(p.data);
+ p = p.next;
+ }
+
+ this._readableState.buffer.clear();
- this._readableState.encoding = this._readableState.decoder.encoding;
+ if (content !== '') this._readableState.buffer.push(content);
+ this._readableState.length = content.length;
return this;
-}; // Don't raise the hwm > 8MB
+}; // Don't raise the hwm > 1GB
-var MAX_HWM = 0x800000;
+var MAX_HWM = 0x40000000;
function computeNewHighWaterMark(n) {
if (n >= MAX_HWM) {
+ // TODO(ronag): Throw ERR_VALUE_OUT_OF_RANGE.
n = MAX_HWM;
} else {
// Get the next highest power of 2 to prevent increasing hwm excessively in
@@ -449,7 +465,7 @@ Readable.prototype.read = function (n) {
if (n > 0) ret = fromList(n, state);else ret = null;
if (ret === null) {
- state.needReadable = true;
+ state.needReadable = state.length <= state.highWaterMark;
n = 0;
} else {
state.length -= n;
@@ -469,6 +485,7 @@ Readable.prototype.read = function (n) {
};
function onEofChunk(stream, state) {
+ debug('onEofChunk');
if (state.ended) return;
if (state.decoder) {
@@ -503,6 +520,7 @@ function onEofChunk(stream, state) {
function emitReadable(stream) {
var state = stream._readableState;
+ debug('emitReadable', state.needReadable, state.emittedReadable);
state.needReadable = false;
if (!state.emittedReadable) {
@@ -518,6 +536,7 @@ function emitReadable_(stream) {
if (!state.destroyed && (state.length || state.ended)) {
stream.emit('readable');
+ state.emittedReadable = false;
} // The stream needs another readable event if
// 1. It is not flowing, as the flow mechanism will take
// care of it.
@@ -583,7 +602,7 @@ function maybeReadMore_(stream, state) {
Readable.prototype._read = function (n) {
- this.emit('error', new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
+ errorOrDestroy(this, new ERR_METHOD_NOT_IMPLEMENTED('_read()'));
};
Readable.prototype.pipe = function (dest, pipeOpts) {
@@ -682,7 +701,7 @@ Readable.prototype.pipe = function (dest, pipeOpts) {
debug('onerror', er);
unpipe();
dest.removeListener('error', onerror);
- if (EElistenerCount(dest, 'error') === 0) dest.emit('error', er);
+ if (EElistenerCount(dest, 'error') === 0) errorOrDestroy(dest, er);
} // Make sure our error handler is attached before userland ones.
@@ -986,8 +1005,6 @@ Readable.prototype.wrap = function (stream) {
if (typeof Symbol === 'function') {
Readable.prototype[Symbol.asyncIterator] = function () {
- emitExperimentalWarning('Readable[Symbol.asyncIterator]');
-
if (createReadableStreamAsyncIterator === undefined) {
createReadableStreamAsyncIterator = require('./internal/streams/async_iterator');
}
@@ -1075,9 +1092,29 @@ function endReadableNT(state, stream) {
state.endEmitted = true;
stream.readable = false;
stream.emit('end');
+
+ if (state.autoDestroy) {
+ // In case of duplex streams we need a way to detect
+ // if the writable side is ready for autoDestroy as well
+ var wState = stream._writableState;
+
+ if (!wState || wState.autoDestroy && wState.finished) {
+ stream.destroy();
+ }
+ }
}
}
+if (typeof Symbol === 'function') {
+ Readable.from = function (iterable, opts) {
+ if (from === undefined) {
+ from = require('./internal/streams/from');
+ }
+
+ return from(Readable, iterable, opts);
+ };
+}
+
function indexOf(xs, x) {
for (var i = 0, l = xs.length; i < l; i++) {
if (xs[i] === x) return i;