/* global URL, Blob */ module.exports = Torrent var addrToIPPort = require('addr-to-ip-port') var BitField = require('bitfield') var ChunkStoreWriteStream = require('chunk-store-stream/write') var debug = require('debug')('webtorrent:torrent') var Discovery = require('torrent-discovery') var EventEmitter = require('events').EventEmitter var extend = require('xtend') var extendMutable = require('xtend/mutable') var fs = require('fs') var FSChunkStore = require('fs-chunk-store') // browser: `memory-chunk-store` var ImmediateChunkStore = require('immediate-chunk-store') var inherits = require('inherits') var MultiStream = require('multistream') var net = require('net') // browser exclude var os = require('os') // browser exclude var parallel = require('run-parallel') var parallelLimit = require('run-parallel-limit') var parseTorrent = require('parse-torrent') var path = require('path') var Piece = require('torrent-piece') var pump = require('pump') var randomIterate = require('random-iterate') var sha1 = require('simple-sha1') var speedometer = require('speedometer') var uniq = require('uniq') var utMetadata = require('ut_metadata') var utPex = require('ut_pex') // browser exclude var File = require('./file') var Peer = require('./peer') var RarityMap = require('./rarity-map') var Server = require('./server') // browser exclude var MAX_BLOCK_LENGTH = 128 * 1024 var PIECE_TIMEOUT = 30000 var CHOKE_TIMEOUT = 5000 var SPEED_THRESHOLD = 3 * Piece.BLOCK_LENGTH var PIPELINE_MIN_DURATION = 0.5 var PIPELINE_MAX_DURATION = 1 var RECHOKE_INTERVAL = 10000 // 10 seconds var RECHOKE_OPTIMISTIC_DURATION = 2 // 30 seconds var FILESYSTEM_CONCURRENCY = 2 var RECONNECT_WAIT = [ 1000, 5000, 15000 ] var TMP try { TMP = path.join(fs.statSync('/tmp') && '/tmp', 'webtorrent') } catch (err) { TMP = path.join(typeof os.tmpDir === 'function' ? os.tmpDir() : '/', 'webtorrent') } inherits(Torrent, EventEmitter) function Torrent (torrentId, client, opts) { EventEmitter.call(this) this.client = client this._debugId = this.client.peerId.slice(32) this._debug('new torrent') this.announce = opts.announce this.urlList = opts.urlList this.path = opts.path this._store = opts.store || FSChunkStore this._getAnnounceOpts = opts.getAnnounceOpts this.strategy = opts.strategy || 'sequential' this.maxWebConns = opts.maxWebConns || 4 this._rechokeNumSlots = (opts.uploads === false || opts.uploads === 0) ? 0 : (+opts.uploads || 10) this._rechokeOptimisticWire = null this._rechokeOptimisticTime = 0 this._rechokeIntervalId = null this.ready = false this.destroyed = false this.paused = false this.done = false this.metadata = null this.store = null this.files = [] this.pieces = [] this._amInterested = false this._selections = [] this._critical = [] this.wires = [] // open wires (added *after* handshake) this._queue = [] // queue of outgoing tcp peers to connect to this._peers = {} // connected peers (addr/peerId -> Peer) this._peersLength = 0 // number of elements in `this._peers` (cache, for perf) // stats this.received = 0 this.uploaded = 0 this._downloadSpeed = speedometer() this._uploadSpeed = speedometer() // for cleanup this._servers = [] // TODO: remove this and expose a hook instead // optimization: don't recheck every file if it hasn't changed this._fileModtimes = opts.fileModtimes if (torrentId !== null) this._onTorrentId(torrentId) } Object.defineProperty(Torrent.prototype, 'timeRemaining', { get: function () { if (this.done) return 0 if (this.downloadSpeed === 0) return Infinity return ((this.length - this.downloaded) / this.downloadSpeed) * 1000 } }) Object.defineProperty(Torrent.prototype, 'downloaded', { get: function () { if (!this.bitfield) return 0 var downloaded = 0 for (var index = 0, len = this.pieces.length; index < len; ++index) { if (this.bitfield.get(index)) { // verified data downloaded += (index === len - 1) ? this.lastPieceLength : this.pieceLength } else { // "in progress" data var piece = this.pieces[index] downloaded += (piece.length - piece.missing) } } return downloaded } }) // TODO: re-enable this. The number of missing pieces. Used to implement 'end game' mode. // Object.defineProperty(Storage.prototype, 'numMissing', { // get: function () { // var self = this // var numMissing = self.pieces.length // for (var index = 0, len = self.pieces.length; index < len; index++) { // numMissing -= self.bitfield.get(index) // } // return numMissing // } // }) Object.defineProperty(Torrent.prototype, 'downloadSpeed', { get: function () { return this._downloadSpeed() } }) Object.defineProperty(Torrent.prototype, 'uploadSpeed', { get: function () { return this._uploadSpeed() } }) Object.defineProperty(Torrent.prototype, 'progress', { get: function () { return this.length ? this.downloaded / this.length : 0 } }) Object.defineProperty(Torrent.prototype, 'ratio', { get: function () { return this.uploaded / (this.received || 1) } }) Object.defineProperty(Torrent.prototype, 'numPeers', { get: function () { return this.wires.length } }) Object.defineProperty(Torrent.prototype, 'torrentFileBlobURL', { get: function () { if (typeof window === 'undefined') throw new Error('browser-only property') if (!this.torrentFile) return null return URL.createObjectURL( new Blob([ this.torrentFile ], { type: 'application/x-bittorrent' }) ) } }) Object.defineProperty(Torrent.prototype, '_numQueued', { get: function () { return this._queue.length + (this._peersLength - this._numConns) } }) Object.defineProperty(Torrent.prototype, '_numConns', { get: function () { var self = this var numConns = 0 for (var id in self._peers) { if (self._peers[id].connected) numConns += 1 } return numConns } }) // TODO: remove in v1 Object.defineProperty(Torrent.prototype, 'swarm', { get: function () { console.warn('WebTorrent: `torrent.swarm` is deprecated. Use `torrent` directly instead.') return this } }) Torrent.prototype._onTorrentId = function (torrentId) { var self = this if (self.destroyed) return var parsedTorrent try { parsedTorrent = parseTorrent(torrentId) } catch (err) {} if (parsedTorrent) { // Attempt to set infoHash property synchronously self.infoHash = parsedTorrent.infoHash process.nextTick(function () { if (self.destroyed) return self._onParsedTorrent(parsedTorrent) }) } else { // If torrentId failed to parse, it could be in a form that requires an async // operation, i.e. http/https link, filesystem path, or Blob. parseTorrent.remote(torrentId, function (err, parsedTorrent) { if (self.destroyed) return if (err) return self._destroy(err) self._onParsedTorrent(parsedTorrent) }) } } Torrent.prototype._onParsedTorrent = function (parsedTorrent) { var self = this if (self.destroyed) return self._processParsedTorrent(parsedTorrent) if (!self.infoHash) { return self._destroy(new Error('Malformed torrent data: No info hash')) } if (!self.path) self.path = path.join(TMP, self.infoHash) self._rechokeIntervalId = setInterval(function () { self._rechoke() }, RECHOKE_INTERVAL) if (self._rechokeIntervalId.unref) self._rechokeIntervalId.unref() self.emit('infoHash', self.infoHash) if (self.destroyed) return // user might destroy torrent in `infoHash` event handler if (self.client.listening) { self._onListening() } else { self.client.once('listening', function () { self._onListening() }) } } Torrent.prototype._processParsedTorrent = function (parsedTorrent) { if (this.announce) { // Allow specifying trackers via `opts` parameter parsedTorrent.announce = parsedTorrent.announce.concat(this.announce) } if (this.client.tracker && global.WEBTORRENT_ANNOUNCE && !this.private) { // So `webtorrent-hybrid` can force specific trackers to be used parsedTorrent.announce = parsedTorrent.announce.concat(global.WEBTORRENT_ANNOUNCE) } if (this.urlList) { // Allow specifying web seeds via `opts` parameter parsedTorrent.urlList = parsedTorrent.urlList.concat(this.urlList) } uniq(parsedTorrent.announce) uniq(parsedTorrent.urlList) extendMutable(this, parsedTorrent) this.magnetURI = parseTorrent.toMagnetURI(parsedTorrent) this.torrentFile = parseTorrent.toTorrentFile(parsedTorrent) } Torrent.prototype._onListening = function () { var self = this if (self.discovery || self.destroyed) return var trackerOpts = self.client.tracker if (trackerOpts) { trackerOpts = extend(self.client.tracker, { getAnnounceOpts: function () { var opts = { uploaded: self.uploaded, downloaded: self.downloaded, left: Math.max(self.length - self.downloaded, 0) } if (self.client.tracker.getAnnounceOpts) { extendMutable(opts, self.client.tracker.getAnnounceOpts()) } if (self._getAnnounceOpts) { // TODO: consider deprecating this, as it's redundant with the former case extendMutable(opts, self._getAnnounceOpts()) } return opts } }) } // begin discovering peers via DHT and trackers self.discovery = new Discovery({ infoHash: self.infoHash, announce: self.announce, peerId: self.client.peerId, dht: !self.private && self.client.dht, tracker: trackerOpts, port: self.client.torrentPort }) self.discovery.on('error', onError) self.discovery.on('peer', onPeer) self.discovery.on('trackerAnnounce', onTrackerAnnounce) self.discovery.on('dhtAnnounce', onDHTAnnounce) self.discovery.on('warning', onWarning) function onError (err) { self._destroy(err) } function onPeer (peer) { // Don't create new outgoing TCP connections when torrent is done if (typeof peer === 'string' && self.done) return self.addPeer(peer) } function onTrackerAnnounce () { self.emit('trackerAnnounce') if (self.numPeers === 0) self.emit('noPeers', 'tracker') } function onDHTAnnounce () { self.emit('dhtAnnounce') if (self.numPeers === 0) self.emit('noPeers', 'dht') } function onWarning (err) { self.emit('warning', err) } // if full metadata was included in initial torrent id, use it immediately. Otherwise, // wait for torrent-discovery to find peers and ut_metadata to get the metadata. if (self.info) self._onMetadata(self) } /** * Called when the full torrent metadata is received. */ Torrent.prototype._onMetadata = function (metadata) { var self = this if (self.metadata || self.destroyed) return self._debug('got metadata') var parsedTorrent if (metadata && metadata.infoHash) { // `metadata` is a parsed torrent (from parse-torrent module) parsedTorrent = metadata } else { try { parsedTorrent = parseTorrent(metadata) } catch (err) { return self._destroy(err) } } self._processParsedTorrent(parsedTorrent) self.metadata = self.torrentFile // add web seed urls (BEP19) self.urlList.forEach(function (url) { self.addWebSeed(url) }) self._rarityMap = new RarityMap(self) self.store = new ImmediateChunkStore( new self._store(self.pieceLength, { torrent: { infoHash: self.infoHash }, files: self.files.map(function (file) { return { path: path.join(self.path, file.path), length: file.length, offset: file.offset } }), length: self.length }) ) self.files = self.files.map(function (file) { return new File(self, file) }) self._hashes = self.pieces self.pieces = self.pieces.map(function (hash, i) { var pieceLength = (i === self.pieces.length - 1) ? self.lastPieceLength : self.pieceLength return new Piece(pieceLength) }) self._reservations = self.pieces.map(function () { return [] }) self.bitfield = new BitField(self.pieces.length) self.wires.forEach(function (wire) { // If we didn't have the metadata at the time ut_metadata was initialized for this // wire, we still want to make it available to the peer in case they request it. if (wire.ut_metadata) wire.ut_metadata.setMetadata(self.metadata) self._onWireWithMetadata(wire) }) self._debug('verifying existing torrent data') if (self._fileModtimes && self._store === FSChunkStore) { // don't verify if the files haven't been modified since we last checked self.getFileModtimes(function (err, fileModtimes) { if (err) return self._destroy(err) var unchanged = self.files.map(function (_, index) { return fileModtimes[index] === self._fileModtimes[index] }).every(function (x) { return x }) if (unchanged) { for (var index = 0; index < self.pieces.length; index++) { self._markVerified(index) } self._onStore() } else { self._verifyPieces() } }) } else { self._verifyPieces() } self.emit('metadata') } /* * TODO: remove this * Gets the last modified time of every file on disk for this torrent. * Only valid in Node, not in the browser. */ Torrent.prototype.getFileModtimes = function (cb) { var self = this var ret = [] parallelLimit(self.files.map(function (file, index) { return function (cb) { fs.stat(path.join(self.path, file.path), function (err, stat) { if (err && err.code !== 'ENOENT') return cb(err) ret[index] = stat && stat.mtime.getTime() cb(null) }) } }), FILESYSTEM_CONCURRENCY, function (err) { self._debug('done getting file modtimes') cb(err, ret) }) } Torrent.prototype._verifyPieces = function () { var self = this parallelLimit(self.pieces.map(function (_, index) { return function (cb) { if (self.destroyed) return cb(new Error('torrent is destroyed')) self.store.get(index, function (err, buf) { if (err) return cb(null) // ignore error sha1(buf, function (hash) { if (hash === self._hashes[index]) { if (!self.pieces[index]) return self._debug('piece verified %s', index) self._markVerified(index) } else { self._debug('piece invalid %s', index) } cb(null) }) }) } }), FILESYSTEM_CONCURRENCY, function (err) { if (err) return self._destroy(err) self._debug('done verifying') self._onStore() }) } Torrent.prototype._markVerified = function (index) { this.pieces[index] = null this._reservations[index] = null this.bitfield.set(index, true) } /** * Called when the metadata, listening server, and underlying chunk store is initialized. */ Torrent.prototype._onStore = function () { var self = this if (self.destroyed) return self._debug('on store') // start off selecting the entire torrent with low priority self.select(0, self.pieces.length - 1, false) self.ready = true self.emit('ready') // Files may start out done if the file was already in the store self._checkDone() // In case any selections were made before torrent was ready self._updateSelections() } Torrent.prototype.destroy = function (cb) { var self = this self._destroy(null, cb) } Torrent.prototype._destroy = function (err, cb) { var self = this if (self.destroyed) return self.destroyed = true self._debug('destroy') self.client._remove(self) clearInterval(self._rechokeIntervalId) if (self._rarityMap) { self._rarityMap.destroy() } for (var id in self._peers) { self.removePeer(id) } self.files.forEach(function (file) { if (file instanceof File) file._destroy() }) var tasks = self._servers.map(function (server) { return function (cb) { server.destroy(cb) } }) if (self.discovery) { tasks.push(function (cb) { self.discovery.destroy(cb) }) } if (self.store) { tasks.push(function (cb) { self.store.close(cb) }) } parallel(tasks, cb) if (err) { // Torrent errors are emitted at `torrent.on('error')`. If there are no 'error' event // handlers on the torrent instance, the error will be emitted at // `client.on('error')`. This prevents crashing the user's program, but it makes it // impossible to determine a client error versus a torrent error (where the client // is still usable afterwards). Users are recommended for errors in both places // to distinguish between the error types. if (self.listenerCount('error') === 0) { self.client.emit('error', err) } else { self.emit('error', err) } } self.emit('close') self.client = null self.files = [] self.discovery = null self.store = null self._rarityMap = null self._peers = null self._servers = null } Torrent.prototype.addPeer = function (peer) { var self = this if (self.destroyed) throw new Error('torrent is destroyed') if (!self.infoHash) throw new Error('addPeer() must not be called before the `infoHash` event') if (self.client.blocked) { var host if (typeof peer === 'string') { var parts try { parts = addrToIPPort(peer) } catch (e) { self._debug('ignoring peer: invalid %s', peer) self.emit('invalidPeer', peer) return false } host = parts[0] } else if (typeof peer.remoteAddress === 'string') { host = peer.remoteAddress } if (host && self.client.blocked.contains(host)) { self._debug('ignoring peer: blocked %s', peer) if (typeof peer !== 'string') peer.destroy() self.emit('blockedPeer', peer) return false } } var wasAdded = !!self._addPeer(peer) if (wasAdded) { self.emit('peer', peer) } else { self.emit('invalidPeer', peer) } return wasAdded } Torrent.prototype._addPeer = function (peer) { var self = this if (self.destroyed) { self._debug('ignoring peer: torrent is destroyed') if (typeof peer !== 'string') peer.destroy() return null } if (typeof peer === 'string' && !self._validAddr(peer)) { self._debug('ignoring peer: invalid %s', peer) return null } var id = (peer && peer.id) || peer if (self._peers[id]) { self._debug('ignoring peer: duplicate (%s)', id) if (typeof peer !== 'string') peer.destroy() return null } if (self.paused) { self._debug('ignoring peer: torrent is paused') if (typeof peer !== 'string') peer.destroy() return null } self._debug('add peer %s', id) var newPeer if (typeof peer === 'string') { // `peer` is an addr ("ip:port" string) newPeer = Peer.createTCPOutgoingPeer(peer, self) } else { // `peer` is a WebRTC connection (simple-peer) newPeer = Peer.createWebRTCPeer(peer, self) } self._peers[newPeer.id] = newPeer self._peersLength += 1 if (typeof peer === 'string') { // `peer` is an addr ("ip:port" string) self._queue.push(newPeer) self._drain() } return newPeer } Torrent.prototype.addWebSeed = function (url) { if (this.destroyed) throw new Error('torrent is destroyed') if (!/^https?:\/\/.+/.test(url)) { this._debug('ignoring invalid web seed %s', url) this.emit('invalidPeer', url) return } if (this._peers[url]) { this._debug('ignoring duplicate web seed %s', url) this.emit('invalidPeer', url) return } this._debug('add web seed %s', url) var newPeer = Peer.createWebSeedPeer(url, this) this._peers[newPeer.id] = newPeer this._peersLength += 1 this.emit('peer', url) } /** * Called whenever a new incoming TCP peer connects to this torrent swarm. Called with a * peer that has already sent a handshake. */ Torrent.prototype._addIncomingPeer = function (peer) { var self = this if (self.destroyed) return peer.destroy(new Error('torrent is destroyed')) if (self.paused) return peer.destroy(new Error('torrent is paused')) this._debug('add incoming peer %s', peer.id) self._peers[peer.id] = peer self._peersLength += 1 } Torrent.prototype.removePeer = function (peer) { var self = this var id = (peer && peer.id) || peer peer = self._peers[id] if (!peer) return this._debug('removePeer %s', id) delete self._peers[id] self._peersLength -= 1 peer.destroy() // If torrent swarm was at capacity before, try to open a new connection now self._drain() } Torrent.prototype.select = function (start, end, priority, notify) { var self = this if (self.destroyed) throw new Error('torrent is destroyed') if (start > end || start < 0 || end >= self.pieces.length) { throw new Error('invalid selection ', start, ':', end) } priority = Number(priority) || 0 self._debug('select %s-%s (priority %s)', start, end, priority) self._selections.push({ from: start, to: end, offset: 0, priority: priority, notify: notify || noop }) self._selections.sort(function (a, b) { return b.priority - a.priority }) self._updateSelections() } Torrent.prototype.deselect = function (start, end, priority) { var self = this if (self.destroyed) throw new Error('torrent is destroyed') priority = Number(priority) || 0 self._debug('deselect %s-%s (priority %s)', start, end, priority) for (var i = 0; i < self._selections.length; ++i) { var s = self._selections[i] if (s.from === start && s.to === end && s.priority === priority) { self._selections.splice(i--, 1) break } } self._updateSelections() } Torrent.prototype.critical = function (start, end) { var self = this if (self.destroyed) throw new Error('torrent is destroyed') self._debug('critical %s-%s', start, end) for (var i = start; i <= end; ++i) { self._critical[i] = true } self._updateSelections() } Torrent.prototype._onWire = function (wire, addr) { var self = this self._debug('got wire %s (%s)', wire._debugId, addr || 'Unknown') wire.on('download', function (downloaded) { if (self.destroyed) return self.received += downloaded self._downloadSpeed(downloaded) self.client._downloadSpeed(downloaded) self.emit('download', downloaded) self.client.emit('download', downloaded) }) wire.on('upload', function (uploaded) { if (self.destroyed) return self.uploaded += uploaded self._uploadSpeed(uploaded) self.client._uploadSpeed(uploaded) self.emit('upload', uploaded) self.client.emit('upload', uploaded) }) self.wires.push(wire) if (addr) { // Sometimes RTCPeerConnection.getStats() doesn't return an ip:port for peers var parts = addrToIPPort(addr) wire.remoteAddress = parts[0] wire.remotePort = parts[1] } // When peer sends PORT message, add that DHT node to routing table if (self.client.dht && self.client.dht.listening) { wire.on('port', function (port) { if (self.destroyed || self.client.dht.destroyed) { return } if (!wire.remoteAddress) { return self._debug('ignoring PORT from peer with no address') } if (port === 0 || port > 65536) { return self._debug('ignoring invalid PORT from peer') } self._debug('port: %s (from %s)', port, addr) self.client.dht.addNode({ host: wire.remoteAddress, port: port }) }) } wire.on('timeout', function () { self._debug('wire timeout (%s)', addr) // TODO: this might be destroying wires too eagerly wire.destroy() }) // Timeout for piece requests to this peer wire.setTimeout(PIECE_TIMEOUT, true) // Send KEEP-ALIVE (every 60s) so peers will not disconnect the wire wire.setKeepAlive(true) // use ut_metadata extension wire.use(utMetadata(self.metadata)) wire.ut_metadata.on('warning', function (err) { self._debug('ut_metadata warning: %s', err.message) }) if (!self.metadata) { wire.ut_metadata.on('metadata', function (metadata) { self._debug('got metadata via ut_metadata') self._onMetadata(metadata) }) wire.ut_metadata.fetch() } // use ut_pex extension if the torrent is not flagged as private if (typeof utPex === 'function' && !self.private) { wire.use(utPex()) wire.ut_pex.on('peer', function (peer) { // Only add potential new peers when we're not seeding if (self.done) return self._debug('ut_pex: got peer: %s (from %s)', peer, addr) self.addPeer(peer) }) wire.ut_pex.on('dropped', function (peer) { // the remote peer believes a given peer has been dropped from the torrent swarm. // if we're not currently connected to it, then remove it from the queue. var peerObj = self._peers[peer] if (peerObj && !peerObj.connected) { self._debug('ut_pex: dropped peer: %s (from %s)', peer, addr) self.removePeer(peer) } }) wire.once('close', function () { // Stop sending updates to remote peer wire.ut_pex.reset() }) } // Hook to allow user-defined `bittorrent-protocol` extensions // More info: https://github.com/feross/bittorrent-protocol#extension-api self.emit('wire', wire, addr) if (self.metadata) { process.nextTick(function () { // This allows wire.handshake() to be called (by Peer.onHandshake) before any // messages get sent on the wire self._onWireWithMetadata(wire) }) } } Torrent.prototype._onWireWithMetadata = function (wire) { var self = this var timeoutId = null function onChokeTimeout () { if (self.destroyed || wire.destroyed) return if (self._numQueued > 2 * (self._numConns - self.numPeers) && wire.amInterested) { wire.destroy() } else { timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT) if (timeoutId.unref) timeoutId.unref() } } var i = 0 function updateSeedStatus () { if (wire.peerPieces.length !== self.pieces.length) return for (; i < self.pieces.length; ++i) { if (!wire.peerPieces.get(i)) return } wire.isSeeder = true wire.choke() // always choke seeders } wire.on('bitfield', function () { updateSeedStatus() self._update() }) wire.on('have', function () { updateSeedStatus() self._update() }) wire.once('interested', function () { wire.unchoke() }) wire.once('close', function () { clearTimeout(timeoutId) }) wire.on('choke', function () { clearTimeout(timeoutId) timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT) if (timeoutId.unref) timeoutId.unref() }) wire.on('unchoke', function () { clearTimeout(timeoutId) self._update() }) wire.on('request', function (index, offset, length, cb) { if (length > MAX_BLOCK_LENGTH) { // Per spec, disconnect from peers that request >128KB return wire.destroy() } if (self.pieces[index]) return self.store.get(index, { offset: offset, length: length }, cb) }) wire.bitfield(self.bitfield) // always send bitfield (required) wire.interested() // always start out interested // Send PORT message to peers that support DHT if (wire.peerExtensions.dht && self.client.dht && self.client.dht.listening) { wire.port(self.client.dht.address().port) } timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT) if (timeoutId.unref) timeoutId.unref() wire.isSeeder = false updateSeedStatus() } /** * Called on selection changes. */ Torrent.prototype._updateSelections = function () { var self = this if (!self.ready || self.destroyed) return process.nextTick(function () { self._gcSelections() }) self._updateInterest() self._update() } /** * Garbage collect selections with respect to the store's current state. */ Torrent.prototype._gcSelections = function () { var self = this for (var i = 0; i < self._selections.length; i++) { var s = self._selections[i] var oldOffset = s.offset // check for newly downloaded pieces in selection while (self.bitfield.get(s.from + s.offset) && s.from + s.offset < s.to) { s.offset++ } if (oldOffset !== s.offset) s.notify() if (s.to !== s.from + s.offset) continue if (!self.bitfield.get(s.from + s.offset)) continue // remove fully downloaded selection self._selections.splice(i--, 1) // decrement i to offset splice s.notify() // TODO: this may notify twice in a row. is this a problem? self._updateInterest() } if (!self._selections.length) self.emit('idle') } /** * Update interested status for all peers. */ Torrent.prototype._updateInterest = function () { var self = this var prev = self._amInterested self._amInterested = !!self._selections.length self.wires.forEach(function (wire) { // TODO: only call wire.interested if the wire has at least one piece we need if (self._amInterested) wire.interested() else wire.uninterested() }) if (prev === self._amInterested) return if (self._amInterested) self.emit('interested') else self.emit('uninterested') } /** * Heartbeat to update all peers and their requests. */ Torrent.prototype._update = function () { var self = this if (self.destroyed) return // update wires in random order for better request distribution var ite = randomIterate(self.wires) var wire while ((wire = ite())) { self._updateWire(wire) } } /** * Attempts to update a peer's requests */ Torrent.prototype._updateWire = function (wire) { var self = this if (wire.peerChoking) return if (!wire.downloaded) return validateWire() var minOutstandingRequests = getBlockPipelineLength(wire, PIPELINE_MIN_DURATION) if (wire.requests.length >= minOutstandingRequests) return var maxOutstandingRequests = getBlockPipelineLength(wire, PIPELINE_MAX_DURATION) trySelectWire(false) || trySelectWire(true) function genPieceFilterFunc (start, end, tried, rank) { return function (i) { return i >= start && i <= end && !(i in tried) && wire.peerPieces.get(i) && (!rank || rank(i)) } } // TODO: Do we need both validateWire and trySelectWire? function validateWire () { if (wire.requests.length) return var i = self._selections.length while (i--) { var next = self._selections[i] var piece if (self.strategy === 'rarest') { var start = next.from + next.offset var end = next.to var len = end - start + 1 var tried = {} var tries = 0 var filter = genPieceFilterFunc(start, end, tried) while (tries < len) { piece = self._rarityMap.getRarestPiece(filter) if (piece < 0) break if (self._request(wire, piece, false)) return tried[piece] = true tries += 1 } } else { for (piece = next.to; piece >= next.from + next.offset; --piece) { if (!wire.peerPieces.get(piece)) continue if (self._request(wire, piece, false)) return } } } // TODO: wire failed to validate as useful; should we close it? // probably not, since 'have' and 'bitfield' messages might be coming } function speedRanker () { var speed = wire.downloadSpeed() || 1 if (speed > SPEED_THRESHOLD) return function () { return true } var secs = Math.max(1, wire.requests.length) * Piece.BLOCK_LENGTH / speed var tries = 10 var ptr = 0 return function (index) { if (!tries || self.bitfield.get(index)) return true var missing = self.pieces[index].missing for (; ptr < self.wires.length; ptr++) { var otherWire = self.wires[ptr] var otherSpeed = otherWire.downloadSpeed() if (otherSpeed < SPEED_THRESHOLD) continue if (otherSpeed <= speed) continue if (!otherWire.peerPieces.get(index)) continue if ((missing -= otherSpeed * secs) > 0) continue tries-- return false } return true } } function shufflePriority (i) { var last = i for (var j = i; j < self._selections.length && self._selections[j].priority; j++) { last = j } var tmp = self._selections[i] self._selections[i] = self._selections[last] self._selections[last] = tmp } function trySelectWire (hotswap) { if (wire.requests.length >= maxOutstandingRequests) return true var rank = speedRanker() for (var i = 0; i < self._selections.length; i++) { var next = self._selections[i] var piece if (self.strategy === 'rarest') { var start = next.from + next.offset var end = next.to var len = end - start + 1 var tried = {} var tries = 0 var filter = genPieceFilterFunc(start, end, tried, rank) while (tries < len) { piece = self._rarityMap.getRarestPiece(filter) if (piece < 0) break // request all non-reserved blocks in this piece while (self._request(wire, piece, self._critical[piece] || hotswap)) {} if (wire.requests.length < maxOutstandingRequests) { tried[piece] = true tries++ continue } if (next.priority) shufflePriority(i) return true } } else { for (piece = next.from + next.offset; piece <= next.to; piece++) { if (!wire.peerPieces.get(piece) || !rank(piece)) continue // request all non-reserved blocks in piece while (self._request(wire, piece, self._critical[piece] || hotswap)) {} if (wire.requests.length < maxOutstandingRequests) continue if (next.priority) shufflePriority(i) return true } } } return false } } /** * Called periodically to update the choked status of all peers, handling optimistic * unchoking as described in BEP3. */ Torrent.prototype._rechoke = function () { var self = this if (!self.ready) return if (self._rechokeOptimisticTime > 0) self._rechokeOptimisticTime -= 1 else self._rechokeOptimisticWire = null var peers = [] self.wires.forEach(function (wire) { if (!wire.isSeeder && wire !== self._rechokeOptimisticWire) { peers.push({ wire: wire, downloadSpeed: wire.downloadSpeed(), uploadSpeed: wire.uploadSpeed(), salt: Math.random(), isChoked: true }) } }) peers.sort(rechokeSort) var unchokeInterested = 0 var i = 0 for (; i < peers.length && unchokeInterested < self._rechokeNumSlots; ++i) { peers[i].isChoked = false if (peers[i].wire.peerInterested) unchokeInterested += 1 } // Optimistically unchoke a peer if (!self._rechokeOptimisticWire && i < peers.length && self._rechokeNumSlots) { var candidates = peers.slice(i).filter(function (peer) { return peer.wire.peerInterested }) var optimistic = candidates[randomInt(candidates.length)] if (optimistic) { optimistic.isChoked = false self._rechokeOptimisticWire = optimistic.wire self._rechokeOptimisticTime = RECHOKE_OPTIMISTIC_DURATION } } // Unchoke best peers peers.forEach(function (peer) { if (peer.wire.amChoking !== peer.isChoked) { if (peer.isChoked) peer.wire.choke() else peer.wire.unchoke() } }) function rechokeSort (peerA, peerB) { // Prefer higher download speed if (peerA.downloadSpeed !== peerB.downloadSpeed) { return peerB.downloadSpeed - peerA.downloadSpeed } // Prefer higher upload speed if (peerA.uploadSpeed !== peerB.uploadSpeed) { return peerB.uploadSpeed - peerA.uploadSpeed } // Prefer unchoked if (peerA.wire.amChoking !== peerB.wire.amChoking) { return peerA.wire.amChoking ? 1 : -1 } // Random order return peerA.salt - peerB.salt } } /** * Attempts to cancel a slow block request from another wire such that the * given wire may effectively swap out the request for one of its own. */ Torrent.prototype._hotswap = function (wire, index) { var self = this var speed = wire.downloadSpeed() if (speed < Piece.BLOCK_LENGTH) return false if (!self._reservations[index]) return false var r = self._reservations[index] if (!r) { return false } var minSpeed = Infinity var minWire var i for (i = 0; i < r.length; i++) { var otherWire = r[i] if (!otherWire || otherWire === wire) continue var otherSpeed = otherWire.downloadSpeed() if (otherSpeed >= SPEED_THRESHOLD) continue if (2 * otherSpeed > speed || otherSpeed > minSpeed) continue minWire = otherWire minSpeed = otherSpeed } if (!minWire) return false for (i = 0; i < r.length; i++) { if (r[i] === minWire) r[i] = null } for (i = 0; i < minWire.requests.length; i++) { var req = minWire.requests[i] if (req.piece !== index) continue self.pieces[index].cancel((req.offset / Piece.BLOCK_LENGTH) | 0) } self.emit('hotswap', minWire, wire, index) return true } /** * Attempts to request a block from the given wire. */ Torrent.prototype._request = function (wire, index, hotswap) { var self = this var numRequests = wire.requests.length var isWebSeed = wire.type === 'webSeed' if (self.bitfield.get(index)) return false var maxOutstandingRequests = isWebSeed ? Math.min( getPiecePipelineLength(wire, PIPELINE_MAX_DURATION, self.pieceLength), self.maxWebConns ) : getBlockPipelineLength(wire, PIPELINE_MAX_DURATION) if (numRequests >= maxOutstandingRequests) return false // var endGame = (wire.requests.length === 0 && self.store.numMissing < 30) var piece = self.pieces[index] var reservation = isWebSeed ? piece.reserveRemaining() : piece.reserve() if (reservation === -1 && hotswap && self._hotswap(wire, index)) { reservation = isWebSeed ? piece.reserveRemaining() : piece.reserve() } if (reservation === -1) return false var r = self._reservations[index] if (!r) r = self._reservations[index] = [] var i = r.indexOf(null) if (i === -1) i = r.length r[i] = wire var chunkOffset = piece.chunkOffset(reservation) var chunkLength = isWebSeed ? piece.chunkLengthRemaining(reservation) : piece.chunkLength(reservation) wire.request(index, chunkOffset, chunkLength, function onChunk (err, chunk) { // TODO: what is this for? if (!self.ready) return self.once('ready', function () { onChunk(err, chunk) }) if (r[i] === wire) r[i] = null if (piece !== self.pieces[index]) return onUpdateTick() if (err) { self._debug( 'error getting piece %s (offset: %s length: %s) from %s: %s', index, chunkOffset, chunkLength, wire.remoteAddress + ':' + wire.remotePort, err.message ) isWebSeed ? piece.cancelRemaining(reservation) : piece.cancel(reservation) onUpdateTick() return } self._debug( 'got piece %s (offset: %s length: %s) from %s', index, chunkOffset, chunkLength, wire.remoteAddress + ':' + wire.remotePort ) if (!piece.set(reservation, chunk, wire)) return onUpdateTick() var buf = piece.flush() // TODO: might need to set self.pieces[index] = null here since sha1 is async sha1(buf, function (hash) { if (hash === self._hashes[index]) { if (!self.pieces[index]) return self._debug('piece verified %s', index) self.pieces[index] = null self._reservations[index] = null self.bitfield.set(index, true) self.store.put(index, buf) self.wires.forEach(function (wire) { wire.have(index) }) self._checkDone() } else { self.pieces[index] = new Piece(piece.length) self.emit('warning', new Error('Piece ' + index + ' failed verification')) } onUpdateTick() }) }) function onUpdateTick () { process.nextTick(function () { self._update() }) } return true } Torrent.prototype._checkDone = function () { var self = this if (self.destroyed) return // are any new files done? self.files.forEach(function (file) { if (file.done) return for (var i = file._startPiece; i <= file._endPiece; ++i) { if (!self.bitfield.get(i)) return } file.done = true file.emit('done') self._debug('file done: ' + file.name) }) // is the torrent done? (if all current selections are satisfied, or there are // no selections, then torrent is done) var done = true for (var i = 0; i < self._selections.length; i++) { var selection = self._selections[i] for (var piece = selection.from; piece <= selection.to; piece++) { if (!self.bitfield.get(piece)) { done = false break } } if (!done) break } if (!self.done && done) { self.done = true self._debug('torrent done: ' + self.infoHash) if (self.discovery.tracker) { self.discovery.tracker.complete() } self.emit('done') } self._gcSelections() } Torrent.prototype.load = function (streams, cb) { var self = this if (self.destroyed) throw new Error('torrent is destroyed') if (!self.ready) return self.once('ready', function () { self.load(streams, cb) }) if (!Array.isArray(streams)) streams = [ streams ] if (!cb) cb = noop var readable = new MultiStream(streams) var writable = new ChunkStoreWriteStream(self.store, self.pieceLength) pump(readable, writable, function (err) { if (err) return cb(err) self.pieces.forEach(function (piece, index) { self.pieces[index] = null self._reservations[index] = null self.bitfield.set(index, true) }) self._checkDone() cb(null) }) } Torrent.prototype.createServer = function (opts) { if (typeof Server !== 'function') throw new Error('node.js-only method') if (this.destroyed) throw new Error('torrent is destroyed') var server = new Server(this, opts) this._servers.push(server) return server } Torrent.prototype.pause = function () { if (this.destroyed) return this._debug('pause') this.paused = true } Torrent.prototype.resume = function () { if (this.destroyed) return this._debug('resume') this.paused = false this._drain() } Torrent.prototype._debug = function () { var args = [].slice.call(arguments) args[0] = '[' + this._debugId + '] ' + args[0] debug.apply(null, args) } /** * Pop a peer off the FIFO queue and connect to it. When _drain() gets called, * the queue will usually have only one peer in it, except when there are too * many peers (over `this.maxConns`) in which case they will just sit in the * queue until another connection closes. */ Torrent.prototype._drain = function () { var self = this this._debug('_drain numConns %s maxConns %s', self._numConns, self.client.maxConns) if (typeof net.connect !== 'function' || self.destroyed || self.paused || self._numConns >= self.client.maxConns) { return } this._debug('drain (%s queued, %s/%s peers)', self._numQueued, self.numPeers, self.client.maxConns) var peer = self._queue.shift() if (!peer) return // queue could be empty this._debug('tcp connect attempt to %s', peer.addr) var parts = addrToIPPort(peer.addr) var opts = { host: parts[0], port: parts[1] } var conn = peer.conn = net.connect(opts) conn.once('connect', function () { peer.onConnect() }) conn.once('error', function (err) { peer.destroy(err) }) peer.startConnectTimeout() // When connection closes, attempt reconnect after timeout (with exponential backoff) conn.on('close', function () { if (self.destroyed) return // TODO: If torrent is done, do not try to reconnect after a timeout if (peer.retries >= RECONNECT_WAIT.length) { self._debug( 'conn %s closed: will not re-add (max %s attempts)', peer.addr, RECONNECT_WAIT.length ) return } var ms = RECONNECT_WAIT[peer.retries] self._debug( 'conn %s closed: will re-add to queue in %sms (attempt %s)', peer.addr, ms, peer.retries + 1 ) var reconnectTimeout = setTimeout(function reconnectTimeout () { var newPeer = self._addPeer(peer.addr) if (newPeer) newPeer.retries = peer.retries + 1 }, ms) if (reconnectTimeout.unref) reconnectTimeout.unref() }) } /** * Returns `true` if string is valid IPv4/6 address. * @param {string} addr * @return {boolean} */ Torrent.prototype._validAddr = function (addr) { var parts try { parts = addrToIPPort(addr) } catch (e) { return false } var host = parts[0] var port = parts[1] return port > 0 && port < 65535 && !(host === '127.0.0.1' && port === this.client.torrentPort) } function getBlockPipelineLength (wire, duration) { return 2 + Math.ceil(duration * wire.downloadSpeed() / Piece.BLOCK_LENGTH) } function getPiecePipelineLength (wire, duration, pieceLength) { return 1 + Math.ceil(duration * wire.downloadSpeed() / pieceLength) } /** * Returns a random integer in [0,high) */ function randomInt (high) { return Math.random() * high | 0 } function noop () {}