diff options
Diffstat (limited to 'lib/file-stream.js')
-rw-r--r-- | lib/file-stream.js | 26 |
1 files changed, 11 insertions, 15 deletions
diff --git a/lib/file-stream.js b/lib/file-stream.js index a3682b1..c344913 100644 --- a/lib/file-stream.js +++ b/lib/file-stream.js @@ -1,6 +1,5 @@ -const stream = require('stream') +const { Readable } = require('streamx') const debugFactory = require('debug') -const eos = require('end-of-stream') const debug = debugFactory('webtorrent:file-stream') @@ -12,9 +11,9 @@ const debug = debugFactory('webtorrent:file-stream') * @param {number} opts.start stream slice of file, starting from this byte (inclusive) * @param {number} opts.end stream slice of file, ending with this byte (inclusive) */ -class FileStream extends stream.Readable { +class FileStream extends Readable { constructor (file, opts) { - super(opts) + super(opts ?? {}) this._torrent = file._torrent @@ -39,26 +38,22 @@ class FileStream extends stream.Readable { this._torrent.select(this._startPiece, this._endPiece, true, () => { this._notify() }) - - // Ensure that cleanup happens even if destroy() is never called (readable-stream v3 currently doesn't call it automaticallly) - eos(this, (err) => { - this.destroy(err) - }) } - _read () { + _read (cb) { if (this._reading) return this._reading = true - this._notify() + this._notify(cb) } - _notify () { - if (!this._reading || this._missing === 0) return + _notify (cb = () => {}) { + if (!this._reading || this._missing === 0) return cb() if (!this._torrent.bitfield.get(this._piece)) { + cb() return this._torrent.critical(this._piece, this._piece + this._criticalLength) } - if (this._notifying) return + if (this._notifying) return cb() this._notifying = true if (this._torrent.destroyed) return this.destroy(new Error('Torrent removed')) @@ -92,11 +87,12 @@ class FileStream extends stream.Readable { this.push(buffer) if (this._missing === 0) this.push(null) + cb() }) this._piece += 1 } - _destroy (err, cb) { + _destroy (cb, err) { if (!this._torrent.destroyed) { this._torrent.deselect(this._startPiece, this._endPiece, true) } |