diff options
author | Diego RodrÃguez Baquero <diegorbaquero@gmail.com> | 2018-08-24 03:19:21 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-08-24 03:19:21 +0300 |
commit | 92ec1a249d5374178557d80c1fc013ff2c76440d (patch) | |
tree | 6ded94ab17434a770d353ba424fab90cecf29719 /lib | |
parent | f0e004b902c8c60a66b3ec45c3caddf81e18550a (diff) | |
parent | afbef26d5fafa745dc0793ed0d6a5761f5f0eb89 (diff) |
Merge pull request #1480 from webtorrent/modernize
Modernize some lib files
Diffstat (limited to 'lib')
-rw-r--r-- | lib/server.js | 75 | ||||
-rw-r--r-- | lib/tcp-pool.js | 188 | ||||
-rw-r--r-- | lib/torrent.js | 2697 |
3 files changed, 1454 insertions, 1506 deletions
diff --git a/lib/server.js b/lib/server.js index 8ea9a48..60595a8 100644 --- a/lib/server.js +++ b/lib/server.js @@ -1,44 +1,41 @@ -module.exports = Server - -var arrayRemove = require('unordered-array-remove') -var http = require('http') -var mime = require('mime') -var pump = require('pump') -var rangeParser = require('range-parser') -var url = require('url') - -function Server (torrent, opts) { - var server = http.createServer() - if (!opts) opts = {} +const arrayRemove = require('unordered-array-remove') +const http = require('http') +const mime = require('mime') +const pump = require('pump') +const rangeParser = require('range-parser') +const url = require('url') + +function Server (torrent, opts = {}) { + const server = http.createServer() if (!opts.origin) opts.origin = '*' // allow all origins by default - var sockets = [] - var pendingReady = [] - var closed = false + const sockets = [] + const pendingReady = [] + let closed = false server.on('connection', onConnection) server.on('request', onRequest) - var _close = server.close - server.close = function (cb) { + const _close = server.close + server.close = cb => { closed = true server.removeListener('connection', onConnection) server.removeListener('request', onRequest) while (pendingReady.length) { - var onReady = pendingReady.pop() + const onReady = pendingReady.pop() torrent.removeListener('ready', onReady) } torrent = null _close.call(server, cb) } - server.destroy = function (cb) { - sockets.forEach(function (socket) { + server.destroy = cb => { + sockets.forEach(socket => { socket.destroy() }) // Only call `server.close` if user has not called it already - if (!cb) cb = function () {} + if (!cb) cb = () => {} if (closed) process.nextTick(cb) else server.close(cb) } @@ -68,13 +65,13 @@ function Server (torrent, opts) { function onConnection (socket) { socket.setTimeout(36000000) sockets.push(socket) - socket.once('close', function () { + socket.once('close', () => { arrayRemove(sockets, sockets.indexOf(socket)) }) } function onRequest (req, res) { - var pathname = url.parse(req.url).pathname + const pathname = url.parse(req.url).pathname if (pathname === '/favicon.ico') { return serve404Page() @@ -132,12 +129,12 @@ function Server (torrent, opts) { return serveIndexPage() } - var index = Number(pathname.split('/')[1]) + const index = Number(pathname.split('/')[1]) if (Number.isNaN(index) || index >= torrent.files.length) { return serve404Page() } - var file = torrent.files[index] + const file = torrent.files[index] serveFile(file) } @@ -145,14 +142,11 @@ function Server (torrent, opts) { res.statusCode = 200 res.setHeader('Content-Type', 'text/html') - var listHtml = torrent.files.map(function (file, i) { - return '<li><a download="' + file.name + '" href="/' + i + '/' + file.name + '">' + file.path + '</a> ' + - '(' + file.length + ' bytes)</li>' - }).join('<br>') + const listHtml = torrent.files.map((file, i) => `<li><a download="${file.name}" href="/${i}/${file.name}">${file.path}</a> (${file.length} bytes)</li>`).join('<br>') - var html = getPageHTML( - torrent.name + ' - WebTorrent', - '<h1>' + torrent.name + '</h1><ol>' + listHtml + '</ol>' + const html = getPageHTML( + `${torrent.name} - WebTorrent`, + `<h1>${torrent.name}</h1><ol>${listHtml}</ol>` ) res.end(html) } @@ -161,7 +155,7 @@ function Server (torrent, opts) { res.statusCode = 404 res.setHeader('Content-Type', 'text/html') - var html = getPageHTML('404 - Not Found', '<h1>404 - Not Found</h1>') + const html = getPageHTML('404 - Not Found', '<h1>404 - Not Found</h1>') res.end(html) } @@ -175,7 +169,7 @@ function Server (torrent, opts) { // Set name of file (for "Save Page As..." dialog) res.setHeader( 'Content-Disposition', - 'inline; filename*=UTF-8\'\'' + encodeRFC5987(file.name) + `inline; filename*=UTF-8''${encodeRFC5987(file.name)}` ) // Support DLNA streaming @@ -187,7 +181,7 @@ function Server (torrent, opts) { // `rangeParser` returns an array of ranges, or an error code (number) if // there was an error parsing the range. - var range = rangeParser(file.length, req.headers.range || '') + let range = rangeParser(file.length, req.headers.range || '') if (Array.isArray(range)) { res.statusCode = 206 // indicates that range-request was understood @@ -197,7 +191,7 @@ function Server (torrent, opts) { res.setHeader( 'Content-Range', - 'bytes ' + range.start + '-' + range.end + '/' + file.length + `bytes ${range.start}-${range.end}/${file.length}` ) res.setHeader('Content-Length', range.end - range.start + 1) } else { @@ -215,7 +209,7 @@ function Server (torrent, opts) { function serveMethodNotAllowed () { res.statusCode = 405 res.setHeader('Content-Type', 'text/html') - var html = getPageHTML('405 - Method Not Allowed', '<h1>405 - Method Not Allowed</h1>') + const html = getPageHTML('405 - Method Not Allowed', '<h1>405 - Method Not Allowed</h1>') res.end(html) } } @@ -224,10 +218,7 @@ function Server (torrent, opts) { } function getPageHTML (title, pageHtml) { - return '<!DOCTYPE html><html lang="en"><head>' + - '<meta charset="utf-8">' + - '<title>' + title + '</title>' + - '</head><body>' + pageHtml + '</body></html>' + return `<!DOCTYPE html><html lang="en"><head><meta charset="utf-8"><title>${title}</title></head><body>${pageHtml}</body></html>` } // From https://developer.mozilla.org/en/docs/Web/JavaScript/Reference/Global_Objects/encodeURIComponent @@ -241,3 +232,5 @@ function encodeRFC5987 (str) { // so we can allow for a little better readability over the wire: |`^ .replace(/%(?:7C|60|5E)/g, unescape) } + +module.exports = Server diff --git a/lib/tcp-pool.js b/lib/tcp-pool.js index 7625d8d..1094c4f 100644 --- a/lib/tcp-pool.js +++ b/lib/tcp-pool.js @@ -1,10 +1,8 @@ -module.exports = TCPPool - -var arrayRemove = require('unordered-array-remove') -var debug = require('debug')('webtorrent:tcp-pool') -var net = require('net') // browser exclude +const arrayRemove = require('unordered-array-remove') +const debug = require('debug')('webtorrent:tcp-pool') +const net = require('net') // browser exclude -var Peer = require('./peer') +const Peer = require('./peer') /** * TCPPool @@ -15,114 +13,116 @@ var Peer = require('./peer') * * @param {number} port */ -function TCPPool (client) { - var self = this - debug('create tcp pool (port %s)', client.torrentPort) - - self.server = net.createServer() - self._client = client +class TCPPool { + constructor (client) { + debug('create tcp pool (port %s)', client.torrentPort) - // Temporarily store incoming connections so they can be destroyed if the server is - // closed before the connection is passed off to a Torrent. - self._pendingConns = [] + this.server = net.createServer() + this._client = client - self._onConnectionBound = function (conn) { - self._onConnection(conn) - } + // Temporarily store incoming connections so they can be destroyed if the server is + // closed before the connection is passed off to a Torrent. + this._pendingConns = [] - self._onListening = function () { - self._client._onListening() - } + this._onConnectionBound = conn => { + this._onConnection(conn) + } - self._onError = function (err) { - self._client._destroy(err) - } + this._onListening = () => { + this._client._onListening() + } - self.server.on('connection', self._onConnectionBound) - self.server.on('listening', self._onListening) - self.server.on('error', self._onError) + this._onError = err => { + this._client._destroy(err) + } - self.server.listen(client.torrentPort) -} + this.server.on('connection', this._onConnectionBound) + this.server.on('listening', this._onListening) + this.server.on('error', this._onError) -/** - * Destroy this TCP pool. - * @param {function} cb - */ -TCPPool.prototype.destroy = function (cb) { - var self = this - debug('destroy tcp pool') - - self.server.removeListener('connection', self._onConnectionBound) - self.server.removeListener('listening', self._onListening) - self.server.removeListener('error', self._onError) - - // Destroy all open connection objects so server can close gracefully without waiting - // for connection timeout or remote peer to disconnect. - self._pendingConns.forEach(function (conn) { - conn.on('error', noop) - conn.destroy() - }) - - try { - self.server.close(cb) - } catch (err) { - if (cb) process.nextTick(cb) + this.server.listen(client.torrentPort) } - self.server = null - self._client = null - self._pendingConns = null -} + /** + * Destroy this TCP pool. + * @param {function} cb + */ + destroy (cb) { + debug('destroy tcp pool') + + this.server.removeListener('connection', this._onConnectionBound) + this.server.removeListener('listening', this._onListening) + this.server.removeListener('error', this._onError) + + // Destroy all open connection objects so server can close gracefully without waiting + // for connection timeout or remote peer to disconnect. + this._pendingConns.forEach(conn => { + conn.on('error', noop) + conn.destroy() + }) + + try { + this.server.close(cb) + } catch (err) { + if (cb) process.nextTick(cb) + } -/** - * On incoming connections, we expect the remote peer to send a handshake first. Based - * on the infoHash in that handshake, route the peer to the right swarm. - */ -TCPPool.prototype._onConnection = function (conn) { - var self = this - - // If the connection has already been closed before the `connect` event is fired, - // then `remoteAddress` will not be available, and we can't use this connection. - // - Node.js issue: https://github.com/nodejs/node-v0.x-archive/issues/7566 - // - WebTorrent issue: https://github.com/webtorrent/webtorrent/issues/398 - if (!conn.remoteAddress) { - conn.on('error', noop) - conn.destroy() - return + this.server = null + this._client = null + this._pendingConns = null } - self._pendingConns.push(conn) - conn.once('close', cleanupPending) + /** + * On incoming connections, we expect the remote peer to send a handshake first. Based + * on the infoHash in that handshake, route the peer to the right swarm. + */ + _onConnection (conn) { + const self = this + + // If the connection has already been closed before the `connect` event is fired, + // then `remoteAddress` will not be available, and we can't use this connection. + // - Node.js issue: https://github.com/nodejs/node-v0.x-archive/issues/7566 + // - WebTorrent issue: https://github.com/webtorrent/webtorrent/issues/398 + if (!conn.remoteAddress) { + conn.on('error', noop) + conn.destroy() + return + } + + self._pendingConns.push(conn) + conn.once('close', cleanupPending) - var peer = Peer.createTCPIncomingPeer(conn) + const peer = Peer.createTCPIncomingPeer(conn) - var wire = peer.wire - wire.once('handshake', onHandshake) + const wire = peer.wire + wire.once('handshake', onHandshake) - function onHandshake (infoHash, peerId) { - cleanupPending() + function onHandshake (infoHash, peerId) { + cleanupPending() - var torrent = self._client.get(infoHash) - if (torrent) { - peer.swarm = torrent - torrent._addIncomingPeer(peer) - peer.onHandshake(infoHash, peerId) - } else { - var err = new Error( - 'Unexpected info hash ' + infoHash + ' from incoming peer ' + peer.id - ) - peer.destroy(err) + const torrent = self._client.get(infoHash) + if (torrent) { + peer.swarm = torrent + torrent._addIncomingPeer(peer) + peer.onHandshake(infoHash, peerId) + } else { + const err = new Error( + `Unexpected info hash ${infoHash} from incoming peer ${peer.id}` + ) + peer.destroy(err) + } } - } - function cleanupPending () { - conn.removeListener('close', cleanupPending) - wire.removeListener('handshake', onHandshake) - if (self._pendingConns) { - arrayRemove(self._pendingConns, self._pendingConns.indexOf(conn)) + function cleanupPending () { + conn.removeListener('close', cleanupPending) + wire.removeListener('handshake', onHandshake) + if (self._pendingConns) { + arrayRemove(self._pendingConns, self._pendingConns.indexOf(conn)) + } } } } function noop () {} + +module.exports = TCPPool diff --git a/lib/torrent.js b/lib/torrent.js index ea422e0..a32326a 100644 --- a/lib/torrent.js +++ b/lib/torrent.js @@ -1,613 +1,572 @@ /* global URL, Blob */ -module.exports = Torrent - -var addrToIPPort = require('addr-to-ip-port') -var BitField = require('bitfield') -var ChunkStoreWriteStream = require('chunk-store-stream/write') -var debug = require('debug')('webtorrent:torrent') -var Discovery = require('torrent-discovery') -var EventEmitter = require('events').EventEmitter -var extend = require('xtend') -var extendMutable = require('xtend/mutable') -var fs = require('fs') -var FSChunkStore = require('fs-chunk-store') // browser: `memory-chunk-store` -var get = require('simple-get') -var ImmediateChunkStore = require('immediate-chunk-store') -var inherits = require('inherits') -var MultiStream = require('multistream') -var net = require('net') // browser exclude -var os = require('os') // browser exclude -var parallel = require('run-parallel') -var parallelLimit = require('run-parallel-limit') -var parseTorrent = require('parse-torrent') -var path = require('path') -var Piece = require('torrent-piece') -var pump = require('pump') -var randomIterate = require('random-iterate') -var sha1 = require('simple-sha1') -var speedometer = require('speedometer') -var uniq = require('uniq') -var utMetadata = require('ut_metadata') -var utPex = require('ut_pex') // browser exclude -var parseRange = require('parse-numeric-range') - -var File = require('./file') -var Peer = require('./peer') -var RarityMap = require('./rarity-map') -var Server = require('./server') // browser exclude - -var MAX_BLOCK_LENGTH = 128 * 1024 -var PIECE_TIMEOUT = 30000 -var CHOKE_TIMEOUT = 5000 -var SPEED_THRESHOLD = 3 * Piece.BLOCK_LENGTH - -var PIPELINE_MIN_DURATION = 0.5 -var PIPELINE_MAX_DURATION = 1 - -var RECHOKE_INTERVAL = 10000 // 10 seconds -var RECHOKE_OPTIMISTIC_DURATION = 2 // 30 seconds - -var FILESYSTEM_CONCURRENCY = 2 - -var RECONNECT_WAIT = [ 1000, 5000, 15000 ] - -var VERSION = require('../package.json').version -var USER_AGENT = 'WebTorrent/' + VERSION + ' (https://webtorrent.io)' - -var TMP +const addrToIPPort = require('addr-to-ip-port') +const BitField = require('bitfield') +const ChunkStoreWriteStream = require('chunk-store-stream/write') +const debug = require('debug')('webtorrent:torrent') +const Discovery = require('torrent-discovery') +const EventEmitter = require('events').EventEmitter +const extend = require('xtend') +const extendMutable = require('xtend/mutable') +const fs = require('fs') +const FSChunkStore = require('fs-chunk-store') // browser: `memory-chunk-store` +const get = require('simple-get') +const ImmediateChunkStore = require('immediate-chunk-store') +const MultiStream = require('multistream') +const net = require('net') // browser exclude +const os = require('os') // browser exclude +const parallel = require('run-parallel') +const parallelLimit = require('run-parallel-limit') +const parseTorrent = require('parse-torrent') +const path = require('path') +const Piece = require('torrent-piece') +const pump = require('pump') +const randomIterate = require('random-iterate') +const sha1 = require('simple-sha1') +const speedometer = require('speedometer') +const uniq = require('uniq') +const utMetadata = require('ut_metadata') +const utPex = require('ut_pex') // browser exclude +const parseRange = require('parse-numeric-range') + +const File = require('./file') +const Peer = require('./peer') +const RarityMap = require('./rarity-map') +const Server = require('./server') // browser exclude + +const MAX_BLOCK_LENGTH = 128 * 1024 +const PIECE_TIMEOUT = 30000 +const CHOKE_TIMEOUT = 5000 +const SPEED_THRESHOLD = 3 * Piece.BLOCK_LENGTH + +const PIPELINE_MIN_DURATION = 0.5 +const PIPELINE_MAX_DURATION = 1 + +const RECHOKE_INTERVAL = 10000 // 10 seconds +const RECHOKE_OPTIMISTIC_DURATION = 2 // 30 seconds + +const FILESYSTEM_CONCURRENCY = 2 + +const RECONNECT_WAIT = [ 1000, 5000, 15000 ] + +const VERSION = require('../package.json').version +const USER_AGENT = `WebTorrent/${VERSION} (https://webtorrent.io)` + +let TMP try { TMP = path.join(fs.statSync('/tmp') && '/tmp', 'webtorrent') } catch (err) { TMP = path.join(typeof os.tmpdir === 'function' ? os.tmpdir() : '/', 'webtorrent') } -inherits(Torrent, EventEmitter) +class Torrent extends EventEmitter { + constructor (torrentId, client, opts) { + super() -function Torrent (torrentId, client, opts) { - EventEmitter.call(this) + this._debugId = 'unknown infohash' + this.client = client - this._debugId = 'unknown infohash' - this.client = client + this.announce = opts.announce + this.urlList = opts.urlList - this.announce = opts.announce - this.urlList = opts.urlList + this.path = opts.path + this.skipVerify = !!opts.skipVerify + this._store = opts.store || FSChunkStore + this._getAnnounceOpts = opts.getAnnounceOpts - this.path = opts.path - this.skipVerify = !!opts.skipVerify - this._store = opts.store || FSChunkStore - this._getAnnounceOpts = opts.getAnnounceOpts + this.strategy = opts.strategy || 'sequential' - this.strategy = opts.strategy || 'sequential' + this.maxWebConns = opts.maxWebConns || 4 - this.maxWebConns = opts.maxWebConns || 4 + this._rechokeNumSlots = (opts.uploads === false || opts.uploads === 0) + ? 0 + : (+opts.uploads || 10) + this._rechokeOptimisticWire = null + this._rechokeOptimisticTime = 0 + this._rechokeIntervalId = null - this._rechokeNumSlots = (opts.uploads === false || opts.uploads === 0) - ? 0 - : (+opts.uploads || 10) - this._rechokeOptimisticWire = null - this._rechokeOptimisticTime = 0 - this._rechokeIntervalId = null + this.ready = false + this.destroyed = false + this.paused = false + this.done = false - this.ready = false - this.destroyed = false - this.paused = false - this.done = false + this.metadata = null + this.store = null + this.files = [] + this.pieces = [] - this.metadata = null - this.store = null - this.files = [] - this.pieces = [] + this._amInterested = false + this._selections = [] + this._critical = [] - this._amInterested = false - this._selections = [] - this._critical = [] + this.wires = [] // open wires (added *after* handshake) - this.wires = [] // open wires (added *after* handshake) + this._queue = [] // queue of outgoing tcp peers to connect to + this._peers = {} // connected peers (addr/peerId -> Peer) + this._peersLength = 0 // number of elements in `this._peers` (cache, for perf) - this._queue = [] // queue of outgoing tcp peers to connect to - this._peers = {} // connected peers (addr/peerId -> Peer) - this._peersLength = 0 // number of elements in `this._peers` (cache, for perf) + // stats + this.received = 0 + this.uploaded = 0 + this._downloadSpeed = speedometer() + this._uploadSpeed = speedometer() - // stats - this.received = 0 - this.uploaded = 0 - this._downloadSpeed = speedometer() - this._uploadSpeed = speedometer() + // for cleanup + this._servers = [] + this._xsRequests = [] - // for cleanup - this._servers = [] - this._xsRequests = [] + // TODO: remove this and expose a hook instead + // optimization: don't recheck every file if it hasn't changed + this._fileModtimes = opts.fileModtimes - // TODO: remove this and expose a hook instead - // optimization: don't recheck every file if it hasn't changed - this._fileModtimes = opts.fileModtimes + if (torrentId !== null) this._onTorrentId(torrentId) - if (torrentId !== null) this._onTorrentId(torrentId) - - this._debug('new torrent') -} + this._debug('new torrent') + } -Object.defineProperty(Torrent.prototype, 'timeRemaining', { - get: function () { + get timeRemaining () { if (this.done) return 0 if (this.downloadSpeed === 0) return Infinity return ((this.length - this.downloaded) / this.downloadSpeed) * 1000 } -}) -Object.defineProperty(Torrent.prototype, 'downloaded', { - get: function () { + get downloaded () { if (!this.bitfield) return 0 - var downloaded = 0 - for (var index = 0, len = this.pieces.length; index < len; ++index) { + let downloaded = 0 + for (let index = 0, len = this.pieces.length; index < len; ++index) { if (this.bitfield.get(index)) { // verified data downloaded += (index === len - 1) ? this.lastPieceLength : this.pieceLength } else { // "in progress" data - var piece = this.pieces[index] + const piece = this.pieces[index] downloaded += (piece.length - piece.missing) } } return downloaded } -}) - -// TODO: re-enable this. The number of missing pieces. Used to implement 'end game' mode. -// Object.defineProperty(Storage.prototype, 'numMissing', { -// get: function () { -// var self = this -// var numMissing = self.pieces.length -// for (var index = 0, len = self.pieces.length; index < len; index++) { -// numMissing -= self.bitfield.get(index) -// } -// return numMissing -// } -// }) - -Object.defineProperty(Torrent.prototype, 'downloadSpeed', { - get: function () { return this._downloadSpeed() } -}) - -Object.defineProperty(Torrent.prototype, 'uploadSpeed', { - get: function () { return this._uploadSpeed() } -}) - -Object.defineProperty(Torrent.prototype, 'progress', { - get: function () { return this.length ? this.downloaded / this.length : 0 } -}) - -Object.defineProperty(Torrent.prototype, 'ratio', { - get: function () { return this.uploaded / (this.received || 1) } -}) - -Object.defineProperty(Torrent.prototype, 'numPeers', { - get: function () { return this.wires.length } -}) - -Object.defineProperty(Torrent.prototype, 'torrentFileBlobURL', { - get: function () { + + // TODO: re-enable this. The number of missing pieces. Used to implement 'end game' mode. + // Object.defineProperty(Storage.prototype, 'numMissing', { + // get: function () { + // var self = this + // var numMissing = self.pieces.length + // for (var index = 0, len = self.pieces.length; index < len; index++) { + // numMissing -= self.bitfield.get(index) + // } + // return numMissing + // } + // }) + + get downloadSpeed () { return this._downloadSpeed() } + + get uploadSpeed () { return this._uploadSpeed() } + + get progress () { return this.length ? this.downloaded / this.length : 0 } + + get ratio () { return this.uploaded / (this.received || 1) } + + get numPeers () { return this.wires.length } + + get torrentFileBlobURL () { if (typeof window === 'undefined') throw new Error('browser-only property') if (!this.torrentFile) return null return URL.createObjectURL( new Blob([ this.torrentFile ], { type: 'application/x-bittorrent' }) ) } -}) -Object.defineProperty(Torrent.prototype, '_numQueued', { - get: function () { + get _numQueued () { return this._queue.length + (this._peersLength - this._numConns) } -}) -Object.defineProperty(Torrent.prototype, '_numConns', { - get: function () { - var self = this - var numConns = 0 - for (var id in self._peers) { + get _numConns () { + const self = this + let numConns = 0 + for (const id in self._peers) { if (self._peers[id].connected) numConns += 1 } return numConns } -}) -// TODO: remove in v1 -Object.defineProperty(Torrent.prototype, 'swarm', { - get: function () { + // TODO: remove in v1 + get swarm () { console.warn('WebTorrent: `torrent.swarm` is deprecated. Use `torrent` directly instead.') return this } -}) - -Torrent.prototype._onTorrentId = function (torrentId) { - var self = this - if (self.destroyed) return - - var parsedTorrent - try { parsedTorrent = parseTorrent(torrentId) } catch (err) {} - if (parsedTorrent) { - // Attempt to set infoHash property synchronously - self.infoHash = parsedTorrent.infoHash - self._debugId = parsedTorrent.infoHash.toString('hex').substring(0, 7) - process.nextTick(function () { - if (self.destroyed) return - self._onParsedTorrent(parsedTorrent) - }) - } else { - // If torrentId failed to parse, it could be in a form that requires an async - // operation, i.e. http/https link, filesystem path, or Blob. - parseTorrent.remote(torrentId, function (err, parsedTorrent) { - if (self.destroyed) return - if (err) return self._destroy(err) - self._onParsedTorrent(parsedTorrent) - }) - } -} - -Torrent.prototype._onParsedTorrent = function (parsedTorrent) { - var self = this - if (self.destroyed) return - self._processParsedTorrent(parsedTorrent) + _onTorrentId (torrentId) { + const self = this + if (self.destroyed) return - if (!self.infoHash) { - return self._destroy(new Error('Malformed torrent data: No info hash')) + let parsedTorrent + try { parsedTorrent = parseTorrent(torrentId) } catch (err) {} + if (parsedTorrent) { + // Attempt to set infoHash property synchronously + self.infoHash = parsedTorrent.infoHash + self._debugId = parsedTorrent.infoHash.toString('hex').substring(0, 7) + process.nextTick(() => { + if (self.destroyed) return + self._onParsedTorrent(parsedTorrent) + }) + } else { + // If torrentId failed to parse, it could be in a form that requires an async + // operation, i.e. http/https link, filesystem path, or Blob. + parseTorrent.remote(torrentId, (err, parsedTorrent) => { + if (self.destroyed) return + if (err) return self._destroy(err) + self._onParsedTorrent(parsedTorrent) + }) + } } - if (!self.path) self.path = path.join(TMP, self.infoHash) + _onParsedTorrent (parsedTorrent) { + const self = this + if (self.destroyed) return - self._rechokeIntervalId = setInterval(function () { - self._rechoke() - }, RECHOKE_INTERVAL) - if (self._rechokeIntervalId.unref) self._rechokeIntervalId.unref() + self._processParsedTorrent(parsedTorrent) - // Private 'infoHash' event allows client.add to check for duplicate torrents and - // destroy them before the normal 'infoHash' event is emitted. Prevents user - // applications from needing to deal with duplicate 'infoHash' events. - self.emit('_infoHash', self.infoHash) - if (self.destroyed) return + if (!self.infoHash) { + return self._destroy(new Error('Malformed torrent data: No info hash')) + } - self.emit('infoHash', self.infoHash) - if (self.destroyed) return // user might destroy torrent in event handler + if (!self.path) self.path = path.join(TMP, self.infoHash) - if (self.client.listening) { - self._onListening() - } else { - self.client.once('listening', function () { - self._onListening() - }) - } -} + self._rechokeIntervalId = setInterval(() => { + self._rechoke() + }, RECHOKE_INTERVAL) + if (self._rechokeIntervalId.unref) self._rechokeIntervalId.unref() -Torrent.prototype._processParsedTorrent = function (parsedTorrent) { - this._debugId = parsedTorrent.infoHash.toString('hex').substring(0, 7) + // Private 'infoHash' event allows client.add to check for duplicate torrents and + // destroy them before the normal 'infoHash' event is emitted. Prevents user + // applications from needing to deal with duplicate 'infoHash' events. + self.emit('_infoHash', self.infoHash) + if (self.destroyed) return - if (this.announce) { - // Allow specifying trackers via `opts` parameter - parsedTorrent.announce = parsedTorrent.announce.concat(this.announce) - } + self.emit('infoHash', self.infoHash) + if (self.destroyed) return // user might destroy torrent in event handler - if (this.client.tracker && global.WEBTORRENT_ANNOUNCE && !this.private) { - // So `webtorrent-hybrid` can force specific trackers to be used - parsedTorrent.announce = parsedTorrent.announce.concat(global.WEBTORRENT_ANNOUNCE) + if (self.client.listening) { + self._onListening() + } else { + self.client.once('listening', () => { + self._onListening() + }) + } } - if (this.urlList) { - // Allow specifying web seeds via `opts` parameter - parsedTorrent.urlList = parsedTorrent.urlList.concat(this.urlList) - } + _processParsedTorrent (parsedTorrent) { + this._debugId = parsedTorrent.infoHash.toString('hex').substring(0, 7) - uniq(parsedTorrent.announce) - uniq(parsedTorrent.urlList) + if (this.announce) { + // Allow specifying trackers via `opts` parameter + parsedTorrent.announce = parsedTorrent.announce.concat(this.announce) + } - extendMutable(this, parsedTorrent) + if (this.client.tracker && global.WEBTORRENT_ANNOUNCE && !this.private) { + // So `webtorrent-hybrid` can force specific trackers to be used + parsedTorrent.announce = parsedTorrent.announce.concat(global.WEBTORRENT_ANNOUNCE) + } - this.magnetURI = parseTorrent.toMagnetURI(parsedTorrent) - this.torrentFile = parseTorrent.toTorrentFile(parsedTorrent) -} + if (this.urlList) { + // Allow specifying web seeds via `opts` parameter + parsedTorrent.urlList = parsedTorrent.urlList.concat(this.urlList) + } -Torrent.prototype._onListening = function () { - var self = this - if (self.discovery || self.destroyed) return - - var trackerOpts = self.client.tracker - if (trackerOpts) { - trackerOpts = extend(self.client.tracker, { - getAnnounceOpts: function () { - var opts = { - uploaded: self.uploaded, - downloaded: self.downloaded, - left: Math.max(self.length - self.downloaded, 0) - } - if (self.client.tracker.getAnnounceOpts) { - extendMutable(opts, self.client.tracker.getAnnounceOpts()) - } - if (self._getAnnounceOpts) { - // TODO: consider deprecating this, as it's redundant with the former case - extendMutable(opts, self._getAnnounceOpts()) - } - return opts - } - }) - } + uniq(parsedTorrent.announce) + uniq(parsedTorrent.urlList) - // begin discovering peers via DHT and trackers - self.discovery = new Discovery({ - infoHash: self.infoHash, - announce: self.announce, - peerId: self.client.peerId, - dht: !self.private && self.client.dht, - tracker: trackerOpts, - port: self.client.torrentPort, - userAgent: USER_AGENT - }) - - self.discovery.on('error', onError) - self.discovery.on('peer', onPeer) - self.discovery.on('trackerAnnounce', onTrackerAnnounce) - self.discovery.on('dhtAnnounce', onDHTAnnounce) - self.discovery.on('warning', onWarning) - - function onError (err) { - self._destroy(err) - } + extendMutable(this, parsedTorrent) - function onPeer (peer) { - // Don't create new outgoing TCP connections when torrent is done - if (typeof peer === 'string' && self.done) return - self.addPeer(peer) + this.magnetURI = parseTorrent.toMagnetURI(parsedTorrent) + this.torrentFile = parseTorrent.toTorrentFile(parsedTorrent) } - function onTrackerAnnounce () { - self.emit('trackerAnnounce') - if (self.numPeers === 0) self.emit('noPeers', 'tracker') - } + _onListening () { + const self = this + if (self.discovery || self.destroyed) return - function onDHTAnnounce () { - self.emit('dhtAnnounce') - if (self.numPeers === 0) self.emit('noPeers', 'dht') - } + let trackerOpts = self.client.tracker + if (trackerOpts) { + trackerOpts = extend(self.client.tracker, { + getAnnounceOpts () { + const opts = { + uploaded: self.uploaded, + downloaded: self.downloaded, + left: Math.max(self.length - self.downloaded, 0) + } + if (self.client.tracker.getAnnounceOpts) { + extendMutable(opts, self.client.tracker.getAnnounceOpts()) + } + if (self._getAnnounceOpts) { + // TODO: consider deprecating this, as it's redundant with the former case + extendMutable(opts, self._getAnnounceOpts()) + } + return opts + } + }) + } - function onWarning (err) { - self.emit('warning', err) - } + // begin discovering peers via DHT and trackers + self.discovery = new Discovery({ + infoHash: self.infoHash, + announce: self.announce, + peerId: self.client.peerId, + dht: !self.private && self.client.dht, + tracker: trackerOpts, + port: self.client.torrentPort, + userAgent: USER_AGENT + }) - if (self.info) { - // if full metadata was included in initial torrent id, use it immediately. Otherwise, - // wait for torrent-discovery to find peers and ut_metadata to get the metadata. - self._onMetadata(self) - } else if (self.xs) { - self._getMetadataFromServer() - } -} + self.discovery.on('error', onError) + self.discovery.on('peer', onPeer) + self.discovery.on('trackerAnnounce', onTrackerAnnounce) + self.discovery.on('dhtAnnounce', onDHTAnnounce) + self.discovery.on('warning', onWarning) -Torrent.prototype._getMetadataFromServer = function () { - var self = this - var urls = Array.isArray(self.xs) ? self.xs : [ self.xs ] + function onError (err) { + self._destroy(err) + } - var tasks = urls.map(function (url) { - return function (cb) { - getMetadataFromURL(url, cb) + function onPeer (peer) { + // Don't create new outgoing TCP connections when torrent is done + if (typeof peer === 'string' && self.done) return + self.addPeer(peer) } - }) - parallel(tasks) - function getMetadataFromURL (url, cb) { - if (url.indexOf('http://') !== 0 && url.indexOf('https://') !== 0) { - self.emit('warning', new Error('skipping non-http xs param: ' + url)) - return cb(null) + function onTrackerAnnounce () { + self.emit('trackerAnnounce') + if (self.numPeers === 0) self.emit('noPeers', 'tracker') } - var opts = { - url: url, - method: 'GET', - headers: { - 'user-agent': USER_AGENT - } + function onDHTAnnounce () { + self.emit('dhtAnnounce') + if (self.numPeers === 0) self.emit('noPeers', 'dht') } - var req - try { - req = get.concat(opts, onResponse) - } catch (err) { - self.emit('warning', new Error('skipping invalid url xs param: ' + url)) - return cb(null) + + function onWarning (err) { + self.emit('warning', err) } - self._xsRequests.push(req) + if (self.info) { + // if full metadata was included in initial torrent id, use it immediately. Otherwise, + // wait for torrent-discovery to find peers and ut_metadata to get the metadata. + self._onMetadata(self) + } else if (self.xs) { + self._getMetadataFromServer() + } + } - function onResponse (err, res, torrent) { - if (self.destroyed) return cb(null) - if (self.metadata) return cb(null) + _getMetadataFromServer () { + const self = this + const urls = Array.isArray(self.xs) ? self.xs : [ self.xs ] - if (err) { - self.emit('warning', new Error('http error from xs param: ' + url)) - return cb(null) - } - if (res.statusCode !== 200) { - self.emit('warning', new Error('non-200 status code ' + res.statusCode + ' from xs param: ' + url)) + const tasks = urls.map(url => cb => { + getMetadataFromURL(url, cb) + }) + parallel(tasks) + + function getMetadataFromURL (url, cb) { + if (url.indexOf('http://') !== 0 && url.indexOf('https://') !== 0) { + self.emit('warning', new Error(`skipping non-http xs param: ${url}`)) return cb(null) } - var parsedTorrent + const opts = { + url, + method: 'GET', + headers: { + 'user-agent': USER_AGENT + } + } + let req try { - parsedTorrent = parseTorrent(torrent) - } catch (err) {} - - if (!parsedTorrent) { - self.emit('warning', new Error('got invalid torrent file from xs param: ' + url)) + req = get.concat(opts, onResponse) + } catch (err) { + self.emit('warning', new Error(`skipping invalid url xs param: ${url}`)) return cb(null) } - if (parsedTorrent.infoHash !== self.infoHash) { - self.emit('warning', new Error('got torrent file with incorrect info hash from xs param: ' + url)) - return cb(null) - } + self._xsRequests.push(req) - self._onMetadata(parsedTorrent) - cb(null) - } - } -} + function onResponse (err, res, torrent) { + if (self.destroyed) return cb(null) + if (self.metadata) return cb(null) -/** - * Called when the full torrent metadata is received. - */ -Torrent.prototype._onMetadata = function (metadata) { - var self = this - if (self.metadata || self.destroyed) return - self._debug('got metadata') - - self._xsRequests.forEach(function (req) { - req.abort() - }) - self._xsRequests = [] - - var parsedTorrent - if (metadata && metadata.infoHash) { - // `metadata` is a parsed torrent (from parse-torrent module) - parsedTorrent = metadata - } else { - try { - parsedTorrent = parseTorrent(metadata) - } catch (err) { - return self._destroy(err) + if (err) { + self.emit('warning', new Error(`http error from xs param: ${url}`)) + return cb(null) + } + if (res.statusCode !== 200) { + self.emit('warning', new Error(`non-200 status code ${res.statusCode} from xs param: ${url}`)) + return cb(null) + } + + let parsedTorrent + try { + parsedTorrent = parseTorrent(torrent) + } catch (err) {} + + if (!parsedTorrent) { + self.emit('warning', new Error(`got invalid torrent file from xs param: ${url}`)) + return cb(null) + } + + if (parsedTorrent.infoHash !== self.infoHash) { + self.emit('warning', new Error(`got torrent file with incorrect info hash from xs param: ${url}`)) + return cb(null) + } + + self._onMetadata(parsedTorrent) + cb(null) + } } } - self._processParsedTorrent(parsedTorrent) - self.metadata = self.torrentFile + /** + * Called when the full torrent metadata is received. + */ + _onMetadata (metadata) { + const self = this + if (self.metadata || self.destroyed) return + self._debug('got metadata') - // add web seed urls (BEP19) - if (self.client.enableWebSeeds) { - self.urlList.forEach(function (url) { - self.addWebSeed(url) + self._xsRequests.forEach(req => { + req.abort() }) - } + self._xsRequests = [] + + let parsedTorrent + if (metadata && metadata.infoHash) { + // `metadata` is a parsed torrent (from parse-torrent module) + parsedTorrent = metadata + } else { + try { + parsedTorrent = parseTorrent(metadata) + } catch (err) { + return self._destroy(err) + } + } + + self._processParsedTorrent(parsedTorrent) + self.metadata = self.torrentFile + + // add web seed urls (BEP19) + if (self.client.enableWebSeeds) { + self.urlList.forEach(url => { + self.addWebSeed(url) + }) + } - self._rarityMap = new RarityMap(self) + self._rarityMap = new RarityMap(self) - self.store = new ImmediateChunkStore( - new self._store(self.pieceLength, { - torrent: { - infoHash: self.infoHash - }, - files: self.files.map(function (file) { - return { + self.store = new ImmediateChunkStore( + new self._store(self.pieceLength, { + torrent: { + infoHash: self.infoHash + }, + files: self.files.map(file => ({ path: path.join(self.path, file.path), length: file.length, offset: file.offset - } - }), - length: self.length - }) - ) + })), + length: self.length + }) + ) - self.files = self.files.map(function (file) { - return new File(self, file) - }) + self.files = self.files.map(file => new File(self, file)) - // Select only specified files (BEP53) http://www.bittorrent.org/beps/bep_0053.html - if (self.so) { - var selectOnlyFiles = parseRange.parse(self.so) + // Select only specified files (BEP53) http://www.bittorrent.org/beps/bep_0053.html + if (self.so) { + const selectOnlyFiles = parseRange.parse(self.so) - self.files.forEach(function (v, i) { - if (selectOnlyFiles.includes(i)) self.files[i].select(true) - }) - } else { - // start off selecting the entire torrent with low priority - if (self.pieces.length !== 0) { - self.select(0, self.pieces.length - 1, false) + self.files.forEach((v, i) => { + if (selectOnlyFiles.includes(i)) self.files[i].select(true) + }) + } else { + // start off selecting the entire torrent with low priority + if (self.pieces.length !== 0) { + self.select(0, self.pieces.length - 1, false) + } } - } - self._hashes = self.pieces - - self.pieces = self.pieces.map(function (hash, i) { - var pieceLength = (i === self.pieces.length - 1) - ? self.lastPieceLength - : self.pieceLength - return new Piece(pieceLength) - }) - - self._reservations = self.pieces.map(function () { - return [] - }) - - self.bitfield = new BitField(self.pieces.length) - - self.wires.forEach(function (wire) { - // If we didn't have the metadata at the time ut_metadata was initialized for this - // wire, we still want to make it available to the peer in case they request it. - if (wire.ut_metadata) wire.ut_metadata.setMetadata(self.metadata) - - self._onWireWithMetadata(wire) - }) - - if (self.skipVerify) { - // Skip verifying exisitng data and just assume it's correct - self._markAllVerified() - self._onStore() - } else { - self._debug('verifying existing torrent data') - if (self._fileModtimes && self._store === FSChunkStore) { - // don't verify if the files haven't been modified since we last checked - self.getFileModtimes(function (err, fileModtimes) { - if (err) return self._destroy(err) + self._hashes = self.pieces - var unchanged = self.files.map(function (_, index) { - return fileModtimes[index] === self._fileModtimes[index] - }).every(function (x) { - return x - }) + self.pieces = self.pieces.map((hash, i) => { + const pieceLength = (i === self.pieces.length - 1) + ? self.lastPieceLength + : self.pieceLength + return new Piece(pieceLength) + }) - if (unchanged) { - self._markAllVerified() - self._onStore() - } else { - self._verifyPieces() - } - }) + self._reservations = self.pieces.map(() => []) + + self.bitfield = new BitField(self.pieces.length) + + self.wires.forEach(wire => { + // If we didn't have the metadata at the time ut_metadata was initialized for this + // wire, we still want to make it available to the peer in case they request it. + if (wire.ut_metadata) wire.ut_metadata.setMetadata(self.metadata) + + self._onWireWithMetadata(wire) + }) + + if (self.skipVerify) { + // Skip verifying exisitng data and just assume it's correct + self._markAllVerified() + self._onStore() } else { - self._verifyPieces() + self._debug('verifying existing torrent data') + if (self._fileModtimes && self._store === FSChunkStore) { + // don't verify if the files haven't been modified since we last checked + self.getFileModtimes((err, fileModtimes) => { + if (err) return self._destroy(err) + + const unchanged = self.files.map((_, index) => fileModtimes[index] === self._fileModtimes[index]).every(x => x) + + if (unchanged) { + self._markAllVerified() + self._onStore() + } else { + self._verifyPieces() + } + }) + } else { + self._verifyPieces() + } } - } - self.emit('metadata') -} + self.emit('metadata') + } -/* - * TODO: remove this - * Gets the last modified time of every file on disk for this torrent. - * Only valid in Node, not in the browser. - */ -Torrent.prototype.getFileModtimes = function (cb) { - var self = this - var ret = [] - parallelLimit(self.files.map(function (file, index) { - return function (cb) { - fs.stat(path.join(self.path, file.path), function (err, stat) { + /* + * TODO: remove this + * Gets the last modified time of every file on disk for this torrent. + * Only valid in Node, not in the browser. + */ + getFileModtimes (cb) { + const self = this + const ret = [] + parallelLimit(self.files.map((file, index) => cb => { + fs.stat(path.join(self.path, file.path), (err, stat) => { if (err && err.code !== 'ENOENT') return cb(err) ret[index] = stat && stat.mtime.getTime() cb(null) }) - } - }), FILESYSTEM_CONCURRENCY, function (err) { - self._debug('done getting file modtimes') - cb(err, ret) - }) -} + }), FILESYSTEM_CONCURRENCY, err => { + self._debug('done getting file modtimes') + cb(err, ret) + }) + } -Torrent.prototype._verifyPieces = function () { - var self = this - parallelLimit(self.pieces.map(function (_, index) { - return function (cb) { + _verifyPieces () { + const self = this + parallelLimit(self.pieces.map((_, index) => cb => { if (self.destroyed) return cb(new Error('torrent is destroyed')) - self.store.get(index, function (err, buf) { + self.store.get(index, (err, buf) => { if (self.destroyed) return cb(new Error('torrent is destroyed')) if (err) return process.nextTick(cb, null) // ignore error - sha1(buf, function (hash) { + sha1(buf, hash => { if (self.destroyed) return cb(new Error('torrent is destroyed')) if (hash === self._hashes[index]) { @@ -620,1138 +579,1132 @@ Torrent.prototype._verifyPieces = function () { cb(null) }) }) - } - }), FILESYSTEM_CONCURRENCY, function (err) { - if (err) return self._destroy(err) - self._debug('done verifying') - self._onStore() - }) -} + }), FILESYSTEM_CONCURRENCY, err => { + if (err) return self._destroy(err) + self._debug('done verifying') + self._onStore() + }) + } -Torrent.prototype._markAllVerified = function () { - for (var index = 0; index < this.pieces.length; index++) { - this._markVerified(index) + _markAllVerified () { + for (let index = 0; index < this.pieces.length; index++) { + this._markVerified(index) + } } -} -Torrent.prototype._markVerified = function (index) { - this.pieces[index] = null - this._reservations[index] = null - this.bitfield.set(index, true) -} + _markVerified (index) { + this.pieces[index] = null + this._reservations[index] = null + this.bitfield.set(index, true) + } -/** - * Called when the metadata, listening server, and underlying chunk store is initialized. - */ -Torrent.prototype._onStore = function () { - var self = this - if (self.destroyed) return - self._debug('on store') + /** + * Called when the metadata, listening server, and underlying chunk store is initialized. + */ + _onStore () { + const self = this + if (self.destroyed) return + self._debug('on store') - self.ready = true - self.emit('ready') + self.ready = true + self.emit('ready') - // Files may start out done if the file was already in the store - self._checkDone() + // Files may start out done if the file was already in the store + self._checkDone() - // In case any selections were made before torrent was ready - self._updateSelections() -} + // In case any selections were made before torrent was ready + self._updateSelections() + } -Torrent.prototype.destroy = function (cb) { - var self = this - self._destroy(null, cb) -} + destroy (cb) { + const self = this + self._destroy(null, cb) + } -Torrent.prototype._destroy = function (err, cb) { - var self = this - if (self.destroyed) return - self.destroyed = true - self._debug('destroy') + _destroy (err, cb) { + const self = this + if (self.destroyed) return + self.destroyed = true + self._debug('destroy') - self.client._remove(self) + self.client._remove(self) - clearInterval(self._rechokeIntervalId) + clearInterval(self._rechokeIntervalId) - self._xsRequests.forEach(function (req) { - req.abort() - }) + self._xsRequests.forEach(req => { + req.abort() + }) - if (self._rarityMap) { - self._rarityMap.destroy() - } + if (self._rarityMap) { + self._rarityMap.destroy() + } - for (var id in self._peers) { - self.removePeer(id) - } + for (const id in self._peers) { + self.removePeer(id) + } - self.files.forEach(function (file) { - if (file instanceof File) file._destroy() - }) + self.files.forEach(file => { + if (file instanceof File) file._destroy() + }) - var tasks = self._servers.map(function (server) { - return function (cb) { + const tasks = self._servers.map(server => cb => { server.destroy(cb) + }) + + if (self.discovery) { + tasks.push(cb => { + self.discovery.destroy(cb) + }) } - }) - if (self.discovery) { - tasks.push(function (cb) { - self.discovery.destroy(cb) - }) - } + if (self.store) { + tasks.push(cb => { + self.store.close(cb) + }) + } - if (self.store) { - tasks.push(function (cb) { - self.store.close(cb) - }) - } + parallel(tasks, cb) + + if (err) { + // Torrent errors are emitted at `torrent.on('error')`. If there are no 'error' + // event handlers on the torrent instance, then the error will be emitted at + // `client.on('error')`. This prevents throwing an uncaught exception + // (unhandled 'error' event), but it makes it impossible to distinguish client + // errors versus torrent errors. Torrent errors are not fatal, and the client + // is still usable afterwards. Therefore, always listen for errors in both + // places (`client.on('error')` and `torrent.on('error')`). + if (self.listenerCount('error') === 0) { + self.client.emit('error', err) + } else { + self.emit('error', err) + } + } + + self.emit('close') + + self.client = null + self.files = [] + self.discovery = null + self.store = null + self._rarityMap = null + self._peers = null + self._servers = null + self._xsRequests = null + } + + addPeer (peer) { + const self = this + if (self.destroyed) throw new Error('torrent is destroyed') + if (!self.infoHash) throw new Error('addPeer() must not be called before the `infoHash` event') + + if (self.client.blocked) { + let host + if (typeof peer === 'string') { + let parts + try { + parts = addrToIPPort(peer) + } catch (e) { + self._debug('ignoring peer: invalid %s', peer) + self.emit('invalidPeer', peer) + return false + } + host = parts[0] + } else if (typeof peer.remoteAddress === 'string') { + host = peer.remoteAddress + } + + if (host && self.client.blocked.contains(host)) { + self._debug('ignoring peer: blocked %s', peer) + if (typeof peer !== 'string') peer.destroy() + self.emit('blockedPeer', peer) + return false + } + } - parallel(tasks, cb) - - if (err) { - // Torrent errors are emitted at `torrent.on('error')`. If there are no 'error' - // event handlers on the torrent instance, then the error will be emitted at - // `client.on('error')`. This prevents throwing an uncaught exception - // (unhandled 'error' event), but it makes it impossible to distinguish client - // errors versus torrent errors. Torrent errors are not fatal, and the client - // is still usable afterwards. Therefore, always listen for errors in both - // places (`client.on('error')` and `torrent.on('error')`). - if (self.listenerCount('error') === 0) { - self.client.emit('error', err) + const wasAdded = !!self._addPeer(peer) + if (wasAdded) { + self.emit('peer', peer) } else { - self.emit('error', err) + self.emit('invalidPeer', peer) } + return wasAdded } - self.emit('close') + _addPeer (peer) { + const self = this + if (self.destroyed) { + if (typeof peer !== 'string') peer.destroy() + return null + } + if (typeof peer === 'string' && !self._validAddr(peer)) { + self._debug('ignoring peer: invalid %s', peer) + return null + } - self.client = null - self.files = [] - self.discovery = null - self.store = null - self._rarityMap = null - self._peers = null - self._servers = null - self._xsRequests = null -} + const id = (peer && peer.id) || peer + if (self._peers[id]) { + self._debug('ignoring peer: duplicate (%s)', id) + if (typeof peer !== 'string') peer.destroy() + return null + } + + if (self.paused) { + self._debug('ignoring peer: torrent is paused') + if (typeof peer !== 'string') peer.destroy() + return null + } -Torrent.prototype.addPeer = function (peer) { - var self = this - if (self.destroyed) throw new Error('torrent is destroyed') - if (!self.infoHash) throw new Error('addPeer() must not be called before the `infoHash` event') + self._debug('add peer %s', id) - if (self.client.blocked) { - var host + let newPeer if (typeof peer === 'string') { - var parts - try { - parts = addrToIPPort(peer) - } catch (e) { - self._debug('ignoring peer: invalid %s', peer) - self.emit('invalidPeer', peer) - return false - } - host = parts[0] - } else if (typeof peer.remoteAddress === 'string') { - host = peer.remoteAddress + // `peer` is an addr ("ip:port" string) + newPeer = Peer.createTCPOutgoingPeer(peer, self) + } else { + // `peer` is a WebRTC connection (simple-peer) + newPeer = Peer.createWebRTCPeer(peer, self) } - if (host && self.client.blocked.contains(host)) { - self._debug('ignoring peer: blocked %s', peer) - if (typeof peer !== 'string') peer.destroy() - self.emit('blockedPeer', peer) - return false + self._peers[newPeer.id] = newPeer + self._peersLength += 1 + + if (typeof peer === 'string') { + // `peer` is an addr ("ip:port" string) + self._queue.push(newPeer) + self._drain() } - } - var wasAdded = !!self._addPeer(peer) - if (wasAdded) { - self.emit('peer', peer) - } else { - self.emit('invalidPeer', peer) + return newPeer } - return wasAdded -} -Torrent.prototype._addPeer = function (peer) { - var self = this - if (self.destroyed) { - if (typeof peer !== 'string') peer.destroy() - return null - } - if (typeof peer === 'string' && !self._validAddr(peer)) { - self._debug('ignoring peer: invalid %s', peer) - return null - } + addWebSeed (url) { + if (this.destroyed) throw new Error('torrent is destroyed') - var id = (peer && peer.id) || peer - if (self._peers[id]) { - self._debug('ignoring peer: duplicate (%s)', id) - if (typeof peer !== 'string') peer.destroy() - return null - } + if (!/^https?:\/\/.+/.test(url)) { + this.emit('warning', new Error(`ignoring invalid web seed: ${url}`)) + this.emit('invalidPeer', url) + return + } - if (self.paused) { - self._debug('ignoring peer: torrent is paused') - if (typeof peer !== 'string') peer.destroy() - return null - } + if (this._peers[url]) { + this.emit('warning', new Error(`ignoring duplicate web seed: ${url}`)) + this.emit('invalidPeer', url) + return + } + + this._debug('add web seed %s', url) - self._debug('add peer %s', id) + const newPeer = Peer.createWebSeedPeer(url, this) + this._peers[newPeer.id] = newPeer + this._peersLength += 1 - var newPeer - if (typeof peer === 'string') { - // `peer` is an addr ("ip:port" string) - newPeer = Peer.createTCPOutgoingPeer(peer, self) - } else { - // `peer` is a WebRTC connection (simple-peer) - newPeer = Peer.createWebRTCPeer(peer, self) + this.emit('peer', url) } - self._peers[newPeer.id] = newPeer - self._peersLength += 1 + /** + * Called whenever a new incoming TCP peer connects to this torrent swarm. Called with a + * peer that has already sent a handshake. + */ + _addIncomingPeer (peer) { + const self = this + if (self.destroyed) return peer.destroy(new Error('torrent is destroyed')) + if (self.paused) return peer.destroy(new Error('torrent is paused')) - if (typeof peer === 'string') { - // `peer` is an addr ("ip:port" string) - self._queue.push(newPeer) - self._drain() + this._debug('add incoming peer %s', peer.id) + + self._peers[peer.id] = peer + self._peersLength += 1 } - return newPeer -} + removePeer (peer) { + const self = this + const id = (peer && peer.id) || peer + peer = self._peers[id] -Torrent.prototype.addWebSeed = function (url) { - if (this.destroyed) throw new Error('torrent is destroyed') + if (!peer) return - if (!/^https?:\/\/.+/.test(url)) { - this.emit('warning', new Error('ignoring invalid web seed: ' + url)) - this.emit('invalidPeer', url) - return - } + this._debug('removePeer %s', id) + + delete self._peers[id] + self._peersLength -= 1 + + peer.destroy() - if (this._peers[url]) { - this.emit('warning', new Error('ignoring duplicate web seed: ' + url)) - this.emit('invalidPeer', url) - return + // If torrent swarm was at capacity before, try to open a new connection now + self._drain() } - this._debug('add web seed %s', url) + select (start, end, priority, notify) { + const self = this + if (self.destroyed) throw new Error('torrent is destroyed') - var newPeer = Peer.createWebSeedPeer(url, this) - this._peers[newPeer.id] = newPeer - this._peersLength += 1 + if (start < 0 || end < start || self.pieces.length <= end) { + throw new Error('invalid selection ', start, ':', end) + } + priority = Number(priority) || 0 - this.emit('peer', url) -} + self._debug('select %s-%s (priority %s)', start, end, priority) -/** - * Called whenever a new incoming TCP peer connects to this torrent swarm. Called with a - * peer that has already sent a handshake. - */ -Torrent.prototype._addIncomingPeer = function (peer) { - var self = this - if (self.destroyed) return peer.destroy(new Error('torrent is destroyed')) - if (self.paused) return peer.destroy(new Error('torrent is paused')) + self._selections.push({ + from: start, + to: end, + offset: 0, + priority, + notify: notify || noop + }) - this._debug('add incoming peer %s', peer.id) + self._selections.sort((a, b) => b.priority - a.priority) - self._peers[peer.id] = peer - self._peersLength += 1 -} + self._updateSelections() + } -Torrent.prototype.removePeer = function (peer) { - var self = this - var id = (peer && peer.id) || peer - peer = self._peers[id] + deselect (start, end, priority) { + const self = this + if (self.destroyed) throw new Error('torrent is destroyed') - if (!peer) return + priority = Number(priority) || 0 + self._debug('deselect %s-%s (priority %s)', start, end, priority) - this._debug('removePeer %s', id) + for (let i = 0; i < self._selections.length; ++i) { + const s = self._selections[i] + if (s.from === start && s.to === end && s.priority === priority) { + self._selections.splice(i, 1) + break + } + } - delete self._peers[id] - self._peersLength -= 1 + self._updateSelections() + } - peer.destroy() + critical (start, end) { + const self = this + if (self.destroyed) throw new Error('torrent is destroyed') - // If torrent swarm was at capacity before, try to open a new connection now - self._drain() -} + self._debug('critical %s-%s', start, end) -Torrent.prototype.select = function (start, end, priority, notify) { - var self = this - if (self.destroyed) throw new Error('torrent is destroyed') + for (let i = start; i <= end; ++i) { + self._critical[i] = true + } - if (start < 0 || end < start || self.pieces.length <= end) { - throw new Error('invalid selection ', start, ':', end) + self._updateSelections() } - priority = Number(priority) || 0 - self._debug('select %s-%s (priority %s)', start, end, priority) + _onWire (wire, addr) { + const self = this + self._debug('got wire %s (%s)', wire._debugId, addr || 'Unknown') - self._selections.push({ - from: start, - to: end, - offset: 0, - priority: priority, - notify: notify || noop - }) + wire.on('download', downloaded => { + if (self.destroyed) return + self.received += downloaded + self._downloadSpeed(downloaded) + self.client._downloadSpeed(downloaded) + self.emit('download', downloaded) + self.client.emit('download', downloaded) + }) - self._selections.sort(function (a, b) { - return b.priority - a.priority - }) + wire.on('upload', uploaded => { + if (self.destroyed) return + self.uploaded += uploaded + self._uploadSpeed(uploaded) + self.client._uploadSpeed(uploaded) + self.emit('upload', uploaded) + self.client.emit('upload', uploaded) + }) - self._updateSelections() -} + self.wires.push(wire) -Torrent.prototype.deselect = function (start, end, priority) { - var self = this - if (self.destroyed) throw new Error('torrent is destroyed') + if (addr) { + // Sometimes RTCPeerConnection.getStats() doesn't return an ip:port for peers + const parts = addrToIPPort(addr) + wire.remoteAddress = parts[0] + wire.remotePort = parts[1] + } - priority = Number(priority) || 0 - self._debug('deselect %s-%s (priority %s)', start, end, priority) + // When peer sends PORT message, add that DHT node to routing table + if (self.client.dht && self.client.dht.listening) { + wire.on('port', port => { + if (self.destroyed || self.client.dht.destroyed) { + return + } + if (!wire.remoteAddress) { + return self._debug('ignoring PORT from peer with no address') + } + if (port === 0 || port > 65536) { + return self._debug('ignoring invalid PORT from peer') + } - for (var i = 0; i < self._selections.length; ++i) { - var s = self._selections[i] - if (s.from === start && s.to === end && s.priority === priority) { - self._selections.splice(i, 1) - break + self._debug('port: %s (from %s)', port, addr) + self.client.dht.addNode({ host: wire.remoteAddress, port }) + }) } - } - self._updateSelections() -} + wire.on('timeout', () => { + self._debug('wire timeout (%s)', addr) + // TODO: this might be destroying wires too eagerly + wire.destroy() + }) -Torrent.prototype.critical = function (start, end) { - var self = this - if (self.destroyed) throw new Error('torrent is destroyed') + // Timeout for piece requests to this peer + wire.setTimeout(PIECE_TIMEOUT, true) - self._debug('critical %s-%s', start, end) + // Send KEEP-ALIVE (every 60s) so peers will not disconnect the wire + wire.setKeepAlive(true) - for (var i = start; i <= end; ++i) { - self._critical[i] = true - } + // use ut_metadata extension + wire.use(utMetadata(self.metadata)) - self._updateSelections() -} + wire.ut_metadata.on('warning', err => { + self._debug('ut_metadata warning: %s', err.message) + }) + + if (!self.metadata) { + wire.ut_metadata.on('metadata', metadata => { + self._debug('got metadata via ut_metadata') + self._onMetadata(metadata) + }) + wire.ut_metadata.fetch() + } -Torrent.prototype._onWire = function (wire, addr) { - var self = this - self._debug('got wire %s (%s)', wire._debugId, addr || 'Unknown') + // use ut_pex extension if the torrent is not flagged as private + if (typeof utPex === 'function' && !self.private) { + wire.use(utPex()) - wire.on('download', function (downloaded) { - if (self.destroyed) return - self.received += downloaded - self._downloadSpeed(downloaded) - self.client._downloadSpeed(downloaded) - self.emit('download', downloaded) - self.client.emit('download', downloaded) - }) - - wire.on('upload', function (uploaded) { - if (self.destroyed) return - self.uploaded += uploaded - self._uploadSpeed(uploaded) - self.client._uploadSpeed(uploaded) - self.emit('upload', uploaded) - self.client.emit('upload', uploaded) - }) - - self.wires.push(wire) - - if (addr) { - // Sometimes RTCPeerConnection.getStats() doesn't return an ip:port for peers - var parts = addrToIPPort(addr) - wire.remoteAddress = parts[0] - wire.remotePort = parts[1] - } + wire.ut_pex.on('peer', peer => { + // Only add potential new peers when we're not seeding + if (self.done) return + self._debug('ut_pex: got peer: %s (from %s)', peer, addr) + self.addPeer(peer) + }) - // When peer sends PORT message, add that DHT node to routing table - if (self.client.dht && self.client.dht.listening) { - wire.on('port', function (port) { - if (self.destroyed || self.client.dht.destroyed) { - return - } - if (!wire.remoteAddress) { - return self._debug('ignoring PORT from peer with no address') - } - if (port === 0 || port > 65536) { - return self._debug('ignoring invalid PORT from peer') - } + wire.ut_pex.on('dropped', peer => { + // the remote peer believes a given peer has been dropped from the torrent swarm. + // if we're not currently connected to it, then remove it from the queue. + const peerObj = self._peers[peer] + if (peerObj && !peerObj.connected) { + self._debug('ut_pex: dropped peer: %s (from %s)', peer, addr) + self.removePeer(peer) + } + }) - self._debug('port: %s (from %s)', port, addr) - self.client.dht.addNode({ host: wire.remoteAddress, port: port }) - }) - } + wire.once('close', () => { + // Stop sending updates to remote peer + wire.ut_pex.reset() + }) + } - wire.on('timeout', function () { - self._debug('wire timeout (%s)', addr) - // TODO: this might be destroying wires too eagerly - wire.destroy() - }) + // Hook to allow user-defined `bittorrent-protocol` extensions + // More info: https://github.com/webtorrent/bittorrent-protocol#extension-api + self.emit('wire', wire, addr) - // Timeout for piece requests to this peer - wire.setTimeout(PIECE_TIMEOUT, true) + if (self.metadata) { + process.nextTick(() => { + // This allows wire.handshake() to be called (by Peer.onHandshake) before any + // messages get sent on the wire + self._onWireWithMetadata(wire) + }) + } + } - // Send KEEP-ALIVE (every 60s) so peers will not disconnect the wire - wire.setKeepAlive(true) + _onWireWithMetadata (wire) { + const self = this + let timeoutId = null - // use ut_metadata extension - wire.use(utMetadata(self.metadata)) + function onChokeTimeout () { + if (self.destroyed || wire.destroyed) return - wire.ut_metadata.on('warning', function (err) { - self._debug('ut_metadata warning: %s', err.message) - }) + if (self._numQueued > 2 * (self._numConns - self.numPeers) && + wire.amInterested) { + wire.destroy() + } else { + timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT) + if (timeoutId.unref) timeoutId.unref() + } + } - if (!self.metadata) { - wire.ut_metadata.on('metadata', function (metadata) { - self._debug('got metadata via ut_metadata') - self._onMetadata(metadata) + let i + function updateSeedStatus () { + if (wire.peerPieces.buffer.length !== self.bitfield.buffer.length) return + for (i = 0; i < self.pieces.length; ++i) { + if (!wire.peerPieces.get(i)) return + } + wire.isSeeder = true + wire.choke() // always choke seeders + } + + wire.on('bitfield', () => { + updateSeedStatus() + self._update() }) - wire.ut_metadata.fetch() - } - // use ut_pex extension if the torrent is not flagged as private - if (typeof utPex === 'function' && !self.private) { - wire.use(utPex()) + wire.on('have', () => { + updateSeedStatus() + self._update() + }) - wire.ut_pex.on('peer', function (peer) { - // Only add potential new peers when we're not seeding - if (self.done) return - self._debug('ut_pex: got peer: %s (from %s)', peer, addr) - self.addPeer(peer) + wire.once('interested', () => { + wire.unchoke() }) - wire.ut_pex.on('dropped', function (peer) { - // the remote peer believes a given peer has been dropped from the torrent swarm. - // if we're not currently connected to it, then remove it from the queue. - var peerObj = self._peers[peer] - if (peerObj && !peerObj.connected) { - self._debug('ut_pex: dropped peer: %s (from %s)', peer, addr) - self.removePeer(peer) - } + wire.once('close', () => { + clearTimeout(timeoutId) }) - wire.once('close', function () { - // Stop sending updates to remote peer - wire.ut_pex.reset() + wire.on('choke', () => { + clearTimeout(timeoutId) + timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT) + if (timeoutId.unref) timeoutId.unref() }) - } - // Hook to allow user-defined `bittorrent-protocol` extensions - // More info: https://github.com/webtorrent/bittorrent-protocol#extension-api - self.emit('wire', wire, addr) + wire.on('unchoke', () => { + clearTimeout(timeoutId) + self._update() + }) - if (self.metadata) { - process.nextTick(function () { - // This allows wire.handshake() to be called (by Peer.onHandshake) before any - // messages get sent on the wire - self._onWireWithMetadata(wire) + wire.on('request', (index, offset, length, cb) => { + if (length > MAX_BLOCK_LENGTH) { + // Per spec, disconnect from peers that request >128KB + return wire.destroy() + } + if (self.pieces[index]) return + self.store.get(index, { offset, length }, cb) }) - } -} -Torrent.prototype._onWireWithMetadata = function (wire) { - var self = this - var timeoutId = null + wire.bitfield(self.bitfield) // always send bitfield (required) + wire.uninterested() // always start out uninterested (as per protocol) - function onChokeTimeout () { - if (self.destroyed || wire.destroyed) return + // Send PORT message to peers that support DHT + if (wire.peerExtensions.dht && self.client.dht && self.client.dht.listening) { + wire.port(self.client.dht.address().port) + } - if (self._numQueued > 2 * (self._numConns - self.numPeers) && - wire.amInterested) { - wire.destroy() - } else { + if (wire.type !== 'webSeed') { // do not choke on webseeds timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT) if (timeoutId.unref) timeoutId.unref() } - } - var i - function updateSeedStatus () { - if (wire.peerPieces.buffer.length !== self.bitfield.buffer.length) return - for (i = 0; i < self.pieces.length; ++i) { - if (!wire.peerPieces.get(i)) return - } - wire.isSeeder = true - wire.choke() // always choke seeders + wire.isSeeder = false + updateSeedStatus() } - wire.on('bitfield', function () { - updateSeedStatus() - self._update() - }) + /** + * Called on selection changes. + */ + _updateSelections () { + const self = this + if (!self.ready || self.destroyed) return - wire.on('have', function () { - updateSeedStatus() + process.nextTick(() => { + self._gcSelections() + }) + self._updateInterest() self._update() - }) + } + + /** + * Garbage collect selections with respect to the store's current state. + */ + _gcSelections () { + const self = this - wire.once('interested', function () { - wire.unchoke() - }) + for (let i = 0; i < self._selections.length; ++i) { + const s = self._selections[i] + const oldOffset = s.offset - wire.once('close', function () { - clearTimeout(timeoutId) - }) + // check for newly downloaded pieces in selection + while (self.bitfield.get(s.from + s.offset) && s.from + s.offset < s.to) { + s.offset += 1 + } - wire.on('choke', function () { - clearTimeout(timeoutId) - timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT) - if (timeoutId.unref) timeoutId.unref() - }) + if (oldOffset !== s.offset) s.notify() + if (s.to !== s.from + s.offset) continue + if (!self.bitfield.get(s.from + s.offset)) continue - wire.on('unchoke', function () { - clearTimeout(timeoutId) - self._update() - }) + self._selections.splice(i, 1) // remove fully downloaded selection + i -= 1 // decrement i to offset splice - wire.on('request', function (index, offset, length, cb) { - if (length > MAX_BLOCK_LENGTH) { - // Per spec, disconnect from peers that request >128KB - return wire.destroy() + s.notify() + self._updateInterest() } - if (self.pieces[index]) return - self.store.get(index, { offset: offset, length: length }, cb) - }) - wire.bitfield(self.bitfield) // always send bitfield (required) - wire.uninterested() // always start out uninterested (as per protocol) - - // Send PORT message to peers that support DHT - if (wire.peerExtensions.dht && self.client.dht && self.client.dht.listening) { - wire.port(self.client.dht.address().port) + if (!self._selections.length) self.emit('idle') } - if (wire.type !== 'webSeed') { // do not choke on webseeds - timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT) - if (timeoutId.unref) timeoutId.unref() - } + /** + * Update interested status for all peers. + */ + _updateInterest () { + const self = this - wire.isSeeder = false - updateSeedStatus() -} + const prev = self._amInterested + self._amInterested = !!self._selections.length -/** - * Called on selection changes. - */ -Torrent.prototype._updateSelections = function () { - var self = this - if (!self.ready || self.destroyed) return + self.wires.forEach(wire => { + let interested = false + for (let index = 0; index < self.pieces.length; ++index) { + if (self.pieces[index] && wire.peerPieces.get(index)) { + interested = true + break + } + } - process.nextTick(function () { - self._gcSelections() - }) - self._updateInterest() - self._update() -} + if (interested) wire.interested() + else wire.uninterested() + }) -/** - * Garbage collect selections with respect to the store's current state. - */ -Torrent.prototype._gcSelections = function () { - var self = this + if (prev === self._amInterested) return + if (self._amInterested) self.emit('interested') + else self.emit('uninterested') + } - for (var i = 0; i < self._selections.length; ++i) { - var s = self._selections[i] - var oldOffset = s.offset + /** + * Heartbeat to update all peers and their requests. + */ + _update () { + const self = this + if (self.destroyed) return - // check for newly downloaded pieces in selection - while (self.bitfield.get(s.from + s.offset) && s.from + s.offset < s.to) { - s.offset += 1 + // update wires in random order for better request distribution + const ite = randomIterate(self.wires) + let wire + while ((wire = ite())) { + self._updateWire(wire) } + } - if (oldOffset !== s.offset) s.notify() - if (s.to !== s.from + s.offset) continue - if (!self.bitfield.get(s.from + s.offset)) continue + /** + * Attempts to update a peer's requests + */ + _updateWire (wire) { + const self = this - self._selections.splice(i, 1) // remove fully downloaded selection - i -= 1 // decrement i to offset splice + if (wire.peerChoking) return + if (!wire.downloaded) return validateWire() - s.notify() - self._updateInterest() - } + const minOutstandingRequests = getBlockPipelineLength(wire, PIPELINE_MIN_DURATION) + if (wire.requests.length >= minOutstandingRequests) return + const maxOutstandingRequests = getBlockPipelineLength(wire, PIPELINE_MAX_DURATION) - if (!self._selections.length) self.emit('idle') -} - -/** - * Update interested status for all peers. - */ -Torrent.prototype._updateInterest = function () { - var self = this + trySelectWire(false) || trySelectWire(true) - var prev = self._amInterested - self._amInterested = !!self._selections.length + function genPieceFilterFunc (start, end, tried, rank) { + return i => i >= start && i <= end && !(i in tried) && wire.peerPieces.get(i) && (!rank || rank(i)) + } - self.wires.forEach(function (wire) { - var interested = false - for (var index = 0; index < self.pieces.length; ++index) { - if (self.pieces[index] && wire.peerPieces.get(index)) { - interested = true - break + // TODO: Do we need both validateWire and trySelectWire? + function validateWire () { + if (wire.requests.length) return + + let i = self._selections.length + while (i--) { + const next = self._selections[i] + let piece + if (self.strategy === 'rarest') { + const start = next.from + next.offset + const end = next.to + const len = end - start + 1 + const tried = {} + let tries = 0 + const filter = genPieceFilterFunc(start, end, tried) + + while (tries < len) { + piece = self._rarityMap.getRarestPiece(filter) + if (piece < 0) break + if (self._request(wire, piece, false)) return + tried[piece] = true + tries += 1 + } + } else { + for (piece = next.to; piece >= next.from + next.offset; --piece) { + if (!wire.peerPieces.get(piece)) continue + if (self._request(wire, piece, false)) return + } + } } + + // TODO: wire failed to validate as useful; should we close it? + // probably not, since 'have' and 'bitfield' messages might be coming } - if (interested) wire.interested() - else wire.uninterested() - }) + function speedRanker () { + const speed = wire.downloadSpeed() || 1 + if (speed > SPEED_THRESHOLD) return () => true - if (prev === self._amInterested) return - if (self._amInterested) self.emit('interested') - else self.emit('uninterested') -} + const secs = Math.max(1, wire.requests.length) * Piece.BLOCK_LENGTH / speed + let tries = 10 + let ptr = 0 -/** - * Heartbeat to update all peers and their requests. - */ -Torrent.prototype._update = function () { - var self = this - if (self.destroyed) return - - // update wires in random order for better request distribution - var ite = randomIterate(self.wires) - var wire - while ((wire = ite())) { - self._updateWire(wire) - } -} + return index => { + if (!tries || self.bitfield.get(index)) return true -/** - * Attempts to update a peer's requests - */ -Torrent.prototype._updateWire = function (wire) { - var self = this + let missing = self.pieces[index].missing - if (wire.peerChoking) return - if (!wire.downloaded) return validateWire() + for (; ptr < self.wires.length; ptr++) { + const otherWire = self.wires[ptr] + const otherSpeed = otherWire.downloadSpeed() - var minOutstandingRequests = getBlockPipelineLength(wire, PIPELINE_MIN_DURATION) - if (wire.requests.length >= minOutstandingRequests) return - var maxOutstandingRequests = getBlockPipelineLength(wire, PIPELINE_MAX_DURATION) + if (otherSpeed < SPEED_THRESHOLD) continue + if (otherSpeed <= speed) continue + if (!otherWire.peerPieces.get(index)) continue + if ((missing -= otherSpeed * secs) > 0) continue - trySelectWire(false) || trySelectWire(true) + tries-- + return false + } - function genPieceFilterFunc (start, end, tried, rank) { - return function (i) { - return i >= start && i <= end && !(i in tried) && wire.peerPieces.get(i) && (!rank || rank(i)) + return true + } } - } - // TODO: Do we need both validateWire and trySelectWire? - function validateWire () { - if (wire.requests.length) return - - var i = self._selections.length - while (i--) { - var next = self._selections[i] - var piece - if (self.strategy === 'rarest') { - var start = next.from + next.offset - var end = next.to - var len = end - start + 1 - var tried = {} - var tries = 0 - var filter = genPieceFilterFunc(start, end, tried) - - while (tries < len) { - piece = self._rarityMap.getRarestPiece(filter) - if (piece < 0) break - if (self._request(wire, piece, false)) return - tried[piece] = true - tries += 1 - } - } else { - for (piece = next.to; piece >= next.from + next.offset; --piece) { - if (!wire.peerPieces.get(piece)) continue - if (self._request(wire, piece, false)) return - } + function shufflePriority (i) { + let last = i + for (let j = i; j < self._selections.length && self._selections[j].priority; j++) { + last = j } + const tmp = self._selections[i] + self._selections[i] = self._selections[last] + self._selections[last] = tmp } - // TODO: wire failed to validate as useful; should we close it? - // probably not, since 'have' and 'bitfield' messages might be coming - } - - function speedRanker () { - var speed = wire.downloadSpeed() || 1 - if (speed > SPEED_THRESHOLD) return function () { return true } - - var secs = Math.max(1, wire.requests.length) * Piece.BLOCK_LENGTH / speed - var tries = 10 - var ptr = 0 - - return function (index) { - if (!tries || self.bitfield.get(index)) return true - - var missing = self.pieces[index].missing + function trySelectWire (hotswap) { + if (wire.requests.length >= maxOutstandingRequests) return true + const rank = speedRanker() + + for (let i = 0; i < self._selections.length; i++) { + const next = self._selections[i] + + let piece + if (self.strategy === 'rarest') { + const start = next.from + next.offset + const end = next.to + const len = end - start + 1 + const tried = {} + let tries = 0 + const filter = genPieceFilterFunc(start, end, tried, rank) + + while (tries < len) { + piece = self._rarityMap.getRarestPiece(filter) + if (piece < 0) break + + // request all non-reserved blocks in this piece + while (self._request(wire, piece, self._critical[piece] || hotswap)) {} + + if (wire.requests.length < maxOutstandingRequests) { + tried[piece] = true + tries++ + continue + } + + if (next.priority) shufflePriority(i) + return true + } + } else { + for (piece = next.from + next.offset; piece <= next.to; piece++) { + if (!wire.peerPieces.get(piece) || !rank(piece)) continue - for (; ptr < self.wires.length; ptr++) { - var otherWire = self.wires[ptr] - var otherSpeed = otherWire.downloadSpeed() + // request all non-reserved blocks in piece + while (self._request(wire, piece, self._critical[piece] || hotswap)) {} - if (otherSpeed < SPEED_THRESHOLD) continue - if (otherSpeed <= speed) continue - if (!otherWire.peerPieces.get(index)) continue - if ((missing -= otherSpeed * secs) > 0) continue + if (wire.requests.length < maxOutstandingRequests) continue - tries-- - return false + if (next.priority) shufflePriority(i) + return true + } + } } - return true - } - } - - function shufflePriority (i) { - var last = i - for (var j = i; j < self._selections.length && self._selections[j].priority; j++) { - last = j + return false } - var tmp = self._selections[i] - self._selections[i] = self._selections[last] - self._selections[last] = tmp } - function trySelectWire (hotswap) { - if (wire.requests.length >= maxOutstandingRequests) return true - var rank = speedRanker() - - for (var i = 0; i < self._selections.length; i++) { - var next = self._selections[i] - - var piece - if (self.strategy === 'rarest') { - var start = next.from + next.offset - var end = next.to - var len = end - start + 1 - var tried = {} - var tries = 0 - var filter = genPieceFilterFunc(start, end, tried, rank) + /** + * Called periodically to update the choked status of all peers, handling optimistic + * unchoking as described in BEP3. + */ + _rechoke () { + const self = this + if (!self.ready) return - while (tries < len) { - piece = self._rarityMap.getRarestPiece(filter) - if (piece < 0) break + if (self._rechokeOptimisticTime > 0) self._rechokeOptimisticTime -= 1 + else self._rechokeOptimisticWire = null - // request all non-reserved blocks in this piece - while (self._request(wire, piece, self._critical[piece] || hotswap)) {} + const peers = [] - if (wire.requests.length < maxOutstandingRequests) { - tried[piece] = true - tries++ - continue - } + self.wires.forEach(wire => { + if (!wire.isSeeder && wire !== self._rechokeOptimisticWire) { + peers.push({ + wire, + downloadSpeed: wire.downloadSpeed(), + uploadSpeed: wire.uploadSpeed(), + salt: Math.random(), + isChoked: true + }) + } + }) - if (next.priority) shufflePriority(i) - return true - } - } else { - for (piece = next.from + next.offset; piece <= next.to; piece++) { - if (!wire.peerPieces.get(piece) || !rank(piece)) continue + peers.sort(rechokeSort) - // request all non-reserved blocks in piece - while (self._request(wire, piece, self._critical[piece] || hotswap)) {} + let unchokeInterested = 0 + let i = 0 + for (; i < peers.length && unchokeInterested < self._rechokeNumSlots; ++i) { + peers[i].isChoked = false + if (peers[i].wire.peerInterested) unchokeInterested += 1 + } - if (wire.requests.length < maxOutstandingRequests) continue + // Optimistically unchoke a peer + if (!self._rechokeOptimisticWire && i < peers.length && self._rechokeNumSlots) { + const candidates = peers.slice(i).filter(peer => peer.wire.peerInterested) + const optimistic = candidates[randomInt(candidates.length)] - if (next.priority) shufflePriority(i) - return true - } + if (optimistic) { + optimistic.isChoked = false + self._rechokeOptimisticWire = optimistic.wire + self._rechokeOptimisticTime = RECHOKE_OPTIMISTIC_DURATION } } - return false - } -} - -/** - * Called periodically to update the choked status of all peers, handling optimistic - * unchoking as described in BEP3. - */ -Torrent.prototype._rechoke = function () { - var self = this - if (!self.ready) return - - if (self._rechokeOptimisticTime > 0) self._rechokeOptimisticTime -= 1 - else self._rechokeOptimisticWire = null - - var peers = [] - - self.wires.forEach(function (wire) { - if (!wire.isSeeder && wire !== self._rechokeOptimisticWire) { - peers.push({ - wire: wire, - downloadSpeed: wire.downloadSpeed(), - uploadSpeed: wire.uploadSpeed(), - salt: Math.random(), - isChoked: true - }) - } - }) + // Unchoke best peers + peers.forEach(peer => { + if (peer.wire.amChoking !== peer.isChoked) { + if (peer.isChoked) peer.wire.choke() + else peer.wire.unchoke() + } + }) - peers.sort(rechokeSort) + function rechokeSort (peerA, peerB) { + // Prefer higher download speed + if (peerA.downloadSpeed !== peerB.downloadSpeed) { + return peerB.downloadSpeed - peerA.downloadSpeed + } - var unchokeInterested = 0 - var i = 0 - for (; i < peers.length && unchokeInterested < self._rechokeNumSlots; ++i) { - peers[i].isChoked = false - if (peers[i].wire.peerInterested) unchokeInterested += 1 - } + // Prefer higher upload speed + if (peerA.uploadSpeed !== peerB.uploadSpeed) { + return peerB.uploadSpeed - peerA.uploadSpeed + } - // Optimistically unchoke a peer - if (!self._rechokeOptimisticWire && i < peers.length && self._rechokeNumSlots) { - var candidates = peers.slice(i).filter(function (peer) { return peer.wire.peerInterested }) - var optimistic = candidates[randomInt(candidates.length)] + // Prefer unchoked + if (peerA.wire.amChoking !== peerB.wire.amChoking) { + return peerA.wire.amChoking ? 1 : -1 + } - if (optimistic) { - optimistic.isChoked = false - self._rechokeOptimisticWire = optimistic.wire - self._rechokeOptimisticTime = RECHOKE_OPTIMISTIC_DURATION + // Random order + return peerA.salt - peerB.salt } } - // Unchoke best peers - peers.forEach(function (peer) { - if (peer.wire.amChoking !== peer.isChoked) { - if (peer.isChoked) peer.wire.choke() - else peer.wire.unchoke() - } - }) + /** + * Attempts to cancel a slow block request from another wire such that the + * given wire may effectively swap out the request for one of its own. + */ + _hotswap (wire, index) { + const self = this - function rechokeSort (peerA, peerB) { - // Prefer higher download speed - if (peerA.downloadSpeed !== peerB.downloadSpeed) { - return peerB.downloadSpeed - peerA.downloadSpeed - } - - // Prefer higher upload speed - if (peerA.uploadSpeed !== peerB.uploadSpeed) { - return peerB.uploadSpeed - peerA.uploadSpeed - } + const speed = wire.downloadSpeed() + if (speed < Piece.BLOCK_LENGTH) return false + if (!self._reservations[index]) return false - // Prefer unchoked - if (peerA.wire.amChoking !== peerB.wire.amChoking) { - return peerA.wire.amChoking ? 1 : -1 + const r = self._reservations[index] + if (!r) { + return false } - // Random order - return peerA.salt - peerB.salt - } -} - -/** - * Attempts to cancel a slow block request from another wire such that the - * given wire may effectively swap out the request for one of its own. - */ -Torrent.prototype._hotswap = function (wire, index) { - var self = this + let minSpeed = Infinity + let minWire - var speed = wire.downloadSpeed() - if (speed < Piece.BLOCK_LENGTH) return false - if (!self._reservations[index]) return false - - var r = self._reservations[index] - if (!r) { - return false - } + let i + for (i = 0; i < r.length; i++) { + const otherWire = r[i] + if (!otherWire || otherWire === wire) continue - var minSpeed = Infinity - var minWire + const otherSpeed = otherWire.downloadSpeed() + if (otherSpeed >= SPEED_THRESHOLD) continue + if (2 * otherSpeed > speed || otherSpeed > minSpeed) continue - var i - for (i = 0; i < r.length; i++) { - var otherWire = r[i] - if (!otherWire || otherWire === wire) continue + minWire = otherWire + minSpeed = otherSpeed + } - var otherSpeed = otherWire.downloadSpeed() - if (otherSpeed >= SPEED_THRESHOLD) continue - if (2 * otherSpeed > speed || otherSpeed > minSpeed) continue + if (!minWire) return false - minWire = otherWire - minSpeed = otherSpeed - } + for (i = 0; i < r.length; i++) { + if (r[i] === minWire) r[i] = null + } - if (!minWire) return false + for (i = 0; i < minWire.requests.length; i++) { + const req = minWire.requests[i] + if (req.piece !== index) continue - for (i = 0; i < r.length; i++) { - if (r[i] === minWire) r[i] = null - } - - for (i = 0; i < minWire.requests.length; i++) { - var req = minWire.requests[i] - if (req.piece !== index) continue + self.pieces[index].cancel((req.offset / Piece.BLOCK_LENGTH) | 0) + } - self.pieces[index].cancel((req.offset / Piece.BLOCK_LENGTH) | 0) + self.emit('hotswap', minWire, wire, index) + return true } - self.emit('hotswap', minWire, wire, index) - return true -} + /** + * Attempts to request a block from the given wire. + */ + _request (wire, index, hotswap) { + const self = this + const numRequests = wire.requests.length + const isWebSeed = wire.type === 'webSeed' -/** - * Attempts to request a block from the given wire. - */ -Torrent.prototype._request = function (wire, index, hotswap) { - var self = this - var numRequests = wire.requests.length - var isWebSeed = wire.type === 'webSeed' + if (self.bitfield.get(index)) return false - if (self.bitfield.get(index)) return false + const maxOutstandingRequests = isWebSeed + ? Math.min( + getPiecePipelineLength(wire, PIPELINE_MAX_DURATION, self.pieceLength), + self.maxWebConns + ) + : getBlockPipelineLength(wire, PIPELINE_MAX_DURATION) - var maxOutstandingRequests = isWebSeed - ? Math.min( - getPiecePipelineLength(wire, PIPELINE_MAX_DURATION, self.pieceLength), - self.maxWebConns - ) - : getBlockPipelineLength(wire, PIPELINE_MAX_DURATION) + if (numRequests >= maxOutstandingRequests) return false + // var endGame = (wire.requests.length === 0 && self.store.numMissing < 30) - if (numRequests >= maxOutstandingRequests) return false - // var endGame = (wire.requests.length === 0 && self.store.numMissing < 30) + const piece = self.pieces[index] + let reservation = isWebSeed ? piece.reserveRemaining() : piece.reserve() - var piece = self.pieces[index] - var reservation = isWebSeed ? piece.reserveRemaining() : piece.reserve() + if (reservation === -1 && hotswap && self._hotswap(wire, index)) { + reservation = isWebSeed ? piece.reserveRemaining() : piece.reserve() + } + if (reservation === -1) return false - if (reservation === -1 && hotswap && self._hotswap(wire, index)) { - reservation = isWebSeed ? piece.reserveRemaining() : piece.reserve() - } - if (reservation === -1) return false + let r = self._reservations[index] + if (!r) r = self._reservations[index] = [] + let i = r.indexOf(null) + if (i === -1) i = r.length + r[i] = wire - var r = self._reservations[index] - if (!r) r = self._reservations[index] = [] - var i = r.indexOf(null) - if (i === -1) i = r.length - r[i] = wire + const chunkOffset = piece.chunkOffset(reservation) + const chunkLength = isWebSeed ? piece.chunkLengthRemaining(reservation) : piece.chunkLength(reservation) - var chunkOffset = piece.chunkOffset(reservation) - var chunkLength = isWebSeed ? piece.chunkLengthRemaining(reservation) : piece.chunkLength(reservation) + wire.request(index, chunkOffset, chunkLength, function onChunk (err, chunk) { + if (self.destroyed) return - wire.request(index, chunkOffset, chunkLength, function onChunk (err, chunk) { - if (self.destroyed) return + // TODO: what is this for? + if (!self.ready) return self.once('ready', () => { onChunk(err, chunk) }) - // TODO: what is this for? - if (!self.ready) return self.once('ready', function () { onChunk(err, chunk) }) + if (r[i] === wire) r[i] = null - if (r[i] === wire) r[i] = null + if (piece !== self.pieces[index]) return onUpdateTick() - if (piece !== self.pieces[index]) return onUpdateTick() + if (err) { + self._debug( + 'error getting piece %s (offset: %s length: %s) from %s: %s', + index, chunkOffset, chunkLength, `${wire.remoteAddress}:${wire.remotePort}`, + err.message + ) + isWebSeed ? piece.cancelRemaining(reservation) : piece.cancel(reservation) + onUpdateTick() + return + } - if (err) { self._debug( - 'error getting piece %s (offset: %s length: %s) from %s: %s', - index, chunkOffset, chunkLength, wire.remoteAddress + ':' + wire.remotePort, - err.message + 'got piece %s (offset: %s length: %s) from %s', + index, chunkOffset, chunkLength, `${wire.remoteAddress}:${wire.remotePort}` ) - isWebSeed ? piece.cancelRemaining(reservation) : piece.cancel(reservation) - onUpdateTick() - return - } - - self._debug( - 'got piece %s (offset: %s length: %s) from %s', - index, chunkOffset, chunkLength, wire.remoteAddress + ':' + wire.remotePort - ) - if (!piece.set(reservation, chunk, wire)) return onUpdateTick() + if (!piece.set(reservation, chunk, wire)) return onUpdateTick() - var buf = piece.flush() + const buf = piece.flush() - // TODO: might need to set self.pieces[index] = null here since sha1 is async + // TODO: might need to set self.pieces[index] = null here since sha1 is async - sha1(buf, function (hash) { - if (self.destroyed) return + sha1(buf, hash => { + if (self.destroyed) return - if (hash === self._hashes[index]) { - if (!self.pieces[index]) return - self._debug('piece verified %s', index) + if (hash === self._hashes[index]) { + if (!self.pieces[index]) return + self._debug('piece verified %s', index) - self.pieces[index] = null - self._reservations[index] = null - self.bitfield.set(index, true) + self.pieces[index] = null + self._reservations[index] = null + self.bitfield.set(index, true) - self.store.put(index, buf) + self.store.put(index, buf) - self.wires.forEach(function (wire) { - wire.have(index) - }) + self.wires.forEach(wire => { + wire.have(index) + }) - // We also check `self.destroyed` since `torrent.destroy()` could have been - // called in the `torrent.on('done')` handler, triggered by `_checkDone()`. - if (self._checkDone() && !self.destroyed) self.discovery.complete() - } else { - self.pieces[index] = new Piece(piece.length) - self.emit('warning', new Error('Piece ' + index + ' failed verification')) - } - onUpdateTick() + // We also check `self.destroyed` since `torrent.destroy()` could have been + // called in the `torrent.on('done')` handler, triggered by `_checkDone()`. + if (self._checkDone() && !self.destroyed) self.discovery.complete() + } else { + self.pieces[index] = new Piece(piece.length) + self.emit('warning', new Error(`Piece ${index} failed verification`)) + } + onUpdateTick() + }) }) - }) - function onUpdateTick () { - process.nextTick(function () { self._update() }) + function onUpdateTick () { + process.nextTick(() => { self._update() }) + } + + return true } - return true -} + _checkDone () { + const self = this + if (self.destroyed) return -Torrent.prototype._checkDone = function () { - var self = this - if (self.destroyed) return - - // are any new files done? - self.files.forEach(function (file) { - if (file.done) return - for (var i = file._startPiece; i <= file._endPiece; ++i) { - if (!self.bitfield.get(i)) return - } - file.done = true - file.emit('done') - self._debug('file done: ' + file.name) - }) - - // is the torrent done? (if all current selections are satisfied, or there are - // no selections, then torrent is done) - var done = true - for (var i = 0; i < self._selections.length; i++) { - var selection = self._selections[i] - for (var piece = selection.from; piece <= selection.to; piece++) { - if (!self.bitfield.get(piece)) { - done = false - break + // are any new files done? + self.files.forEach(file => { + if (file.done) return + for (let i = file._startPiece; i <= file._endPiece; ++i) { + if (!self.bitfield.get(i)) return + } + file.done = true + file.emit('done') + self._debug(`file done: ${file.name}`) + }) + + // is the torrent done? (if all current selections are satisfied, or there are + // no selections, then torrent is done) + let done = true + for (let i = 0; i < self._selections.length; i++) { + const selection = self._selections[i] + for (let piece = selection.from; piece <= selection.to; piece++) { + if (!self.bitfield.get(piece)) { + done = false + break + } } + if (!done) break } - if (!done) break - } - if (!self.done && done) { - self.done = true - self._debug('torrent done: ' + self.infoHash) - self.emit('done') + if (!self.done && done) { + self.done = true + self._debug(`torrent done: ${self.infoHash}`) + self.emit('done') + } + self._gcSelections() + + return done } - self._gcSelections() - return done -} + load (streams, cb) { + const self = this + if (self.destroyed) throw new Error('torrent is destroyed') + if (!self.ready) return self.once('ready', () => { self.load(streams, cb) }) -Torrent.prototype.load = function (streams, cb) { - var self = this - if (self.destroyed) throw new Error('torrent is destroyed') - if (!self.ready) return self.once('ready', function () { self.load(streams, cb) }) + if (!Array.isArray(streams)) streams = [ streams ] + if (!cb) cb = noop - if (!Array.isArray(streams)) streams = [ streams ] - if (!cb) cb = noop + const readable = new MultiStream(streams) + const writable = new ChunkStoreWriteStream(self.store, self.pieceLength) - var readable = new MultiStream(streams) - var writable = new ChunkStoreWriteStream(self.store, self.pieceLength) + pump(readable, writable, err => { + if (err) return cb(err) + self._markAllVerified() + self._checkDone() + cb(null) + }) + } - pump(readable, writable, function (err) { - if (err) return cb(err) - self._markAllVerified() - self._checkDone() - cb(null) - }) -} + createServer (requestListener) { + if (typeof Server !== 'function') throw new Error('node.js-only method') + if (this.destroyed) throw new Error('torrent is destroyed') + const server = new Server(this, requestListener) + this._servers.push(server) + return server + } -Torrent.prototype.createServer = function (requestListener) { - if (typeof Server !== 'function') throw new Error('node.js-only method') - if (this.destroyed) throw new Error('torrent is destroyed') - var server = new Server(this, requestListener) - this._servers.push(server) - return server -} + pause () { + if (this.destroyed) return + this._debug('pause') + this.paused = true + } -Torrent.prototype.pause = function () { - if (this.destroyed) return - this._debug('pause') - this.paused = true -} + resume () { + if (this.destroyed) return + this._debug('resume') + this.paused = false + this._drain() + } -Torrent.prototype.resume = function () { - if (this.destroyed) return - this._debug('resume') - this.paused = false - this._drain() -} + _debug () { + const args = [].slice.call(arguments) + args[0] = `[${this.client._debugId}] [${this._debugId}] ${args[0]}` + debug(...args) + } -Torrent.prototype._debug = function () { - var args = [].slice.call(arguments) - args[0] = '[' + this.client._debugId + '] [' + this._debugId + '] ' + args[0] - debug.apply(null, args) -} + /** + * Pop a peer off the FIFO queue and connect to it. When _drain() gets called, + * the queue will usually have only one peer in it, except when there are too + * many peers (over `this.maxConns`) in which case they will just sit in the + * queue until another connection closes. + */ + _drain () { + const self = this + this._debug('_drain numConns %s maxConns %s', self._numConns, self.client.maxConns) + if (typeof net.connect !== 'function' || self.destroyed || self.paused || + self._numConns >= self.client.maxConns) { + return + } + this._debug('drain (%s queued, %s/%s peers)', self._numQueued, self.numPeers, self.client.maxConns) -/** - * Pop a peer off the FIFO queue and connect to it. When _drain() gets called, - * the queue will usually have only one peer in it, except when there are too - * many peers (over `this.maxConns`) in which case they will just sit in the - * queue until another connection closes. - */ -Torrent.prototype._drain = function () { - var self = this - this._debug('_drain numConns %s maxConns %s', self._numConns, self.client.maxConns) - if (typeof net.connect !== 'function' || self.destroyed || self.paused || - self._numConns >= self.client.maxConns) { - return - } - this._debug('drain (%s queued, %s/%s peers)', self._numQueued, self.numPeers, self.client.maxConns) + const peer = self._queue.shift() + if (!peer) return // queue could be empty - var peer = self._queue.shift() - if (!peer) return // queue could be empty + this._debug('tcp connect attempt to %s', peer.addr) - this._debug('tcp connect attempt to %s', peer.addr) + const parts = addrToIPPort(peer.addr) + const opts = { + host: parts[0], + port: parts[1] + } - var parts = addrToIPPort(peer.addr) - var opts = { - host: parts[0], - port: parts[1] - } + const conn = peer.conn = net.connect(opts) - var conn = peer.conn = net.connect(opts) + conn.once('connect', () => { peer.onConnect() }) + conn.once('error', err => { peer.destroy(err) }) + peer.startConnectTimeout() - conn.once('connect', function () { peer.onConnect() }) - conn.once('error', function (err) { peer.destroy(err) }) - peer.startConnectTimeout() + // When connection closes, attempt reconnect after timeout (with exponential backoff) + conn.on('close', () => { + if (self.destroyed) return - // When connection closes, attempt reconnect after timeout (with exponential backoff) - conn.on('close', function () { - if (self.destroyed) return + // TODO: If torrent is done, do not try to reconnect after a timeout - // TODO: If torrent is done, do not try to reconnect after a timeout + if (peer.retries >= RECONNECT_WAIT.length) { + self._debug( + 'conn %s closed: will not re-add (max %s attempts)', + peer.addr, RECONNECT_WAIT.length + ) + return + } - if (peer.retries >= RECONNECT_WAIT.length) { + const ms = RECONNECT_WAIT[peer.retries] self._debug( - 'conn %s closed: will not re-add (max %s attempts)', - peer.addr, RECONNECT_WAIT.length + 'conn %s closed: will re-add to queue in %sms (attempt %s)', + peer.addr, ms, peer.retries + 1 ) - return - } - - var ms = RECONNECT_WAIT[peer.retries] - self._debug( - 'conn %s closed: will re-add to queue in %sms (attempt %s)', - peer.addr, ms, peer.retries + 1 - ) - var reconnectTimeout = setTimeout(function reconnectTimeout () { - var newPeer = self._addPeer(peer.addr) - if (newPeer) newPeer.retries = peer.retries + 1 - }, ms) - if (reconnectTimeout.unref) reconnectTimeout.unref() - }) -} + const reconnectTimeout = setTimeout(function reconnectTimeout () { + const newPeer = self._addPeer(peer.addr) + if (newPeer) newPeer.retries = peer.retries + 1 + }, ms) + if (reconnectTimeout.unref) reconnectTimeout.unref() + }) + } -/** - * Returns `true` if string is valid IPv4/6 address. - * @param {string} addr - * @return {boolean} - */ -Torrent.prototype._validAddr = function (addr) { - var parts - try { - parts = addrToIPPort(addr) - } catch (e) { - return false + /** + * Returns `true` if string is valid IPv4/6 address. + * @param {string} addr + * @return {boolean} + */ + _validAddr (addr) { + let parts + try { + parts = addrToIPPort(addr) + } catch (e) { + return false + } + const host = parts[0] + const port = parts[1] + return port > 0 && port < 65535 && + !(host === '127.0.0.1' && port === this.client.torrentPort) } - var host = parts[0] - var port = parts[1] - return port > 0 && port < 65535 && - !(host === '127.0.0.1' && port === this.client.torrentPort) } function getBlockPipelineLength (wire, duration) { @@ -1770,3 +1723,5 @@ function randomInt (high) { } function noop () {} + +module.exports = Torrent |