Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/npm/cli.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'node_modules/minipass/index.js')
-rw-r--r--node_modules/minipass/index.js241
1 files changed, 200 insertions, 41 deletions
diff --git a/node_modules/minipass/index.js b/node_modules/minipass/index.js
index ae2dd9064..55ea0f3dd 100644
--- a/node_modules/minipass/index.js
+++ b/node_modules/minipass/index.js
@@ -1,41 +1,57 @@
'use strict'
const EE = require('events')
+const Stream = require('stream')
const Yallist = require('yallist')
+const SD = require('string_decoder').StringDecoder
+
const EOF = Symbol('EOF')
const MAYBE_EMIT_END = Symbol('maybeEmitEnd')
const EMITTED_END = Symbol('emittedEnd')
+const EMITTING_END = Symbol('emittingEnd')
const CLOSED = Symbol('closed')
const READ = Symbol('read')
const FLUSH = Symbol('flush')
-const doIter = process.env._MP_NO_ITERATOR_SYMBOLS_ !== '1'
-const ASYNCITERATOR = doIter && Symbol.asyncIterator || Symbol('asyncIterator not implemented')
-const ITERATOR = doIter && Symbol.iterator || Symbol('iterator not implemented')
const FLUSHCHUNK = Symbol('flushChunk')
-const SD = require('string_decoder').StringDecoder
const ENCODING = Symbol('encoding')
const DECODER = Symbol('decoder')
const FLOWING = Symbol('flowing')
+const PAUSED = Symbol('paused')
const RESUME = Symbol('resume')
const BUFFERLENGTH = Symbol('bufferLength')
const BUFFERPUSH = Symbol('bufferPush')
const BUFFERSHIFT = Symbol('bufferShift')
const OBJECTMODE = Symbol('objectMode')
-const SILENT_END = Symbol('silentEnd')
-
-// Buffer in node 4.x < 4.5.0 doesn't have working Buffer.from
-// or Buffer.alloc, and Buffer in node 10 deprecated the ctor.
-// .M, this is fine .\^/M..
-let B = Buffer
-/* istanbul ignore next */
-if (!B.alloc) {
- B = require('safe-buffer').Buffer
-}
-
-module.exports = class MiniPass extends EE {
+const DESTROYED = Symbol('destroyed')
+
+// TODO remove when Node v8 support drops
+const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1'
+const ASYNCITERATOR = doIter && Symbol.asyncIterator
+ || Symbol('asyncIterator not implemented')
+const ITERATOR = doIter && Symbol.iterator
+ || Symbol('iterator not implemented')
+
+// events that mean 'the stream is over'
+// these are treated specially, and re-emitted
+// if they are listened for after emitting.
+const isEndish = ev =>
+ ev === 'end' ||
+ ev === 'finish' ||
+ ev === 'prefinish'
+
+const isArrayBuffer = b => b instanceof ArrayBuffer ||
+ typeof b === 'object' &&
+ b.constructor &&
+ b.constructor.name === 'ArrayBuffer' &&
+ b.byteLength >= 0
+
+const isArrayBufferView = b => !Buffer.isBuffer(b) && ArrayBuffer.isView(b)
+
+module.exports = class Minipass extends Stream {
constructor (options) {
super()
- this[SILENT_END] = false
this[FLOWING] = false
+ // whether we're explicitly paused
+ this[PAUSED] = false
this.pipes = new Yallist()
this.buffer = new Yallist()
this[OBJECTMODE] = options && options.objectMode || false
@@ -48,10 +64,12 @@ module.exports = class MiniPass extends EE {
this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null
this[EOF] = false
this[EMITTED_END] = false
+ this[EMITTING_END] = false
this[CLOSED] = false
this.writable = true
this.readable = true
this[BUFFERLENGTH] = 0
+ this[DESTROYED] = false
}
get bufferLength () { return this[BUFFERLENGTH] }
@@ -78,25 +96,61 @@ module.exports = class MiniPass extends EE {
this.encoding = enc
}
+ get objectMode () { return this[OBJECTMODE] }
+ set objectMode (ॐ ) { this[OBJECTMODE] = this[OBJECTMODE] || !!ॐ }
+
write (chunk, encoding, cb) {
if (this[EOF])
throw new Error('write after end')
+ if (this[DESTROYED]) {
+ this.emit('error', Object.assign(
+ new Error('Cannot call write after a stream was destroyed'),
+ { code: 'ERR_STREAM_DESTROYED' }
+ ))
+ return true
+ }
+
if (typeof encoding === 'function')
cb = encoding, encoding = 'utf8'
if (!encoding)
encoding = 'utf8'
+ // convert array buffers and typed array views into buffers
+ // at some point in the future, we may want to do the opposite!
+ // leave strings and buffers as-is
+ // anything else switches us into object mode
+ if (!this[OBJECTMODE] && !Buffer.isBuffer(chunk)) {
+ if (isArrayBufferView(chunk))
+ chunk = Buffer.from(chunk.buffer, chunk.byteOffset, chunk.byteLength)
+ else if (isArrayBuffer(chunk))
+ chunk = Buffer.from(chunk)
+ else if (typeof chunk !== 'string')
+ // use the setter so we throw if we have encoding set
+ this.objectMode = true
+ }
+
+ // this ensures at this point that the chunk is a buffer or string
+ // don't buffer it up or send it to the decoder
+ if (!this.objectMode && !chunk.length) {
+ const ret = this.flowing
+ if (this[BUFFERLENGTH] !== 0)
+ this.emit('readable')
+ if (cb)
+ cb()
+ return ret
+ }
+
// fast-path writing strings of same encoding to a stream with
// an empty buffer, skipping the buffer/decoder dance
if (typeof chunk === 'string' && !this[OBJECTMODE] &&
// unless it is a string already ready for us to use
!(encoding === this[ENCODING] && !this[DECODER].lastNeed)) {
- chunk = B.from(chunk, encoding)
+ chunk = Buffer.from(chunk, encoding)
}
- if (B.isBuffer(chunk) && this[ENCODING])
+ if (Buffer.isBuffer(chunk) && this[ENCODING])
chunk = this[DECODER].write(chunk)
try {
@@ -104,13 +158,17 @@ module.exports = class MiniPass extends EE {
? (this.emit('data', chunk), this.flowing)
: (this[BUFFERPUSH](chunk), false)
} finally {
- this.emit('readable')
+ if (this[BUFFERLENGTH] !== 0)
+ this.emit('readable')
if (cb)
cb()
}
}
read (n) {
+ if (this[DESTROYED])
+ return null
+
try {
if (this[BUFFERLENGTH] === 0 || n === 0 || n > this[BUFFERLENGTH])
return null
@@ -125,7 +183,7 @@ module.exports = class MiniPass extends EE {
])
else
this.buffer = new Yallist([
- B.concat(Array.from(this.buffer), this[BUFFERLENGTH])
+ Buffer.concat(Array.from(this.buffer), this[BUFFERLENGTH])
])
}
@@ -163,12 +221,22 @@ module.exports = class MiniPass extends EE {
this.once('end', cb)
this[EOF] = true
this.writable = false
- if (this.flowing)
+
+ // if we haven't written anything, then go ahead and emit,
+ // even if we're not reading.
+ // we'll re-emit if a new 'end' listener is added anyway.
+ // This makes MP more suitable to write-only use cases.
+ if (this.flowing || !this[PAUSED])
this[MAYBE_EMIT_END]()
+ return this
}
// don't let the internal resume be overwritten
[RESUME] () {
+ if (this[DESTROYED])
+ return
+
+ this[PAUSED] = false
this[FLOWING] = true
this.emit('resume')
if (this.buffer.length)
@@ -185,12 +253,21 @@ module.exports = class MiniPass extends EE {
pause () {
this[FLOWING] = false
+ this[PAUSED] = true
+ }
+
+ get destroyed () {
+ return this[DESTROYED]
}
get flowing () {
return this[FLOWING]
}
+ get paused () {
+ return this[PAUSED]
+ }
+
[BUFFERPUSH] (chunk) {
if (this[OBJECTMODE])
this[BUFFERLENGTH] += 1
@@ -221,13 +298,24 @@ module.exports = class MiniPass extends EE {
}
pipe (dest, opts) {
+ if (this[DESTROYED])
+ return
+
+ const ended = this[EMITTED_END]
+ opts = opts || {}
if (dest === process.stdout || dest === process.stderr)
- (opts = opts || {}).end = false
+ opts.end = false
+ else
+ opts.end = opts.end !== false
+
const p = { dest: dest, opts: opts, ondrain: _ => this[RESUME]() }
this.pipes.push(p)
dest.on('drain', p.ondrain)
this[RESUME]()
+ // piping an ended stream ends immediately
+ if (ended && p.opts.end)
+ p.dest.end()
return dest
}
@@ -241,9 +329,9 @@ module.exports = class MiniPass extends EE {
} finally {
if (ev === 'data' && !this.pipes.length && !this.flowing)
this[RESUME]()
- else if (ev === 'end' && this[SILENT_END] && this[EMITTED_END]) {
- this[SILENT_END] = false
- super.emit('end')
+ else if (isEndish(ev) && this[EMITTED_END]) {
+ super.emit(ev)
+ this.removeAllListeners(ev)
}
}
}
@@ -253,23 +341,34 @@ module.exports = class MiniPass extends EE {
}
[MAYBE_EMIT_END] () {
- if (!this[EMITTED_END] && this.buffer.length === 0 && this[EOF]) {
+ if (!this[EMITTING_END] &&
+ !this[EMITTED_END] &&
+ !this[DESTROYED] &&
+ this.buffer.length === 0 &&
+ this[EOF]) {
+ this[EMITTING_END] = true
this.emit('end')
this.emit('prefinish')
this.emit('finish')
if (this[CLOSED])
this.emit('close')
+ this[EMITTING_END] = false
}
}
emit (ev, data) {
- if (ev === 'data') {
+ // error and close are only events allowed after calling destroy()
+ if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED])
+ return
+ else if (ev === 'data') {
if (!data)
return
if (this.pipes.length)
- this.pipes.forEach(p => p.dest.write(data) || this.pause())
+ this.pipes.forEach(p =>
+ p.dest.write(data) === false && this.pause())
} else if (ev === 'end') {
+ // only actual end gets this treatment
if (this[EMITTED_END] === true)
return
@@ -286,16 +385,17 @@ module.exports = class MiniPass extends EE {
this.pipes.forEach(p => {
p.dest.removeListener('drain', p.ondrain)
- if (!p.opts || p.opts.end !== false)
+ if (p.opts.end)
p.dest.end()
})
} else if (ev === 'close') {
this[CLOSED] = true
// don't emit close before 'end' and 'finish'
- if (!this[EMITTED_END])
+ if (!this[EMITTED_END] && !this[DESTROYED])
return
}
+ // TODO: replace with a spread operator when Node v4 support drops
const args = new Array(arguments.length)
args[0] = ev
args[1] = data
@@ -306,23 +406,47 @@ module.exports = class MiniPass extends EE {
}
try {
- const ret = super.emit.apply(this, args)
- if (ev === 'end' && ret === false)
- this[SILENT_END] = true
- return ret
+ return super.emit.apply(this, args)
} finally {
- if (ev !== 'end')
+ if (!isEndish(ev))
this[MAYBE_EMIT_END]()
+ else
+ this.removeAllListeners(ev)
}
}
// const all = await stream.collect()
collect () {
+ const buf = []
+ if (!this[OBJECTMODE])
+ buf.dataLength = 0
+ // set the promise first, in case an error is raised
+ // by triggering the flow here.
+ const p = this.promise()
+ this.on('data', c => {
+ buf.push(c)
+ if (!this[OBJECTMODE])
+ buf.dataLength += c.length
+ })
+ return p.then(() => buf)
+ }
+
+ // const data = await stream.concat()
+ concat () {
+ return this[OBJECTMODE]
+ ? Promise.reject(new Error('cannot concat in objectMode'))
+ : this.collect().then(buf =>
+ this[OBJECTMODE]
+ ? Promise.reject(new Error('cannot concat in objectMode'))
+ : this[ENCODING] ? buf.join('') : Buffer.concat(buf, buf.dataLength))
+ }
+
+ // stream.promise().then(() => done, er => emitted error)
+ promise () {
return new Promise((resolve, reject) => {
- const buf = []
- this.on('data', c => buf.push(c))
- this.on('end', () => resolve(buf))
- this.on('error', reject)
+ this.on(DESTROYED, () => reject(new Error('stream destroyed')))
+ this.on('end', () => resolve())
+ this.on('error', er => reject(er))
})
}
@@ -354,13 +478,14 @@ module.exports = class MiniPass extends EE {
this.removeListener('data', ondata)
resolve({ done: true })
}
+ const ondestroy = () => onerr(new Error('stream destroyed'))
return new Promise((res, rej) => {
reject = rej
resolve = res
+ this.once(DESTROYED, ondestroy)
this.once('error', onerr)
this.once('end', onend)
this.once('data', ondata)
- this.resume()
})
}
@@ -376,4 +501,38 @@ module.exports = class MiniPass extends EE {
}
return { next }
}
+
+ destroy (er) {
+ if (this[DESTROYED]) {
+ if (er)
+ this.emit('error', er)
+ else
+ this.emit(DESTROYED)
+ return this
+ }
+
+ this[DESTROYED] = true
+
+ // throw away all buffered data, it's never coming out
+ this.buffer = new Yallist()
+ this[BUFFERLENGTH] = 0
+
+ if (typeof this.close === 'function' && !this[CLOSED])
+ this.close()
+
+ if (er)
+ this.emit('error', er)
+ else // if no error to emit, still reject pending promises
+ this.emit(DESTROYED)
+
+ return this
+ }
+
+ static isStream (s) {
+ return !!s && (s instanceof Minipass || s instanceof Stream ||
+ s instanceof EE && (
+ typeof s.pipe === 'function' || // readable
+ (typeof s.write === 'function' && typeof s.end === 'function') // writable
+ ))
+ }
}