From 8b97ee8cc18b05e0d20135ea8f1651e97bb65c6f Mon Sep 17 00:00:00 2001 From: ThaUnknown <6506529+ThaUnknown@users.noreply.github.com> Date: Sun, 26 Jun 2022 18:32:59 +0200 Subject: fix: use streamx instead of stream --- lib/file-stream.js | 24 ++++++++++-------------- lib/file.js | 2 +- package.json | 1 + 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/lib/file-stream.js b/lib/file-stream.js index a3682b1..cb6eb63 100644 --- a/lib/file-stream.js +++ b/lib/file-stream.js @@ -1,6 +1,5 @@ -const stream = require('stream') +const { Readable } = require('streamx') const debugFactory = require('debug') -const eos = require('end-of-stream') const debug = debugFactory('webtorrent:file-stream') @@ -12,7 +11,7 @@ const debug = debugFactory('webtorrent:file-stream') * @param {number} opts.start stream slice of file, starting from this byte (inclusive) * @param {number} opts.end stream slice of file, ending with this byte (inclusive) */ -class FileStream extends stream.Readable { +class FileStream extends Readable { constructor (file, opts) { super(opts) @@ -39,26 +38,22 @@ class FileStream extends stream.Readable { this._torrent.select(this._startPiece, this._endPiece, true, () => { this._notify() }) - - // Ensure that cleanup happens even if destroy() is never called (readable-stream v3 currently doesn't call it automaticallly) - eos(this, (err) => { - this.destroy(err) - }) } - _read () { + _read (cb) { if (this._reading) return this._reading = true - this._notify() + this._notify(cb) } - _notify () { - if (!this._reading || this._missing === 0) return + _notify (cb = () => {}) { + if (!this._reading || this._missing === 0) return cb() if (!this._torrent.bitfield.get(this._piece)) { + cb() return this._torrent.critical(this._piece, this._piece + this._criticalLength) } - if (this._notifying) return + if (this._notifying) return cb() this._notifying = true if (this._torrent.destroyed) return this.destroy(new Error('Torrent removed')) @@ -92,11 +87,12 @@ class FileStream extends stream.Readable { this.push(buffer) if (this._missing === 0) this.push(null) + cb() }) this._piece += 1 } - _destroy (err, cb) { + _destroy (cb, err) { if (!this._torrent.destroyed) { this._torrent.deselect(this._startPiece, this._endPiece, true) } diff --git a/lib/file.js b/lib/file.js index c5e5f1e..35d9438 100644 --- a/lib/file.js +++ b/lib/file.js @@ -1,5 +1,5 @@ const EventEmitter = require('events') -const { PassThrough } = require('stream') +const { PassThrough } = require('streamx') const path = require('path') const render = require('render-media') const streamToBlob = require('stream-to-blob') diff --git a/package.json b/package.json index b4b8d73..6175ad1 100644 --- a/package.json +++ b/package.json @@ -74,6 +74,7 @@ "stream-to-blob": "^2.0.1", "stream-to-blob-url": "^3.0.2", "stream-with-known-length-to-buffer": "^1.0.4", + "streamx": "^2.12.4", "throughput": "^1.0.1", "torrent-discovery": "^9.4.13", "torrent-piece": "^2.0.1", -- cgit v1.2.3