diff options
author | Feross Aboukhadijeh <feross@feross.org> | 2014-09-21 05:41:24 +0400 |
---|---|---|
committer | Feross Aboukhadijeh <feross@feross.org> | 2014-09-21 05:41:24 +0400 |
commit | 93686505fbc90522c75b6c151ec7261aa76098de (patch) | |
tree | 5b474b920e79b7b39b6804fac5e6a641de6ae843 /lib/torrent.js | |
parent | 2e14192c311f64c20496a72af7ffce36495be92b (diff) |
merge `bittorrent-client` into this module
When I started the WebTorrent project I thought there were going to
need to be two separate client implementations (bittorrent-client and
webtorrent-client) that would get tied together in a higher-level
module.
Fortunately, this was not necessary because of the awesome “browser”
field support in browserify. By substituting just a few modules, we can
make the same module (webtorrent) work in node AND the browser, with
the same codebase!
So, from now on, you can just `require(‘webtorrent’)` in node or the
browser, and it will just work. You can also `npm install webtorrent`
if you want to use bittorrent in a node app or script. Lastly, you can
`npm install webtorrent -g` if you want to use webtorrent as a command
line app (it installs a `webtorrent` command).
Diffstat (limited to 'lib/torrent.js')
-rw-r--r-- | lib/torrent.js | 991 |
1 files changed, 991 insertions, 0 deletions
diff --git a/lib/torrent.js b/lib/torrent.js new file mode 100644 index 0000000..039f0e9 --- /dev/null +++ b/lib/torrent.js @@ -0,0 +1,991 @@ +module.exports = Torrent + +var addrToIPPort = require('addr-to-ip-port') +var concat = require('concat-stream') // browser exclude +var debug = require('debug')('webtorrent:torrent') +var Discovery = require('torrent-discovery') +var EventEmitter = require('events').EventEmitter +var fs = require('fs') // browser exclude +var hh = require('http-https') // browser exclude +var inherits = require('inherits') +var parallel = require('run-parallel') +var parseTorrent = require('parse-torrent') +var RarityMap = require('./rarity-map') +var reemit = require('re-emitter') +var Storage = require('./storage') +var Swarm = require('bittorrent-swarm') // `webtorrent-swarm` in browser +var ut_metadata = require('ut_metadata') +var ut_pex = require('ut_pex') // browser exclude + +var MAX_BLOCK_LENGTH = 128 * 1024 +var MAX_OUTSTANDING_REQUESTS = 5 +var PIECE_TIMEOUT = 10000 +var CHOKE_TIMEOUT = 5000 +var SPEED_THRESHOLD = 3 * Storage.BLOCK_LENGTH + +var RECHOKE_INTERVAL = 10000 // 10 seconds +var RECHOKE_OPTIMISTIC_DURATION = 2 // 30 seconds + +function noop () {} + +inherits(Torrent, EventEmitter) + +/** + * A torrent + * + * @param {string|Buffer|Object} torrentId + * @param {Object} opts + */ +function Torrent (torrentId, opts) { + var self = this + EventEmitter.call(self) + debug('new torrent') + + self.client = opts.client + + self.hotswapEnabled = ('hotswap' in opts ? opts.hotswap : true) + self.verify = opts.verify + self.storageOpts = opts.storageOpts + + self.chokeTimeout = opts.chokeTimeout || CHOKE_TIMEOUT + self.pieceTimeout = opts.pieceTimeout || PIECE_TIMEOUT + self.strategy = opts.strategy || 'sequential' + + self._rechokeNumSlots = (opts.uploads === false || opts.uploads === 0) ? 0 : (+opts.uploads || 10) + self._rechokeOptimisticWire = null + self._rechokeOptimisticTime = 0 + self._rechokeIntervalId = null + + self.ready = false + self.files = [] + self.metadata = null + self.parsedTorrent = null + self.storage = null + self.numBlockedPeers = 0 + self._amInterested = false + self._destroyed = false + self._selections = [] + self._critical = [] + self._storageImpl = opts.storage || Storage + + var parsedTorrent = parseTorrent(torrentId) + if (parsedTorrent && parsedTorrent.infoHash) { + onTorrentId(parsedTorrent) + + } else if (typeof hh.get === 'function' && /^https?:/.test(torrentId)) { + // http or https url to torrent file + hh.get(torrentId, function (res) { + res.pipe(concat(function (torrent) { + onTorrentId(torrent) + })) + }).on('error', function (err) { + self.emit('error', new Error('error downloading torrent: ' + err.message)) + }) + + } else if (typeof fs.readFile === 'function') { + // assume it's a filesystem path + fs.readFile(torrentId, function (err, torrent) { + if (err) return self.emit('error', new Error('invalid torrent id')) + onTorrentId(torrent) + }) + + } else throw new Error('invalid torrent id') + + function onTorrentId (torrentId) { + parsedTorrent = parseTorrent(torrentId) + self.infoHash = parsedTorrent.infoHash + if (parsedTorrent.name) self.name = parsedTorrent.name // preliminary name + + // create swarm + self.swarm = new Swarm(self.infoHash, self.client.peerId, { + handshake: { dht: !!self.client.dht } + }) + reemit(self.swarm, self, ['warning', 'error']) + self.swarm.on('wire', self._onWire.bind(self)) + + // update overall client stats + self.swarm.on('download', self.client.downloadSpeed.bind(self.client)) + self.swarm.on('upload', self.client.uploadSpeed.bind(self.client)) + + if (process.browser) { + // in browser, swarm does not listen + self._onSwarmListening(parsedTorrent) + } else { + // listen for peers + self.swarm.listen(self.client.torrentPort, self._onSwarmListening.bind(self, parsedTorrent)) + } + process.nextTick(function () { + self.emit('infoHash') + }) + } +} + +// torrent size (in bytes) +Object.defineProperty(Torrent.prototype, 'length', { + get: function () { + return (this.parsedTorrent && this.parsedTorrent.length) || 0 + } +}) + +// time remaining (in milliseconds) +Object.defineProperty(Torrent.prototype, 'timeRemaining', { + get: function () { + if (this.swarm.downloadSpeed() === 0) return Infinity + else return ((this.length - this.downloaded) / this.swarm.downloadSpeed()) * 1000 + } +}) + +// percentage complete, represented as a number between 0 and 1 +Object.defineProperty(Torrent.prototype, 'progress', { + get: function () { + return (this.parsedTorrent && (this.downloaded / this.parsedTorrent.length)) || 0 + } +}) + +// bytes downloaded (not necessarily verified) +Object.defineProperty(Torrent.prototype, 'downloaded', { + get: function () { + return (this.storage && this.storage.downloaded) || 0 + } +}) + +// bytes uploaded +Object.defineProperty(Torrent.prototype, 'uploaded', { + get: function () { + return this.swarm.uploaded + } +}) + +// ratio of bytes downloaded to uploaded +Object.defineProperty(Torrent.prototype, 'ratio', { + get: function () { + return (this.uploaded && (this.downloaded / this.uploaded)) || 0 + } +}) + +Torrent.prototype._onSwarmListening = function (parsed, port) { + var self = this + if (self._destroyed) return + + self.client.torrentPort = port + + // begin discovering peers via the DHT and tracker servers + self.discovery = new Discovery({ + announce: parsed.announce, + dht: self.client.dht, + tracker: self.client.tracker, + peerId: self.client.peerId, + port: port + }) + self.discovery.setTorrent(self.infoHash) + self.discovery.on('peer', self.addPeer.bind(self)) + + // expose discovery events + reemit(self.discovery, self, ['dhtAnnounce', 'warning', 'error']) + + // if full metadata was included in initial torrent id, use it + if (parsed.info) self._onMetadata(parsed) + + self.emit('listening', port) +} + +/** + * Called when the metadata is received. + */ +Torrent.prototype._onMetadata = function (metadata) { + var self = this + if (self.metadata || self._destroyed) return + debug('got metadata') + + if (metadata && metadata.infoHash) { + // `metadata` is a parsed torrent (from parse-torrent module) + self.metadata = parseTorrent.toBuffer(metadata) + self.parsedTorrent = metadata + } else { + self.metadata = metadata + try { + self.parsedTorrent = parseTorrent(self.metadata) + } catch (err) { + return self.emit('error', err) + } + } + + // update preliminary torrent name + self.name = self.parsedTorrent.name + + // update discovery module with full torrent metadata + self.discovery.setTorrent(self.parsedTorrent) + + self.rarityMap = new RarityMap(self.swarm, self.parsedTorrent.pieces.length) + + self.storage = new self._storageImpl(self.parsedTorrent, self.storageOpts) + self.storage.on('piece', self._onStoragePiece.bind(self)) + self.storage.on('file', function (file) { + self.emit('file', file) + }) + + self._reservations = self.storage.pieces.map(function () { + return [] + }) + + self.storage.on('done', function () { + if (self.discovery.tracker) + self.discovery.tracker.complete() + + debug('torrent ' + self.infoHash + ' done') + self.emit('done') + }) + + self.storage.on('select', self.select.bind(self)) + self.storage.on('deselect', self.deselect.bind(self)) + self.storage.on('critical', self.critical.bind(self)) + + self.storage.files.forEach(function (file) { + self.files.push(file) + }) + + self.swarm.wires.forEach(function (wire) { + // If we didn't have the metadata at the time ut_metadata was initialized for this + // wire, we still want to make it available to the peer in case they request it. + if (wire.ut_metadata) wire.ut_metadata.setMetadata(self.metadata) + + self._onWireWithMetadata(wire) + }) + + if (self.verify) { + process.nextTick(function () { + debug('verifying existing torrent data') + var numPieces = 0 + var numVerified = 0 + + // TODO: move storage verification to storage.js? + parallel(self.storage.pieces.map(function (piece) { + return function (cb) { + self.storage.read(piece.index, function (err, buffer) { + numPieces += 1 + self.emit('verifying', { + percentDone: 100 * numPieces / self.storage.pieces.length, + percentVerified: 100 * numVerified / self.storage.pieces.length, + }) + + if (!err && buffer) { + // TODO: this is a bit hacky; figure out a cleaner way of verifying the buffer + piece.verify(buffer) + numVerified += piece.verified + debug('piece ' + (piece.verified ? 'verified' : 'invalid') + ' ' + piece.index) + } + // continue regardless of whether piece verification failed + cb() + }, true) // forces override to allow reading from non-verified pieces + } + }), self._onStorage.bind(self)) + }) + } else { + process.nextTick(self._onStorage.bind(self)) + } + + process.nextTick(function () { + self.emit('metadata') + }) +} + +/** + * Destroy and cleanup this torrent. + */ +Torrent.prototype.destroy = function (cb) { + var self = this + debug('destroy') + self._destroyed = true + clearInterval(self._rechokeIntervalId) + + var tasks = [] + if (self.swarm) tasks.push(function (cb) { + self.swarm.destroy(cb) + }) + if (self.discovery) tasks.push(function (cb) { + self.discovery.stop(cb) + }) + if (self.storage) tasks.push(function (cb) { + self.storage.close(cb) + }) + parallel(tasks, cb) +} + +/** + * Add a peer to the swarm + * @param {string|SimplePeer} peer + */ +Torrent.prototype.addPeer = function (peer) { + var self = this + + // TODO: extract IP address from peer object and check blocklist + if (typeof peer === 'string' && + self.client.blocked && self.client.blocked.contains(addrToIPPort(peer)[0])) { + self.numBlockedPeers += 1 + self.emit('blocked-peer', peer) + } else { + self.emit('peer', peer) + self.swarm.addPeer(peer) + } +} + +/** + * Select a range of pieces to prioritize. + * + * @param {number} start start piece index (inclusive) + * @param {number} end end piece index (inclusive) + * @param {number} priority priority associated with this selection + * @param {function} notify callback when selection is updated with new data + */ +Torrent.prototype.select = function (start, end, priority, notify) { + var self = this + if (start > end || start < 0 || end >= self.storage.pieces.length) + throw new Error('invalid selection ', start, ':', end) + priority = Number(priority) || 0 + + debug('select %s-%s (priority %s)', start, end, priority) + + self._selections.push({ + from: start, + to: end, + offset: 0, + priority: priority, + notify: notify || noop + }) + + self._selections.sort(function (a, b) { + return b.priority - a.priority + }) + + self._updateSelections() +} + +/** + * Deprioritizes a range of previously selected pieces. + * + * @param {number} start start piece index (inclusive) + * @param {number} end end piece index (inclusive) + * @param {number} priority priority associated with the selection + */ +Torrent.prototype.deselect = function (start, end, priority) { + var self = this + priority = Number(priority) || 0 + debug('deselect %s-%s (priority %s)', start, end, priority) + + for (var i = 0; i < self._selections.length; ++i) { + var s = self._selections[i] + if (s.from === start && s.to === end && s.priority === priority) { + self._selections.splice(i--, 1) + break + } + } + + self._updateSelections() +} + +/** + * Marks a range of pieces as critical priority to be downloaded ASAP. + * + * @param {number} start start piece index (inclusive) + * @param {number} end end piece index (inclusive) + */ +Torrent.prototype.critical = function (start, end) { + var self = this + debug('critical %s-%s', start, end) + + for (var i = start; i <= end; ++i) { + self._critical[i] = true + } + + self._updateSelections() +} + +Torrent.prototype._onWire = function (wire) { + var self = this + + // use ut_metadata extension + wire.use(ut_metadata(self.metadata)) + + if (!self.metadata) { + wire.ut_metadata.on('metadata', function (metadata) { + debug('got metadata via ut_metadata') + self._onMetadata(metadata) + }) + wire.ut_metadata.fetch() + } + + // use ut_pex extension + if (typeof ut_pex === 'function') wire.use(ut_pex()) + + //wire.ut_pex.start() // TODO two-way communication + if (wire.ut_pex) wire.ut_pex.on('peer', function (peer) { + debug('got peer via ut_pex ' + peer) + self.addPeer(peer) + }) + + if (wire.ut_pex) wire.ut_pex.on('dropped', function (peer) { + // the remote peer believes a given peer has been dropped from the swarm. + // if we're not currently connected to it, then remove it from the swarm's queue. + if (!(peer in self.swarm._peers)) self.swarm.removePeer(peer) + }) + + // Send KEEP-ALIVE (every 60s) so peers will not disconnect the wire + wire.setKeepAlive(true) + + // If peer supports DHT, send PORT message to report DHT node listening port + if (wire.peerExtensions.dht && self.client.dht && self.client.dht.port) { + wire.port(self.client.dht.port) + } + + // When peer sends PORT, add them to the routing table + wire.on('port', function (port) { + debug('port message from ' + wire.remoteAddress) + // TODO: dht should support adding a node when you don't know the nodeId + // dht.addNode(wire.remoteAddress + ':' + port) + }) + + wire.on('timeout', function () { + debug('wire timeout from ' + wire.remoteAddress) + // TODO: this might be destroying wires too eagerly + wire.destroy() + }) + + // Timeout for piece requests to this peer + wire.setTimeout(self.pieceTimeout) + + if (self.metadata) { + self._onWireWithMetadata(wire) + } +} + +Torrent.prototype._onWireWithMetadata = function (wire) { + var self = this + var timeoutId = null + var timeoutMs = self.chokeTimeout + + function onChokeTimeout () { + if (self._destroyed || wire._destroyed) return + + if (self.swarm.numQueued > 2 * (self.swarm.numConns - self.swarm.numPeers) && wire.amInterested) { + wire.destroy() + } else { + timeoutId = setTimeout(onChokeTimeout, timeoutMs) + } + } + + var i = 0 + function updateSeedStatus () { + if (wire.peerPieces.length !== self.storage.pieces.length) return + for (; i < self.storage.pieces.length; ++i) { + if (!wire.peerPieces.get(i)) return + } + wire.isSeeder = true + wire.choke() // always choke seeders + } + + wire.on('bitfield', function () { + updateSeedStatus() + self._update() + }) + + wire.on('have', function () { + updateSeedStatus() + self._update() + }) + + wire.once('interested', function () { + wire.unchoke() + }) + + wire.on('close', function () { + clearTimeout(timeoutId) + }) + + wire.on('choke', function () { + clearTimeout(timeoutId) + timeoutId = setTimeout(onChokeTimeout, timeoutMs) + }) + + wire.on('unchoke', function () { + clearTimeout(timeoutId) + self._update() + }) + + wire.on('request', function (index, offset, length, cb) { + // Disconnect from peers that request more than 128KB, per spec + if (length > MAX_BLOCK_LENGTH) { + debug(wire.remoteAddress, 'requested invalid block size', length) + return wire.destroy() + } + + self.storage.readBlock(index, offset, length, cb) + }) + + wire.bitfield(self.storage.bitfield) // always send bitfield (required) + wire.interested() // always start out interested + + timeoutId = setTimeout(onChokeTimeout, timeoutMs) + + wire.isSeeder = false + updateSeedStatus() +} + +/** + * Called when the metadata, swarm, and underlying storage are all fully initialized. + */ +Torrent.prototype._onStorage = function () { + var self = this + debug('on storage') + + // allow writes to storage only after initial piece verification is finished + self.storage.readonly = false + + // start off selecting the entire torrent with low priority + self.select(0, self.storage.pieces.length - 1, false) + + self._rechokeIntervalId = setInterval(self._rechoke.bind(self), RECHOKE_INTERVAL) + self._rechokeIntervalId.unref && self._rechokeIntervalId.unref() + + process.nextTick(function () { + self.ready = true + self.emit('ready') + }) +} + +/** + * When a piece is fully downloaded, notify all peers with a HAVE message. + * @param {Piece} piece + */ +Torrent.prototype._onStoragePiece = function (piece) { + var self = this + debug('piece done %s', piece.index) + self._reservations[piece.index] = null + + self.swarm.wires.forEach(function (wire) { + wire.have(piece.index) + }) + + self._gcSelections() +} + +/** + * Called on selection changes. + */ +Torrent.prototype._updateSelections = function () { + var self = this + if (!self.swarm || self._destroyed) return + if (!self.metadata) return self.once('metadata', self._updateSelections.bind(self)) + + process.nextTick(self._gcSelections.bind(self)) + self._updateInterest() + self._update() +} + +/** + * Garbage collect selections with respect to the storage's current state. + */ +Torrent.prototype._gcSelections = function () { + var self = this + + for (var i = 0; i < self._selections.length; i++) { + var s = self._selections[i] + var oldOffset = s.offset + + // check for newly downloaded pieces in selection + while (self.storage.bitfield.get(s.from + s.offset) && s.from + s.offset < s.to) { + s.offset++ + } + + if (oldOffset !== s.offset) s.notify() + if (s.to !== s.from + s.offset) continue + if (!self.storage.bitfield.get(s.from + s.offset)) continue + + // remove fully downloaded selection + self._selections.splice(i--, 1) // decrement i to offset splice + s.notify() // TODO: this may notify twice in a row. is this a problem? + self._updateInterest() + } + + if (!self._selections.length) self.emit('idle') +} + +/** + * Update interested status for all peers. + */ +Torrent.prototype._updateInterest = function () { + var self = this + + var prev = self._amInterested + self._amInterested = !!self._selections.length + + self.swarm.wires.forEach(function (wire) { + // TODO: only call wire.interested if the wire has at least one piece we need + if (self._amInterested) wire.interested() + else wire.uninterested() + }) + + if (prev === self._amInterested) return + if (self._amInterested) self.emit('interested') + else self.emit('uninterested') +} + +/** + * Heartbeat to update all peers and their requests. + */ +Torrent.prototype._update = function () { + var self = this + if (self._destroyed) return + + // update wires in random order for better request distribution + randomizedForEach(self.swarm.wires, self._updateWire.bind(self)) +} + +/** + * Attempts to update a peer's requests + */ +Torrent.prototype._updateWire = function (wire) { + var self = this + + if (wire.peerChoking) return + if (!wire.downloaded) return validateWire() + + trySelectWire(false) || trySelectWire(true) + + function genPieceFilterFunc (start, end, tried, rank) { + return function (i) { + return i >= start && i <= end && !(i in tried) && wire.peerPieces.get(i) && (!rank || rank(i)) + } + } + + // TODO: Do we need both validateWire and trySelectWire? + function validateWire () { + if (wire.requests.length) return + + for (var i = self._selections.length; i--;) { + var next = self._selections[i] + + var piece + if (self.strategy === 'rarest') { + var start = next.from + next.offset + var end = next.to + var len = end - start + 1 + var tried = {} + var tries = 0 + var filter = genPieceFilterFunc(start, end, tried) + + while (tries < len) { + piece = self.rarityMap.getRarestPiece(filter) + if (piece < 0) break + if (self._request(wire, piece, false)) return + tried[piece] = true + tries += 1 + } + } else { + for (piece = next.to; piece >= next.from + next.offset; --piece) { + if (!wire.peerPieces.get(piece)) continue + if (self._request(wire, piece, false)) return + } + } + } + + // TODO: wire failed to validate as useful; should we close it? + } + + function speedRanker () { + var speed = wire.downloadSpeed() || 1 + if (speed > SPEED_THRESHOLD) return function () { return true } + + var secs = MAX_OUTSTANDING_REQUESTS * Storage.BLOCK_LENGTH / speed + var tries = 10 + var ptr = 0 + + return function (index) { + if (!tries || self.storage.bitfield.get(index)) return true + + var piece = self.storage.pieces[index] + var missing = piece.blocks.length - piece.blocksWritten + + for (; ptr < self.swarm.wires.length; ptr++) { + var otherWire = self.swarm.wires[ptr] + var otherSpeed = otherWire.downloadSpeed() + + if (otherSpeed < SPEED_THRESHOLD) continue + if (otherSpeed <= speed) continue + if (!otherWire.peerPieces.get(index)) continue + if ((missing -= otherSpeed * secs) > 0) continue + + tries-- + return false + } + + return true + } + } + + function shufflePriority (i) { + var last = i + for (var j = i; j < self._selections.length && self._selections[j].priority; j++) { + last = j + } + var tmp = self._selections[i] + self._selections[i] = self._selections[last] + self._selections[last] = tmp + } + + function trySelectWire (hotswap) { + if (wire.requests.length >= MAX_OUTSTANDING_REQUESTS) return true + var rank = speedRanker() + + for (var i = 0; i < self._selections.length; i++) { + var next = self._selections[i] + + var piece + if (self.strategy === 'rarest') { + var start = next.from + next.offset + var end = next.to + var len = end - start + 1 + var tried = {} + var tries = 0 + var filter = genPieceFilterFunc(start, end, tried, rank) + + while (tries < len) { + piece = self.rarityMap.getRarestPiece(filter) + if (piece < 0) break + + // request all non-reserved blocks in this piece + while (self._request(wire, piece, self._critical[piece] || hotswap)) {} + + if (wire.requests.length < MAX_OUTSTANDING_REQUESTS) { + tried[piece] = true + tries++ + continue + } + + if (next.priority) shufflePriority(i) + return true + } + } else { + for (piece = next.from + next.offset; piece <= next.to; piece++) { + if (!wire.peerPieces.get(piece) || !rank(piece)) continue + + // request all non-reserved blocks in piece + while (self._request(wire, piece, self._critical[piece] || hotswap)) {} + + if (wire.requests.length < MAX_OUTSTANDING_REQUESTS) continue + + if (next.priority) shufflePriority(i) + return true + } + } + } + + return false + } +} + +/** + * Called periodically to update the choked status of all peers, handling optimistic + * unchoking as described in BEP3. + */ +Torrent.prototype._rechoke = function () { + var self = this + + if (self._rechokeOptimisticTime > 0) + self._rechokeOptimisticTime -= 1 + else + self._rechokeOptimisticWire = null + + var peers = [] + + self.swarm.wires.forEach(function (wire) { + if (!wire.isSeeder && wire !== self._rechokeOptimisticWire) { + peers.push({ + wire: wire, + downloadSpeed: wire.downloadSpeed(), + uploadSpeed: wire.uploadSpeed(), + salt: Math.random(), + isChoked: true + }) + } + }) + + peers.sort(rechokeSort) + + var unchokeInterested = 0 + var i = 0 + for (; i < peers.length && unchokeInterested < self._rechokeNumSlots; ++i) { + peers[i].isChoked = false + if (peers[i].wire.peerInterested) unchokeInterested += 1 + } + + // Optimistically unchoke a peer + if (!self._rechokeOptimisticWire && i < peers.length && self._rechokeNumSlots) { + var candidates = peers.slice(i).filter(function (peer) { return peer.wire.peerInterested }) + var optimistic = candidates[randomInt(candidates.length)] + + if (optimistic) { + optimistic.isChoked = false + self._rechokeOptimisticWire = optimistic.wire + self._rechokeOptimisticTime = RECHOKE_OPTIMISTIC_DURATION + } + } + + // Unchoke best peers + peers.forEach(function (peer) { + if (peer.wire.amChoking !== peer.isChoked) { + if (peer.isChoked) peer.wire.choke() + else peer.wire.unchoke() + } + }) + + function rechokeSort (peerA, peerB) { + // Prefer higher download speed + if (peerA.downloadSpeed !== peerB.downloadSpeed) + return peerB.downloadSpeed - peerA.downloadSpeed + + // Prefer higher upload speed + if (peerA.uploadSpeed !== peerB.uploadSpeed) + return peerB.uploadSpeed - peerA.uploadSpeed + + // Prefer unchoked + if (peerA.wire.amChoking !== peerB.wire.amChoking) + return peerA.wire.amChoking ? 1 : -1 + + // Random order + return peerA.salt - peerB.salt + } +} + +/** + * Attempts to cancel a slow block request from another wire such that the + * given wire may effectively swap out the request for one of its own. + */ +Torrent.prototype._hotswap = function (wire, index) { + var self = this + if (!self.hotswapEnabled) return false + + var speed = wire.downloadSpeed() + if (speed < Storage.BLOCK_LENGTH) return false + if (!self._reservations[index]) return false + + var r = self._reservations[index] + if (!r) { + return false + } + + var minSpeed = Infinity + var minWire + + var i + for (i = 0; i < r.length; i++) { + var otherWire = r[i] + if (!otherWire || otherWire === wire) continue + + var otherSpeed = otherWire.downloadSpeed() + if (otherSpeed >= SPEED_THRESHOLD) continue + if (2 * otherSpeed > speed || otherSpeed > minSpeed) continue + + minWire = otherWire + minSpeed = otherSpeed + } + + if (!minWire) return false + + for (i = 0; i < r.length; i++) { + if (r[i] === minWire) r[i] = null + } + + for (i = 0; i < minWire.requests.length; i++) { + var req = minWire.requests[i] + if (req.piece !== index) continue + + self.storage.cancelBlock(index, req.offset) + } + + self.emit('hotswap', minWire, wire, index) + return true +} + +/** + * Attempts to request a block from the given wire. + */ +Torrent.prototype._request = function (wire, index, hotswap) { + var self = this + var numRequests = wire.requests.length + + if (self.storage.bitfield.get(index)) return false + if (numRequests >= MAX_OUTSTANDING_REQUESTS) return false + + var endGame = (wire.requests.length === 0 && self.storage.numMissing < 30) + var block = self.storage.reserveBlock(index, endGame) + + if (!block && !endGame && hotswap && self._hotswap(wire, index)) + block = self.storage.reserveBlock(index, false) + if (!block) return false + + var r = self._reservations[index] + if (!r) { + r = self._reservations[index] = [] + } + var i = r.indexOf(null) + if (i === -1) i = r.length + r[i] = wire + + function gotPiece (err, buffer) { + if (!self.ready) { + self.once('ready', function () { + gotPiece(err, buffer) + }) + return + } + + if (r[i] === wire) r[i] = null + + if (err) { + debug('error getting piece ' + index + '(offset: ' + block.offset + ' length: ' + block.length + ') from ' + wire.remoteAddress + ' ' + err.message) + self.storage.cancelBlock(index, block.offset) + process.nextTick(self._update.bind(self)) + return false + } else { + // debug('got piece ' + index + '(offset: ' + block.offset + ' length: ' + block.length + ') from ' + wire.remoteAddress) + self.storage.writeBlock(index, block.offset, buffer, function (err) { + if (err) { + debug('error writing block') + self.storage.cancelBlock(index, block.offset) + } + + process.nextTick(self._update.bind(self)) + }) + } + } + + wire.request(index, block.offset, block.length, gotPiece) + + return true +} + +/** + * Returns a random integer in [0,high) + */ +function randomInt (high) { + return Math.random() * high | 0 +} + +/** + * Iterates through the given array in a random order, calling the given + * callback for each element. + */ +function randomizedForEach (array, cb) { + var indices = array.map(function (value, index) { return index }) + + for (var i = 0, len = indices.length; i < len; ++i) { + var j = randomInt(len) + var tmp = indices[i] + indices[i] = indices[j] + indices[j] = tmp + } + + indices.forEach(function (index) { + cb(array[index], index, array) + }) +} |