diff options
author | Feross Aboukhadijeh <feross@feross.org> | 2015-05-18 02:35:11 +0300 |
---|---|---|
committer | Feross Aboukhadijeh <feross@feross.org> | 2015-05-18 02:35:11 +0300 |
commit | 4309d70fb2473e0097bed3b0b7525579de4a52a4 (patch) | |
tree | 02aaadd20685170b4a7ce10d2d926b3f4e8fdbc4 | |
parent | 1dae3cba494c0371012a8b781573279af4c75fe9 (diff) |
remove torrent from client on fatal torrent error
also, fixed a bug with choking where we were inspecting wire._destroyed
instead of wire.destroyed.
-rw-r--r-- | index.js | 3 | ||||
-rw-r--r-- | lib/file-stream.js | 8 | ||||
-rw-r--r-- | lib/torrent.js | 46 |
3 files changed, 36 insertions, 21 deletions
@@ -168,6 +168,7 @@ WebTorrent.prototype.download = function (torrentId, opts, ontorrent) { torrent.on('error', function (err) { self.emit('error', err, torrent) + self.remove(torrent) }) torrent.on('listening', function (port) { @@ -231,7 +232,7 @@ WebTorrent.prototype.seed = function (input, opts, onseed) { /** * Remove a torrent from the client. - * @param {string|Buffer} torrentId + * @param {string|Buffer|Torrent} torrentId * @param {function} cb */ WebTorrent.prototype.remove = function (torrentId, cb) { diff --git a/lib/file-stream.js b/lib/file-stream.js index 5ea5bc2..4d34dca 100644 --- a/lib/file-stream.js +++ b/lib/file-stream.js @@ -26,6 +26,7 @@ function FileStream (file, opts) { if (!opts.start) opts.start = 0 if (!opts.end) opts.end = file.length - 1 + self.destroyed = false self.length = opts.end - opts.start + 1 var offset = opts.start + file.offset @@ -40,7 +41,6 @@ function FileStream (file, opts) { self._missing = self.length self._reading = false self._notifying = false - self._destroyed = false self._criticalLength = Math.min((1024 * 1024 / pieceLength) | 0, 2) self._offset = offset - (self.startPiece * pieceLength) } @@ -71,7 +71,7 @@ FileStream.prototype.notify = function () { debug('after read %s (length %s) (err %s)', p, buffer.length, err && err.message) self._notifying = false - if (self._destroyed) return + if (self.destroyed) return if (err) { self._storage.emit('error', err) @@ -115,6 +115,6 @@ FileStream.prototype.pipe = function (dst) { FileStream.prototype.destroy = function () { var self = this - if (self._destroyed) return - self._destroyed = true + if (self.destroyed) return + self.destroyed = true } diff --git a/lib/torrent.js b/lib/torrent.js index 6a72736..8e61303 100644 --- a/lib/torrent.js +++ b/lib/torrent.js @@ -65,13 +65,13 @@ function Torrent (torrentId, opts) { self._rechokeIntervalId = null self.ready = false + self.destroyed = false self.files = [] self.metadata = null self.parsedTorrent = null self.storage = null self.numBlockedPeers = 0 self._amInterested = false - self._destroyed = false self._selections = [] self._critical = [] self._storageImpl = opts.storage || Storage @@ -94,14 +94,14 @@ function Torrent (torrentId, opts) { }, function (err, data) { if (err) { err = new Error('Error downloading torrent: ' + err.message) - return self.emit('error', err) + return self._onError(err) } onTorrentId(data) }) } else if (typeof fs.readFile === 'function') { // assume it's a filesystem path fs.readFile(torrentId, function (err, torrent) { - if (err) return self.emit('error', new Error('Invalid torrent identifier')) + if (err) return self._onError(new Error('Invalid torrent identifier')) onTorrentId(torrent) }) } else throw new Error('Invalid torrent identifier') @@ -110,13 +110,13 @@ function Torrent (torrentId, opts) { try { self.parsedTorrent = parseTorrent(torrentId) } catch (err) { - return self.emit('error', new Error('Malformed torrent data: ' + err.message)) + return self._onError(new Error('Malformed torrent data: ' + err.message)) } self.infoHash = self.parsedTorrent.infoHash if (!self.infoHash) { - return self.emit('error', new Error('Malformed torrent data: Missing info hash.')) + return self._onError(new Error('Malformed torrent data: Missing info hash.')) } if (self.parsedTorrent.name) self.name = self.parsedTorrent.name // preliminary name @@ -138,7 +138,8 @@ function Torrent (torrentId, opts) { self.swarm = new Swarm(self.infoHash, self.client.peerId, { handshake: { dht: !!self.client.dht } }) - reemit(self.swarm, self, ['warning', 'error']) + self.swarm.on('error', self._onError.bind(self)) + reemit(self.swarm, self, ['warning']) self.swarm.on('wire', self._onWire.bind(self)) // update overall client stats @@ -206,7 +207,7 @@ Object.defineProperty(Torrent.prototype, 'magnetURI', { Torrent.prototype._onSwarmListening = function () { var self = this - if (self._destroyed) return + if (self.destroyed) return if (self.swarm.server) self.client.torrentPort = self.swarm.address().port @@ -220,11 +221,12 @@ Torrent.prototype._onSwarmListening = function () { rtcConfig: self.client._rtcConfig, wrtc: self.client._wrtc }) + self.discovery.on('error', self._onError.bind(self)) self.discovery.setTorrent(self.infoHash) self.discovery.on('peer', self.addPeer.bind(self)) // expose discovery events - reemit(self.discovery, self, ['dhtAnnounce', 'warning', 'error']) + reemit(self.discovery, self, ['dhtAnnounce', 'warning']) // if full metadata was included in initial torrent id, use it if (self.parsedTorrent.info) self._onMetadata(self.parsedTorrent) @@ -237,7 +239,7 @@ Torrent.prototype._onSwarmListening = function () { */ Torrent.prototype._onMetadata = function (metadata) { var self = this - if (self.metadata || self._destroyed) return + if (self.metadata || self.destroyed) return debug('got metadata') if (metadata && metadata.infoHash) { @@ -249,7 +251,7 @@ Torrent.prototype._onMetadata = function (metadata) { try { self.parsedTorrent = parseTorrent(self.metadata) } catch (err) { - return self.emit('error', err) + return self._onError(err) } } @@ -336,9 +338,14 @@ Torrent.prototype._onMetadata = function (metadata) { */ Torrent.prototype.destroy = function (cb) { var self = this + if (self.destroyed) return + self.destroyed = true debug('destroy') - self._destroyed = true - clearInterval(self._rechokeIntervalId) + + if (self._rechokeIntervalId) { + clearInterval(self._rechokeIntervalId) + self._rechokeIntervalId = null + } var tasks = [] if (self.swarm) tasks.push(function (cb) { self.swarm.destroy(cb) }) @@ -514,7 +521,7 @@ Torrent.prototype._onWireWithMetadata = function (wire) { var timeoutMs = self.chokeTimeout function onChokeTimeout () { - if (self._destroyed || wire._destroyed) return + if (self.destroyed || wire.destroyed) return if (self.swarm.numQueued > 2 * (self.swarm.numConns - self.swarm.numPeers) && wire.amInterested) { @@ -592,7 +599,7 @@ Torrent.prototype._onWireWithMetadata = function (wire) { */ Torrent.prototype._onStorage = function () { var self = this - if (self._destroyed) return + if (self.destroyed) return debug('on storage') // allow writes to storage only after initial piece verification is finished @@ -631,7 +638,7 @@ Torrent.prototype._onStoragePiece = function (piece) { */ Torrent.prototype._updateSelections = function () { var self = this - if (!self.swarm || self._destroyed) return + if (!self.swarm || self.destroyed) return if (!self.metadata) return self.once('metadata', self._updateSelections.bind(self)) process.nextTick(self._gcSelections.bind(self)) @@ -692,7 +699,7 @@ Torrent.prototype._updateInterest = function () { */ Torrent.prototype._update = function () { var self = this - if (self._destroyed) return + if (self.destroyed) return // update wires in random order for better request distribution randomizedForEach(self.swarm.wires, self._updateWire.bind(self)) @@ -1043,6 +1050,13 @@ Torrent.prototype.createServer = function (opts) { } } +Torrent.prototype._onError = function (err) { + var self = this + debug('torrent error: %s', err.message || err) + self.emit('error', err) + self.destroy() +} + function getPipelineLength (wire, duration) { return Math.ceil(2 + duration * wire.downloadSpeed() / Storage.BLOCK_LENGTH) } |