Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/webtorrent/webtorrent.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFeross Aboukhadijeh <feross@feross.org>2014-09-21 05:41:24 +0400
committerFeross Aboukhadijeh <feross@feross.org>2014-09-21 05:41:24 +0400
commit93686505fbc90522c75b6c151ec7261aa76098de (patch)
tree5b474b920e79b7b39b6804fac5e6a641de6ae843 /lib/torrent.js
parent2e14192c311f64c20496a72af7ffce36495be92b (diff)
merge `bittorrent-client` into this module
When I started the WebTorrent project I thought there were going to need to be two separate client implementations (bittorrent-client and webtorrent-client) that would get tied together in a higher-level module. Fortunately, this was not necessary because of the awesome “browser” field support in browserify. By substituting just a few modules, we can make the same module (webtorrent) work in node AND the browser, with the same codebase! So, from now on, you can just `require(‘webtorrent’)` in node or the browser, and it will just work. You can also `npm install webtorrent` if you want to use bittorrent in a node app or script. Lastly, you can `npm install webtorrent -g` if you want to use webtorrent as a command line app (it installs a `webtorrent` command).
Diffstat (limited to 'lib/torrent.js')
-rw-r--r--lib/torrent.js991
1 files changed, 991 insertions, 0 deletions
diff --git a/lib/torrent.js b/lib/torrent.js
new file mode 100644
index 0000000..039f0e9
--- /dev/null
+++ b/lib/torrent.js
@@ -0,0 +1,991 @@
+module.exports = Torrent
+
+var addrToIPPort = require('addr-to-ip-port')
+var concat = require('concat-stream') // browser exclude
+var debug = require('debug')('webtorrent:torrent')
+var Discovery = require('torrent-discovery')
+var EventEmitter = require('events').EventEmitter
+var fs = require('fs') // browser exclude
+var hh = require('http-https') // browser exclude
+var inherits = require('inherits')
+var parallel = require('run-parallel')
+var parseTorrent = require('parse-torrent')
+var RarityMap = require('./rarity-map')
+var reemit = require('re-emitter')
+var Storage = require('./storage')
+var Swarm = require('bittorrent-swarm') // `webtorrent-swarm` in browser
+var ut_metadata = require('ut_metadata')
+var ut_pex = require('ut_pex') // browser exclude
+
+var MAX_BLOCK_LENGTH = 128 * 1024
+var MAX_OUTSTANDING_REQUESTS = 5
+var PIECE_TIMEOUT = 10000
+var CHOKE_TIMEOUT = 5000
+var SPEED_THRESHOLD = 3 * Storage.BLOCK_LENGTH
+
+var RECHOKE_INTERVAL = 10000 // 10 seconds
+var RECHOKE_OPTIMISTIC_DURATION = 2 // 30 seconds
+
+function noop () {}
+
+inherits(Torrent, EventEmitter)
+
+/**
+ * A torrent
+ *
+ * @param {string|Buffer|Object} torrentId
+ * @param {Object} opts
+ */
+function Torrent (torrentId, opts) {
+ var self = this
+ EventEmitter.call(self)
+ debug('new torrent')
+
+ self.client = opts.client
+
+ self.hotswapEnabled = ('hotswap' in opts ? opts.hotswap : true)
+ self.verify = opts.verify
+ self.storageOpts = opts.storageOpts
+
+ self.chokeTimeout = opts.chokeTimeout || CHOKE_TIMEOUT
+ self.pieceTimeout = opts.pieceTimeout || PIECE_TIMEOUT
+ self.strategy = opts.strategy || 'sequential'
+
+ self._rechokeNumSlots = (opts.uploads === false || opts.uploads === 0) ? 0 : (+opts.uploads || 10)
+ self._rechokeOptimisticWire = null
+ self._rechokeOptimisticTime = 0
+ self._rechokeIntervalId = null
+
+ self.ready = false
+ self.files = []
+ self.metadata = null
+ self.parsedTorrent = null
+ self.storage = null
+ self.numBlockedPeers = 0
+ self._amInterested = false
+ self._destroyed = false
+ self._selections = []
+ self._critical = []
+ self._storageImpl = opts.storage || Storage
+
+ var parsedTorrent = parseTorrent(torrentId)
+ if (parsedTorrent && parsedTorrent.infoHash) {
+ onTorrentId(parsedTorrent)
+
+ } else if (typeof hh.get === 'function' && /^https?:/.test(torrentId)) {
+ // http or https url to torrent file
+ hh.get(torrentId, function (res) {
+ res.pipe(concat(function (torrent) {
+ onTorrentId(torrent)
+ }))
+ }).on('error', function (err) {
+ self.emit('error', new Error('error downloading torrent: ' + err.message))
+ })
+
+ } else if (typeof fs.readFile === 'function') {
+ // assume it's a filesystem path
+ fs.readFile(torrentId, function (err, torrent) {
+ if (err) return self.emit('error', new Error('invalid torrent id'))
+ onTorrentId(torrent)
+ })
+
+ } else throw new Error('invalid torrent id')
+
+ function onTorrentId (torrentId) {
+ parsedTorrent = parseTorrent(torrentId)
+ self.infoHash = parsedTorrent.infoHash
+ if (parsedTorrent.name) self.name = parsedTorrent.name // preliminary name
+
+ // create swarm
+ self.swarm = new Swarm(self.infoHash, self.client.peerId, {
+ handshake: { dht: !!self.client.dht }
+ })
+ reemit(self.swarm, self, ['warning', 'error'])
+ self.swarm.on('wire', self._onWire.bind(self))
+
+ // update overall client stats
+ self.swarm.on('download', self.client.downloadSpeed.bind(self.client))
+ self.swarm.on('upload', self.client.uploadSpeed.bind(self.client))
+
+ if (process.browser) {
+ // in browser, swarm does not listen
+ self._onSwarmListening(parsedTorrent)
+ } else {
+ // listen for peers
+ self.swarm.listen(self.client.torrentPort, self._onSwarmListening.bind(self, parsedTorrent))
+ }
+ process.nextTick(function () {
+ self.emit('infoHash')
+ })
+ }
+}
+
+// torrent size (in bytes)
+Object.defineProperty(Torrent.prototype, 'length', {
+ get: function () {
+ return (this.parsedTorrent && this.parsedTorrent.length) || 0
+ }
+})
+
+// time remaining (in milliseconds)
+Object.defineProperty(Torrent.prototype, 'timeRemaining', {
+ get: function () {
+ if (this.swarm.downloadSpeed() === 0) return Infinity
+ else return ((this.length - this.downloaded) / this.swarm.downloadSpeed()) * 1000
+ }
+})
+
+// percentage complete, represented as a number between 0 and 1
+Object.defineProperty(Torrent.prototype, 'progress', {
+ get: function () {
+ return (this.parsedTorrent && (this.downloaded / this.parsedTorrent.length)) || 0
+ }
+})
+
+// bytes downloaded (not necessarily verified)
+Object.defineProperty(Torrent.prototype, 'downloaded', {
+ get: function () {
+ return (this.storage && this.storage.downloaded) || 0
+ }
+})
+
+// bytes uploaded
+Object.defineProperty(Torrent.prototype, 'uploaded', {
+ get: function () {
+ return this.swarm.uploaded
+ }
+})
+
+// ratio of bytes downloaded to uploaded
+Object.defineProperty(Torrent.prototype, 'ratio', {
+ get: function () {
+ return (this.uploaded && (this.downloaded / this.uploaded)) || 0
+ }
+})
+
+Torrent.prototype._onSwarmListening = function (parsed, port) {
+ var self = this
+ if (self._destroyed) return
+
+ self.client.torrentPort = port
+
+ // begin discovering peers via the DHT and tracker servers
+ self.discovery = new Discovery({
+ announce: parsed.announce,
+ dht: self.client.dht,
+ tracker: self.client.tracker,
+ peerId: self.client.peerId,
+ port: port
+ })
+ self.discovery.setTorrent(self.infoHash)
+ self.discovery.on('peer', self.addPeer.bind(self))
+
+ // expose discovery events
+ reemit(self.discovery, self, ['dhtAnnounce', 'warning', 'error'])
+
+ // if full metadata was included in initial torrent id, use it
+ if (parsed.info) self._onMetadata(parsed)
+
+ self.emit('listening', port)
+}
+
+/**
+ * Called when the metadata is received.
+ */
+Torrent.prototype._onMetadata = function (metadata) {
+ var self = this
+ if (self.metadata || self._destroyed) return
+ debug('got metadata')
+
+ if (metadata && metadata.infoHash) {
+ // `metadata` is a parsed torrent (from parse-torrent module)
+ self.metadata = parseTorrent.toBuffer(metadata)
+ self.parsedTorrent = metadata
+ } else {
+ self.metadata = metadata
+ try {
+ self.parsedTorrent = parseTorrent(self.metadata)
+ } catch (err) {
+ return self.emit('error', err)
+ }
+ }
+
+ // update preliminary torrent name
+ self.name = self.parsedTorrent.name
+
+ // update discovery module with full torrent metadata
+ self.discovery.setTorrent(self.parsedTorrent)
+
+ self.rarityMap = new RarityMap(self.swarm, self.parsedTorrent.pieces.length)
+
+ self.storage = new self._storageImpl(self.parsedTorrent, self.storageOpts)
+ self.storage.on('piece', self._onStoragePiece.bind(self))
+ self.storage.on('file', function (file) {
+ self.emit('file', file)
+ })
+
+ self._reservations = self.storage.pieces.map(function () {
+ return []
+ })
+
+ self.storage.on('done', function () {
+ if (self.discovery.tracker)
+ self.discovery.tracker.complete()
+
+ debug('torrent ' + self.infoHash + ' done')
+ self.emit('done')
+ })
+
+ self.storage.on('select', self.select.bind(self))
+ self.storage.on('deselect', self.deselect.bind(self))
+ self.storage.on('critical', self.critical.bind(self))
+
+ self.storage.files.forEach(function (file) {
+ self.files.push(file)
+ })
+
+ self.swarm.wires.forEach(function (wire) {
+ // If we didn't have the metadata at the time ut_metadata was initialized for this
+ // wire, we still want to make it available to the peer in case they request it.
+ if (wire.ut_metadata) wire.ut_metadata.setMetadata(self.metadata)
+
+ self._onWireWithMetadata(wire)
+ })
+
+ if (self.verify) {
+ process.nextTick(function () {
+ debug('verifying existing torrent data')
+ var numPieces = 0
+ var numVerified = 0
+
+ // TODO: move storage verification to storage.js?
+ parallel(self.storage.pieces.map(function (piece) {
+ return function (cb) {
+ self.storage.read(piece.index, function (err, buffer) {
+ numPieces += 1
+ self.emit('verifying', {
+ percentDone: 100 * numPieces / self.storage.pieces.length,
+ percentVerified: 100 * numVerified / self.storage.pieces.length,
+ })
+
+ if (!err && buffer) {
+ // TODO: this is a bit hacky; figure out a cleaner way of verifying the buffer
+ piece.verify(buffer)
+ numVerified += piece.verified
+ debug('piece ' + (piece.verified ? 'verified' : 'invalid') + ' ' + piece.index)
+ }
+ // continue regardless of whether piece verification failed
+ cb()
+ }, true) // forces override to allow reading from non-verified pieces
+ }
+ }), self._onStorage.bind(self))
+ })
+ } else {
+ process.nextTick(self._onStorage.bind(self))
+ }
+
+ process.nextTick(function () {
+ self.emit('metadata')
+ })
+}
+
+/**
+ * Destroy and cleanup this torrent.
+ */
+Torrent.prototype.destroy = function (cb) {
+ var self = this
+ debug('destroy')
+ self._destroyed = true
+ clearInterval(self._rechokeIntervalId)
+
+ var tasks = []
+ if (self.swarm) tasks.push(function (cb) {
+ self.swarm.destroy(cb)
+ })
+ if (self.discovery) tasks.push(function (cb) {
+ self.discovery.stop(cb)
+ })
+ if (self.storage) tasks.push(function (cb) {
+ self.storage.close(cb)
+ })
+ parallel(tasks, cb)
+}
+
+/**
+ * Add a peer to the swarm
+ * @param {string|SimplePeer} peer
+ */
+Torrent.prototype.addPeer = function (peer) {
+ var self = this
+
+ // TODO: extract IP address from peer object and check blocklist
+ if (typeof peer === 'string' &&
+ self.client.blocked && self.client.blocked.contains(addrToIPPort(peer)[0])) {
+ self.numBlockedPeers += 1
+ self.emit('blocked-peer', peer)
+ } else {
+ self.emit('peer', peer)
+ self.swarm.addPeer(peer)
+ }
+}
+
+/**
+ * Select a range of pieces to prioritize.
+ *
+ * @param {number} start start piece index (inclusive)
+ * @param {number} end end piece index (inclusive)
+ * @param {number} priority priority associated with this selection
+ * @param {function} notify callback when selection is updated with new data
+ */
+Torrent.prototype.select = function (start, end, priority, notify) {
+ var self = this
+ if (start > end || start < 0 || end >= self.storage.pieces.length)
+ throw new Error('invalid selection ', start, ':', end)
+ priority = Number(priority) || 0
+
+ debug('select %s-%s (priority %s)', start, end, priority)
+
+ self._selections.push({
+ from: start,
+ to: end,
+ offset: 0,
+ priority: priority,
+ notify: notify || noop
+ })
+
+ self._selections.sort(function (a, b) {
+ return b.priority - a.priority
+ })
+
+ self._updateSelections()
+}
+
+/**
+ * Deprioritizes a range of previously selected pieces.
+ *
+ * @param {number} start start piece index (inclusive)
+ * @param {number} end end piece index (inclusive)
+ * @param {number} priority priority associated with the selection
+ */
+Torrent.prototype.deselect = function (start, end, priority) {
+ var self = this
+ priority = Number(priority) || 0
+ debug('deselect %s-%s (priority %s)', start, end, priority)
+
+ for (var i = 0; i < self._selections.length; ++i) {
+ var s = self._selections[i]
+ if (s.from === start && s.to === end && s.priority === priority) {
+ self._selections.splice(i--, 1)
+ break
+ }
+ }
+
+ self._updateSelections()
+}
+
+/**
+ * Marks a range of pieces as critical priority to be downloaded ASAP.
+ *
+ * @param {number} start start piece index (inclusive)
+ * @param {number} end end piece index (inclusive)
+ */
+Torrent.prototype.critical = function (start, end) {
+ var self = this
+ debug('critical %s-%s', start, end)
+
+ for (var i = start; i <= end; ++i) {
+ self._critical[i] = true
+ }
+
+ self._updateSelections()
+}
+
+Torrent.prototype._onWire = function (wire) {
+ var self = this
+
+ // use ut_metadata extension
+ wire.use(ut_metadata(self.metadata))
+
+ if (!self.metadata) {
+ wire.ut_metadata.on('metadata', function (metadata) {
+ debug('got metadata via ut_metadata')
+ self._onMetadata(metadata)
+ })
+ wire.ut_metadata.fetch()
+ }
+
+ // use ut_pex extension
+ if (typeof ut_pex === 'function') wire.use(ut_pex())
+
+ //wire.ut_pex.start() // TODO two-way communication
+ if (wire.ut_pex) wire.ut_pex.on('peer', function (peer) {
+ debug('got peer via ut_pex ' + peer)
+ self.addPeer(peer)
+ })
+
+ if (wire.ut_pex) wire.ut_pex.on('dropped', function (peer) {
+ // the remote peer believes a given peer has been dropped from the swarm.
+ // if we're not currently connected to it, then remove it from the swarm's queue.
+ if (!(peer in self.swarm._peers)) self.swarm.removePeer(peer)
+ })
+
+ // Send KEEP-ALIVE (every 60s) so peers will not disconnect the wire
+ wire.setKeepAlive(true)
+
+ // If peer supports DHT, send PORT message to report DHT node listening port
+ if (wire.peerExtensions.dht && self.client.dht && self.client.dht.port) {
+ wire.port(self.client.dht.port)
+ }
+
+ // When peer sends PORT, add them to the routing table
+ wire.on('port', function (port) {
+ debug('port message from ' + wire.remoteAddress)
+ // TODO: dht should support adding a node when you don't know the nodeId
+ // dht.addNode(wire.remoteAddress + ':' + port)
+ })
+
+ wire.on('timeout', function () {
+ debug('wire timeout from ' + wire.remoteAddress)
+ // TODO: this might be destroying wires too eagerly
+ wire.destroy()
+ })
+
+ // Timeout for piece requests to this peer
+ wire.setTimeout(self.pieceTimeout)
+
+ if (self.metadata) {
+ self._onWireWithMetadata(wire)
+ }
+}
+
+Torrent.prototype._onWireWithMetadata = function (wire) {
+ var self = this
+ var timeoutId = null
+ var timeoutMs = self.chokeTimeout
+
+ function onChokeTimeout () {
+ if (self._destroyed || wire._destroyed) return
+
+ if (self.swarm.numQueued > 2 * (self.swarm.numConns - self.swarm.numPeers) && wire.amInterested) {
+ wire.destroy()
+ } else {
+ timeoutId = setTimeout(onChokeTimeout, timeoutMs)
+ }
+ }
+
+ var i = 0
+ function updateSeedStatus () {
+ if (wire.peerPieces.length !== self.storage.pieces.length) return
+ for (; i < self.storage.pieces.length; ++i) {
+ if (!wire.peerPieces.get(i)) return
+ }
+ wire.isSeeder = true
+ wire.choke() // always choke seeders
+ }
+
+ wire.on('bitfield', function () {
+ updateSeedStatus()
+ self._update()
+ })
+
+ wire.on('have', function () {
+ updateSeedStatus()
+ self._update()
+ })
+
+ wire.once('interested', function () {
+ wire.unchoke()
+ })
+
+ wire.on('close', function () {
+ clearTimeout(timeoutId)
+ })
+
+ wire.on('choke', function () {
+ clearTimeout(timeoutId)
+ timeoutId = setTimeout(onChokeTimeout, timeoutMs)
+ })
+
+ wire.on('unchoke', function () {
+ clearTimeout(timeoutId)
+ self._update()
+ })
+
+ wire.on('request', function (index, offset, length, cb) {
+ // Disconnect from peers that request more than 128KB, per spec
+ if (length > MAX_BLOCK_LENGTH) {
+ debug(wire.remoteAddress, 'requested invalid block size', length)
+ return wire.destroy()
+ }
+
+ self.storage.readBlock(index, offset, length, cb)
+ })
+
+ wire.bitfield(self.storage.bitfield) // always send bitfield (required)
+ wire.interested() // always start out interested
+
+ timeoutId = setTimeout(onChokeTimeout, timeoutMs)
+
+ wire.isSeeder = false
+ updateSeedStatus()
+}
+
+/**
+ * Called when the metadata, swarm, and underlying storage are all fully initialized.
+ */
+Torrent.prototype._onStorage = function () {
+ var self = this
+ debug('on storage')
+
+ // allow writes to storage only after initial piece verification is finished
+ self.storage.readonly = false
+
+ // start off selecting the entire torrent with low priority
+ self.select(0, self.storage.pieces.length - 1, false)
+
+ self._rechokeIntervalId = setInterval(self._rechoke.bind(self), RECHOKE_INTERVAL)
+ self._rechokeIntervalId.unref && self._rechokeIntervalId.unref()
+
+ process.nextTick(function () {
+ self.ready = true
+ self.emit('ready')
+ })
+}
+
+/**
+ * When a piece is fully downloaded, notify all peers with a HAVE message.
+ * @param {Piece} piece
+ */
+Torrent.prototype._onStoragePiece = function (piece) {
+ var self = this
+ debug('piece done %s', piece.index)
+ self._reservations[piece.index] = null
+
+ self.swarm.wires.forEach(function (wire) {
+ wire.have(piece.index)
+ })
+
+ self._gcSelections()
+}
+
+/**
+ * Called on selection changes.
+ */
+Torrent.prototype._updateSelections = function () {
+ var self = this
+ if (!self.swarm || self._destroyed) return
+ if (!self.metadata) return self.once('metadata', self._updateSelections.bind(self))
+
+ process.nextTick(self._gcSelections.bind(self))
+ self._updateInterest()
+ self._update()
+}
+
+/**
+ * Garbage collect selections with respect to the storage's current state.
+ */
+Torrent.prototype._gcSelections = function () {
+ var self = this
+
+ for (var i = 0; i < self._selections.length; i++) {
+ var s = self._selections[i]
+ var oldOffset = s.offset
+
+ // check for newly downloaded pieces in selection
+ while (self.storage.bitfield.get(s.from + s.offset) && s.from + s.offset < s.to) {
+ s.offset++
+ }
+
+ if (oldOffset !== s.offset) s.notify()
+ if (s.to !== s.from + s.offset) continue
+ if (!self.storage.bitfield.get(s.from + s.offset)) continue
+
+ // remove fully downloaded selection
+ self._selections.splice(i--, 1) // decrement i to offset splice
+ s.notify() // TODO: this may notify twice in a row. is this a problem?
+ self._updateInterest()
+ }
+
+ if (!self._selections.length) self.emit('idle')
+}
+
+/**
+ * Update interested status for all peers.
+ */
+Torrent.prototype._updateInterest = function () {
+ var self = this
+
+ var prev = self._amInterested
+ self._amInterested = !!self._selections.length
+
+ self.swarm.wires.forEach(function (wire) {
+ // TODO: only call wire.interested if the wire has at least one piece we need
+ if (self._amInterested) wire.interested()
+ else wire.uninterested()
+ })
+
+ if (prev === self._amInterested) return
+ if (self._amInterested) self.emit('interested')
+ else self.emit('uninterested')
+}
+
+/**
+ * Heartbeat to update all peers and their requests.
+ */
+Torrent.prototype._update = function () {
+ var self = this
+ if (self._destroyed) return
+
+ // update wires in random order for better request distribution
+ randomizedForEach(self.swarm.wires, self._updateWire.bind(self))
+}
+
+/**
+ * Attempts to update a peer's requests
+ */
+Torrent.prototype._updateWire = function (wire) {
+ var self = this
+
+ if (wire.peerChoking) return
+ if (!wire.downloaded) return validateWire()
+
+ trySelectWire(false) || trySelectWire(true)
+
+ function genPieceFilterFunc (start, end, tried, rank) {
+ return function (i) {
+ return i >= start && i <= end && !(i in tried) && wire.peerPieces.get(i) && (!rank || rank(i))
+ }
+ }
+
+ // TODO: Do we need both validateWire and trySelectWire?
+ function validateWire () {
+ if (wire.requests.length) return
+
+ for (var i = self._selections.length; i--;) {
+ var next = self._selections[i]
+
+ var piece
+ if (self.strategy === 'rarest') {
+ var start = next.from + next.offset
+ var end = next.to
+ var len = end - start + 1
+ var tried = {}
+ var tries = 0
+ var filter = genPieceFilterFunc(start, end, tried)
+
+ while (tries < len) {
+ piece = self.rarityMap.getRarestPiece(filter)
+ if (piece < 0) break
+ if (self._request(wire, piece, false)) return
+ tried[piece] = true
+ tries += 1
+ }
+ } else {
+ for (piece = next.to; piece >= next.from + next.offset; --piece) {
+ if (!wire.peerPieces.get(piece)) continue
+ if (self._request(wire, piece, false)) return
+ }
+ }
+ }
+
+ // TODO: wire failed to validate as useful; should we close it?
+ }
+
+ function speedRanker () {
+ var speed = wire.downloadSpeed() || 1
+ if (speed > SPEED_THRESHOLD) return function () { return true }
+
+ var secs = MAX_OUTSTANDING_REQUESTS * Storage.BLOCK_LENGTH / speed
+ var tries = 10
+ var ptr = 0
+
+ return function (index) {
+ if (!tries || self.storage.bitfield.get(index)) return true
+
+ var piece = self.storage.pieces[index]
+ var missing = piece.blocks.length - piece.blocksWritten
+
+ for (; ptr < self.swarm.wires.length; ptr++) {
+ var otherWire = self.swarm.wires[ptr]
+ var otherSpeed = otherWire.downloadSpeed()
+
+ if (otherSpeed < SPEED_THRESHOLD) continue
+ if (otherSpeed <= speed) continue
+ if (!otherWire.peerPieces.get(index)) continue
+ if ((missing -= otherSpeed * secs) > 0) continue
+
+ tries--
+ return false
+ }
+
+ return true
+ }
+ }
+
+ function shufflePriority (i) {
+ var last = i
+ for (var j = i; j < self._selections.length && self._selections[j].priority; j++) {
+ last = j
+ }
+ var tmp = self._selections[i]
+ self._selections[i] = self._selections[last]
+ self._selections[last] = tmp
+ }
+
+ function trySelectWire (hotswap) {
+ if (wire.requests.length >= MAX_OUTSTANDING_REQUESTS) return true
+ var rank = speedRanker()
+
+ for (var i = 0; i < self._selections.length; i++) {
+ var next = self._selections[i]
+
+ var piece
+ if (self.strategy === 'rarest') {
+ var start = next.from + next.offset
+ var end = next.to
+ var len = end - start + 1
+ var tried = {}
+ var tries = 0
+ var filter = genPieceFilterFunc(start, end, tried, rank)
+
+ while (tries < len) {
+ piece = self.rarityMap.getRarestPiece(filter)
+ if (piece < 0) break
+
+ // request all non-reserved blocks in this piece
+ while (self._request(wire, piece, self._critical[piece] || hotswap)) {}
+
+ if (wire.requests.length < MAX_OUTSTANDING_REQUESTS) {
+ tried[piece] = true
+ tries++
+ continue
+ }
+
+ if (next.priority) shufflePriority(i)
+ return true
+ }
+ } else {
+ for (piece = next.from + next.offset; piece <= next.to; piece++) {
+ if (!wire.peerPieces.get(piece) || !rank(piece)) continue
+
+ // request all non-reserved blocks in piece
+ while (self._request(wire, piece, self._critical[piece] || hotswap)) {}
+
+ if (wire.requests.length < MAX_OUTSTANDING_REQUESTS) continue
+
+ if (next.priority) shufflePriority(i)
+ return true
+ }
+ }
+ }
+
+ return false
+ }
+}
+
+/**
+ * Called periodically to update the choked status of all peers, handling optimistic
+ * unchoking as described in BEP3.
+ */
+Torrent.prototype._rechoke = function () {
+ var self = this
+
+ if (self._rechokeOptimisticTime > 0)
+ self._rechokeOptimisticTime -= 1
+ else
+ self._rechokeOptimisticWire = null
+
+ var peers = []
+
+ self.swarm.wires.forEach(function (wire) {
+ if (!wire.isSeeder && wire !== self._rechokeOptimisticWire) {
+ peers.push({
+ wire: wire,
+ downloadSpeed: wire.downloadSpeed(),
+ uploadSpeed: wire.uploadSpeed(),
+ salt: Math.random(),
+ isChoked: true
+ })
+ }
+ })
+
+ peers.sort(rechokeSort)
+
+ var unchokeInterested = 0
+ var i = 0
+ for (; i < peers.length && unchokeInterested < self._rechokeNumSlots; ++i) {
+ peers[i].isChoked = false
+ if (peers[i].wire.peerInterested) unchokeInterested += 1
+ }
+
+ // Optimistically unchoke a peer
+ if (!self._rechokeOptimisticWire && i < peers.length && self._rechokeNumSlots) {
+ var candidates = peers.slice(i).filter(function (peer) { return peer.wire.peerInterested })
+ var optimistic = candidates[randomInt(candidates.length)]
+
+ if (optimistic) {
+ optimistic.isChoked = false
+ self._rechokeOptimisticWire = optimistic.wire
+ self._rechokeOptimisticTime = RECHOKE_OPTIMISTIC_DURATION
+ }
+ }
+
+ // Unchoke best peers
+ peers.forEach(function (peer) {
+ if (peer.wire.amChoking !== peer.isChoked) {
+ if (peer.isChoked) peer.wire.choke()
+ else peer.wire.unchoke()
+ }
+ })
+
+ function rechokeSort (peerA, peerB) {
+ // Prefer higher download speed
+ if (peerA.downloadSpeed !== peerB.downloadSpeed)
+ return peerB.downloadSpeed - peerA.downloadSpeed
+
+ // Prefer higher upload speed
+ if (peerA.uploadSpeed !== peerB.uploadSpeed)
+ return peerB.uploadSpeed - peerA.uploadSpeed
+
+ // Prefer unchoked
+ if (peerA.wire.amChoking !== peerB.wire.amChoking)
+ return peerA.wire.amChoking ? 1 : -1
+
+ // Random order
+ return peerA.salt - peerB.salt
+ }
+}
+
+/**
+ * Attempts to cancel a slow block request from another wire such that the
+ * given wire may effectively swap out the request for one of its own.
+ */
+Torrent.prototype._hotswap = function (wire, index) {
+ var self = this
+ if (!self.hotswapEnabled) return false
+
+ var speed = wire.downloadSpeed()
+ if (speed < Storage.BLOCK_LENGTH) return false
+ if (!self._reservations[index]) return false
+
+ var r = self._reservations[index]
+ if (!r) {
+ return false
+ }
+
+ var minSpeed = Infinity
+ var minWire
+
+ var i
+ for (i = 0; i < r.length; i++) {
+ var otherWire = r[i]
+ if (!otherWire || otherWire === wire) continue
+
+ var otherSpeed = otherWire.downloadSpeed()
+ if (otherSpeed >= SPEED_THRESHOLD) continue
+ if (2 * otherSpeed > speed || otherSpeed > minSpeed) continue
+
+ minWire = otherWire
+ minSpeed = otherSpeed
+ }
+
+ if (!minWire) return false
+
+ for (i = 0; i < r.length; i++) {
+ if (r[i] === minWire) r[i] = null
+ }
+
+ for (i = 0; i < minWire.requests.length; i++) {
+ var req = minWire.requests[i]
+ if (req.piece !== index) continue
+
+ self.storage.cancelBlock(index, req.offset)
+ }
+
+ self.emit('hotswap', minWire, wire, index)
+ return true
+}
+
+/**
+ * Attempts to request a block from the given wire.
+ */
+Torrent.prototype._request = function (wire, index, hotswap) {
+ var self = this
+ var numRequests = wire.requests.length
+
+ if (self.storage.bitfield.get(index)) return false
+ if (numRequests >= MAX_OUTSTANDING_REQUESTS) return false
+
+ var endGame = (wire.requests.length === 0 && self.storage.numMissing < 30)
+ var block = self.storage.reserveBlock(index, endGame)
+
+ if (!block && !endGame && hotswap && self._hotswap(wire, index))
+ block = self.storage.reserveBlock(index, false)
+ if (!block) return false
+
+ var r = self._reservations[index]
+ if (!r) {
+ r = self._reservations[index] = []
+ }
+ var i = r.indexOf(null)
+ if (i === -1) i = r.length
+ r[i] = wire
+
+ function gotPiece (err, buffer) {
+ if (!self.ready) {
+ self.once('ready', function () {
+ gotPiece(err, buffer)
+ })
+ return
+ }
+
+ if (r[i] === wire) r[i] = null
+
+ if (err) {
+ debug('error getting piece ' + index + '(offset: ' + block.offset + ' length: ' + block.length + ') from ' + wire.remoteAddress + ' ' + err.message)
+ self.storage.cancelBlock(index, block.offset)
+ process.nextTick(self._update.bind(self))
+ return false
+ } else {
+ // debug('got piece ' + index + '(offset: ' + block.offset + ' length: ' + block.length + ') from ' + wire.remoteAddress)
+ self.storage.writeBlock(index, block.offset, buffer, function (err) {
+ if (err) {
+ debug('error writing block')
+ self.storage.cancelBlock(index, block.offset)
+ }
+
+ process.nextTick(self._update.bind(self))
+ })
+ }
+ }
+
+ wire.request(index, block.offset, block.length, gotPiece)
+
+ return true
+}
+
+/**
+ * Returns a random integer in [0,high)
+ */
+function randomInt (high) {
+ return Math.random() * high | 0
+}
+
+/**
+ * Iterates through the given array in a random order, calling the given
+ * callback for each element.
+ */
+function randomizedForEach (array, cb) {
+ var indices = array.map(function (value, index) { return index })
+
+ for (var i = 0, len = indices.length; i < len; ++i) {
+ var j = randomInt(len)
+ var tmp = indices[i]
+ indices[i] = indices[j]
+ indices[j] = tmp
+ }
+
+ indices.forEach(function (index) {
+ cb(array[index], index, array)
+ })
+}