Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/webtorrent/webtorrent.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorFeross Aboukhadijeh <feross@feross.org>2016-04-21 09:10:32 +0300
committerFeross Aboukhadijeh <feross@feross.org>2016-04-21 09:10:32 +0300
commit3daee2c66cbf752b9e6e49b99492b8c1914a4a58 (patch)
treefbdd6c10f6a64674268dc77d1b3ccb4a37c44a65 /lib
parent7a7c4a8b8c49f5c92b7c20ff439bc8614f7d607e (diff)
BREAKING: Major cleanup
### Added - `client.listening` property to signal whether TCP server is listening for incoming connections. ### Changed - Merged `Swarm` class into `Torrent` object. Properties on `torrent.swarm` (like `torrent.swarm.wires`) now exist on `torrent` (e.g. `torrent.wires`). - `torrent.addPeer` can no longer be called before the `infoHash` event has been emitted. - Remove `torrent.on('listening')` event. Use `client.on('listening')` instead. - Remove support from `TCPPool` for listening on multiple ports. This was not used by WebTorrent and just added complexity. There is now a single `TCPPool` instance for the whole WebTorrent client. - Deprecate: Do not use `client.download()` anymore. Use `client.add()` instead. - Deprecate: Do not use `torrent.swarm` anymore. Use `torrent` instead. ### Fixed - When there is a `torrent.on('error')` listener, don't also emit `client.on('error')`. - Do not return existing torrent object when duplicate torrent is added. Fire an `'error'` event instead. - Memory leak of `Torrent` object caused by `RarityMap` - Memory leak of `Torrent` object caused by `TCPPool` - `client.ratio` and `torrent.ratio` are now calculated as `uploaded / received` instead of `uploaded / downloaded`.
Diffstat (limited to 'lib')
-rw-r--r--lib/peer.js34
-rw-r--r--lib/rarity-map.js127
-rw-r--r--lib/swarm.js413
-rw-r--r--lib/tcp-pool.js190
-rw-r--r--lib/torrent.js452
-rw-r--r--lib/webconn.js3
6 files changed, 491 insertions, 728 deletions
diff --git a/lib/peer.js b/lib/peer.js
index 06786c5..fcd296d 100644
--- a/lib/peer.js
+++ b/lib/peer.js
@@ -60,10 +60,10 @@ exports.createTCPOutgoingPeer = function (addr, swarm) {
/**
* Peer that represents a Web Seed (BEP17 / BEP19).
*/
-exports.createWebSeedPeer = function (url, parsedTorrent, swarm) {
+exports.createWebSeedPeer = function (url, swarm) {
var peer = new Peer(url, 'webSeed')
peer.swarm = swarm
- peer.conn = new WebConn(url, parsedTorrent)
+ peer.conn = new WebConn(url, swarm)
peer.onConnect()
@@ -71,7 +71,7 @@ exports.createWebSeedPeer = function (url, parsedTorrent, swarm) {
}
/**
- * Peer. Represents a peer in the Swarm.
+ * Peer. Represents a peer in the torrent swarm.
*
* @param {string} id "ip:port" string, peer id (for WebRTC peers), or url (for Web Seeds)
* @param {string} type the type of the peer
@@ -161,7 +161,7 @@ Peer.prototype.onHandshake = function (infoHash, peerId) {
return self.destroy(new Error('unexpected handshake info hash for this swarm'))
}
if (peerId === self.swarm.peerId) {
- return self.destroy(new Error('refusing to handshake with self'))
+ return self.destroy(new Error('refusing to connect to ourselves'))
}
debug('Peer %s got handshake %s', self.id, infoHash)
@@ -170,27 +170,12 @@ Peer.prototype.onHandshake = function (infoHash, peerId) {
self.retries = 0
- self.wire.on('download', function (downloaded) {
- if (self.destroyed) return
- self.swarm.downloaded += downloaded
- self.swarm.downloadSpeed(downloaded)
- self.swarm.emit('download', downloaded)
- })
-
- self.wire.on('upload', function (uploaded) {
- if (self.destroyed) return
- self.swarm.uploaded += uploaded
- self.swarm.uploadSpeed(uploaded)
- self.swarm.emit('upload', uploaded)
- })
-
- self.swarm.wires.push(self.wire)
-
var addr = self.addr
if (!addr && self.conn.remoteAddress) {
addr = self.conn.remoteAddress + ':' + self.conn.remotePort
}
- self.swarm.emit('wire', self.wire, addr)
+ self.swarm._onWire(self.wire, addr)
+
// swarm could be destroyed in user's 'wire' event handler
if (!self.swarm || self.swarm.destroyed) return
@@ -199,7 +184,10 @@ Peer.prototype.onHandshake = function (infoHash, peerId) {
Peer.prototype.handshake = function () {
var self = this
- self.wire.handshake(self.swarm.infoHash, self.swarm.peerId, self.swarm.handshakeOpts)
+ var opts = {
+ dht: self.swarm.private ? false : !!self.swarm.client.dht
+ }
+ self.wire.handshake(self.swarm.infoHash, self.swarm.client.peerId, opts)
self.sentHandshake = true
}
@@ -236,8 +224,8 @@ Peer.prototype.destroy = function (err) {
var conn = self.conn
var wire = self.wire
- self.conn = null
self.swarm = null
+ self.conn = null
self.wire = null
if (swarm && wire) {
diff --git a/lib/rarity-map.js b/lib/rarity-map.js
index 0b1d147..c362077 100644
--- a/lib/rarity-map.js
+++ b/lib/rarity-map.js
@@ -1,57 +1,34 @@
module.exports = RarityMap
/**
- * Mapping of torrent pieces to their respective availability in the swarm. Used by
- * the torrent manager for implementing the rarest piece first selection strategy.
+ * Mapping of torrent pieces to their respective availability in the torrent swarm. Used
+ * by the torrent manager for implementing the rarest piece first selection strategy.
*
- * @param {Swarm} swarm bittorrent-swarm to track availability
- * @param {number} numPieces number of pieces in the torrent
+ * @param {Torrent} torrent
*/
-function RarityMap (swarm, numPieces) {
+function RarityMap (torrent) {
var self = this
- self.pieces = []
- self.swarm = swarm
- self.numPieces = numPieces
-
- function initWire (wire) {
- wire.on('have', function (index) {
- self.pieces[index] += 1
- })
- wire.on('bitfield', function () {
- self.recalculate()
- })
- wire.on('close', function () {
- for (var i = 0; i < self.numPieces; ++i) {
- self.pieces[i] -= wire.peerPieces.get(i)
- }
- })
- }
+ self._torrent = torrent
+ self._numPieces = torrent.pieces.length
+ self._pieces = []
- self.swarm.wires.forEach(initWire)
- self.swarm.on('wire', function (wire) {
+ self._onWire = function (wire) {
+ self.recalculate()
+ self._initWire(wire)
+ }
+ self._onWireHave = function (index) {
+ self._pieces[index] += 1
+ }
+ self._onWireBitfield = function () {
self.recalculate()
- initWire(wire)
- })
-
- self.recalculate()
-}
-
-/**
- * Recalculates piece availability across all peers in the swarm.
- */
-RarityMap.prototype.recalculate = function () {
- var self = this
-
- for (var i = 0; i < self.numPieces; ++i) {
- self.pieces[i] = 0
}
- self.swarm.wires.forEach(function (wire) {
- for (var i = 0; i < self.numPieces; ++i) {
- self.pieces[i] += wire.peerPieces.get(i)
- }
+ self._torrent.wires.forEach(function (wire) {
+ self._initWire(wire)
})
+ self._torrent.on('wire', self._onWire)
+ self.recalculate()
}
/**
@@ -62,15 +39,15 @@ RarityMap.prototype.recalculate = function () {
* @return {number} index of rarest piece, or -1
*/
RarityMap.prototype.getRarestPiece = function (pieceFilterFunc) {
- var self = this
+ if (!pieceFilterFunc) pieceFilterFunc = trueFn
+
var candidates = []
var min = Infinity
- pieceFilterFunc = pieceFilterFunc || function () { return true }
- for (var i = 0; i < self.numPieces; ++i) {
+ for (var i = 0; i < this._numPieces; ++i) {
if (!pieceFilterFunc(i)) continue
- var availability = self.pieces[i]
+ var availability = this._pieces[i]
if (availability === min) {
candidates.push(i)
} else if (availability < min) {
@@ -86,3 +63,61 @@ RarityMap.prototype.getRarestPiece = function (pieceFilterFunc) {
return -1
}
}
+
+RarityMap.prototype.destroy = function () {
+ var self = this
+ self._torrent.removeListener('wire', self._onWire)
+ self._torrent.wires.forEach(function (wire) {
+ self._cleanupWireEvents(wire)
+ })
+ self._torrent = null
+ self._pieces = null
+
+ self._onWire = null
+ self._onWireHave = null
+ self._onWireBitfield = null
+}
+
+RarityMap.prototype._initWire = function (wire) {
+ var self = this
+
+ wire._onClose = function () {
+ self._cleanupWireEvents(wire)
+ for (var i = 0; i < this._numPieces; ++i) {
+ self._pieces[i] -= wire.peerPieces.get(i)
+ }
+ }
+
+ wire.on('have', self._onWireHave)
+ wire.on('bitfield', self._onWireBitfield)
+ wire.once('close', wire._onClose)
+}
+
+/**
+ * Recalculates piece availability across all peers in the torrent.
+ */
+RarityMap.prototype.recalculate = function () {
+ var i
+ for (i = 0; i < this._numPieces; ++i) {
+ this._pieces[i] = 0
+ }
+
+ var numWires = this._torrent.wires.length
+ for (i = 0; i < numWires; ++i) {
+ var wire = this._torrent.wires[i]
+ for (var j = 0; j < this._numPieces; ++j) {
+ this._pieces[j] += wire.peerPieces.get(j)
+ }
+ }
+}
+
+RarityMap.prototype._cleanupWireEvents = function (wire) {
+ wire.removeListener('have', this._onWireHave)
+ wire.removeListener('bitfield', this._onWireBitfield)
+ if (wire._onClose) wire.removeListener('close', wire._onClose)
+ wire._onClose = null
+}
+
+function trueFn () {
+ return true
+}
diff --git a/lib/swarm.js b/lib/swarm.js
deleted file mode 100644
index 7edee82..0000000
--- a/lib/swarm.js
+++ /dev/null
@@ -1,413 +0,0 @@
-module.exports = Swarm
-
-var addrToIPPort = require('addr-to-ip-port')
-var debug = require('debug')('webtorrent:swarm')
-var EventEmitter = require('events').EventEmitter
-var inherits = require('inherits')
-var net = require('net') // browser exclude
-var speedometer = require('speedometer')
-
-var Peer = require('./peer')
-var TCPPool = require('./tcp-pool') // browser-exclude
-
-var MAX_CONNS = 55
-var RECONNECT_WAIT = [ 1000, 5000, 15000 ]
-
-inherits(Swarm, EventEmitter)
-
-/**
- * BitTorrent Swarm
- *
- * Abstraction of a BitTorrent "swarm", which is handy for managing all peer
- * connections for a given torrent download. This handles connecting to peers,
- * listening for incoming connections, and doing the initial peer wire protocol
- * handshake with peers. It also tracks total data uploaded/downloaded to/from
- * the swarm.
- *
- * @param {Buffer|string} infoHash
- * @param {Buffer|string} peerId
- * @param {Object} opts
- * @param {Object} opts.handshake handshake options (passed to bittorrent-protocol)
- * @param {number} opts.maxConns maximum number of connections in swarm
- */
-function Swarm (infoHash, peerId, opts) {
- var self = this
- if (!(self instanceof Swarm)) return new Swarm(infoHash, peerId, opts)
- EventEmitter.call(self)
-
- self.infoHash = typeof infoHash === 'string'
- ? infoHash
- : infoHash.toString('hex')
- self.infoHashBuffer = new Buffer(self.infoHash, 'hex')
-
- self.peerId = typeof peerId === 'string'
- ? peerId
- : peerId.toString('hex')
- self.peerIdBuffer = new Buffer(self.peerId, 'hex')
-
- if (!opts) opts = {}
-
- debug('new swarm (i %s p %s)', self.infoHash, self.peerId)
-
- self.handshakeOpts = opts.handshake // handshake extensions (optional)
- self.maxConns = Number(opts.maxConns) || MAX_CONNS
-
- self.destroyed = false
- self.listening = false
- self.paused = false
-
- self.server = null // tcp listening socket
- self.wires = [] // open wires (added *after* handshake)
-
- self._queue = [] // queue of outgoing tcp peers to connect to
- self._peers = {} // connected peers (addr/peerId -> Peer)
- self._peersLength = 0 // number of elements in `self._peers` (cache, for perf)
- self._port = 0 // tcp listening port (cache, for perf)
-
- // track stats
- self.downloaded = 0
- self.uploaded = 0
- self.downloadSpeed = speedometer()
- self.uploadSpeed = speedometer()
-}
-
-Object.defineProperty(Swarm.prototype, 'ratio', {
- get: function () {
- var self = this
- return (self.uploaded / self.downloaded) || 0
- }
-})
-
-Object.defineProperty(Swarm.prototype, 'numQueued', {
- get: function () {
- var self = this
- return self._queue.length + (self._peersLength - self.numConns)
- }
-})
-
-Object.defineProperty(Swarm.prototype, 'numConns', {
- get: function () {
- var self = this
- var numConns = 0
- for (var id in self._peers) {
- if (self._peers[id].connected) numConns += 1
- }
- return numConns
- }
-})
-
-Object.defineProperty(Swarm.prototype, 'numPeers', {
- get: function () {
- var self = this
- return self.wires.length
- }
-})
-
-/**
- * Add a peer to the swarm.
- * @param {string|simple-peer} peer "ip:port" string or simple-peer instance
- * @param {string} peer.id bittorrent peer id (when `peer` is simple-peer)
- * @return {boolean} true if peer was added, false if peer was invalid
-
- */
-Swarm.prototype.addPeer = function (peer) {
- var self = this
- var newPeer = self._addPeer(peer)
- return !!newPeer // don't expose private Peer instance in return value
-}
-
-Swarm.prototype._addPeer = function (peer) {
- var self = this
- if (self.destroyed) {
- debug('ignoring added peer: swarm already destroyed')
- if (typeof peer !== 'string') peer.destroy()
- return null
- }
- if (typeof peer === 'string' && !self._validAddr(peer)) {
- debug('ignoring added peer: invalid address %s', peer)
- return null
- }
-
- var id = (peer && peer.id) || peer
- if (self._peers[id]) {
- debug('ignoring added peer: duplicate peer id')
- if (typeof peer !== 'string') peer.destroy()
- return null
- }
-
- if (self.paused) {
- debug('ignoring added peer: swarm paused')
- if (typeof peer !== 'string') peer.destroy()
- return null
- }
-
- debug('addPeer %s', id)
-
- var newPeer
- if (typeof peer === 'string') {
- // `peer` is an addr ("ip:port" string)
- newPeer = Peer.createTCPOutgoingPeer(peer, self)
- } else {
- // `peer` is a WebRTC connection (simple-peer)
- newPeer = Peer.createWebRTCPeer(peer, self)
- }
-
- self._peers[newPeer.id] = newPeer
- self._peersLength += 1
-
- if (typeof peer === 'string') {
- // `peer` is an addr ("ip:port" string)
- self._queue.push(newPeer)
- self._drain()
- }
-
- return newPeer
-}
-
-/**
- * Add a web seed to the swarm.
- * @param {string} url web seed url
- * @param {Object} parsedTorrent
- */
-Swarm.prototype.addWebSeed = function (url, parsedTorrent) {
- var self = this
- if (self.destroyed) return
-
- if (!/^https?:\/\/.+/.test(url)) {
- debug('ignoring invalid web seed %s (from swarm.addWebSeed)', url)
- return
- }
-
- if (self._peers[url]) return
-
- debug('addWebSeed %s', url)
-
- var newPeer = Peer.createWebSeedPeer(url, parsedTorrent, self)
- self._peers[newPeer.id] = newPeer
- self._peersLength += 1
-}
-
-/**
- * Called whenever a new incoming TCP peer connects to this swarm. Called with a peer
- * that has already sent a handshake.
- * @param {Peer} peer
- */
-Swarm.prototype._addIncomingPeer = function (peer) {
- var self = this
- if (self.destroyed) return peer.destroy(new Error('swarm already destroyed'))
- if (self.paused) return peer.destroy(new Error('swarm paused'))
-
- if (!self._validAddr(peer.addr)) {
- return peer.destroy(new Error('invalid addr ' + peer.addr + ' (from incoming)'))
- }
- debug('_addIncomingPeer %s', peer.id)
-
- self._peers[peer.id] = peer
- self._peersLength += 1
-}
-
-/**
- * Remove a peer from the swarm.
- * @param {string} id for tcp peers, "ip:port" string; for webrtc peers, peerId
- */
-Swarm.prototype.removePeer = function (id) {
- var self = this
- var peer = self._peers[id]
- if (!peer) return
-
- debug('removePeer %s', id)
-
- delete self._peers[id]
- self._peersLength -= 1
-
- peer.destroy()
-
- // If swarm was at capacity before, try to open a new connection now
- self._drain()
-}
-
-/**
- * Temporarily stop connecting to new peers. Note that this does not pause the streams
- * of existing connections or their wires.
- */
-Swarm.prototype.pause = function () {
- var self = this
- if (self.destroyed) return
- debug('pause')
- self.paused = true
-}
-
-/**
- * Resume connecting to new peers.
- */
-Swarm.prototype.resume = function () {
- var self = this
- if (self.destroyed) return
- debug('resume')
- self.paused = false
- self._drain()
-}
-
-/**
- * Listen on the given port for peer connections.
- * @param {number} port
- * @param {string=} hostname
- * @param {function=} onlistening
- */
-Swarm.prototype.listen = function (port, hostname, onlistening) {
- var self = this
- if (typeof hostname === 'function') {
- onlistening = hostname
- hostname = undefined
- }
- if (self.listening) throw new Error('swarm already listening')
- if (onlistening) self.once('listening', onlistening)
-
- if (typeof TCPPool === 'function') {
- self._port = port || TCPPool.getDefaultListenPort(self.infoHash)
- self._hostname = hostname
-
- debug('listen %s', port)
-
- var pool = TCPPool.addSwarm(self)
- self.server = pool.server
- } else {
- // In browser, listen() is no-op, but still fire 'listening' event so that
- // same code works in node and the browser.
- process.nextTick(function () {
- self._onListening(0)
- })
- }
-}
-
-Swarm.prototype._onListening = function (port) {
- var self = this
- self._port = port
- self.listening = true
- self.emit('listening')
-}
-
-Swarm.prototype.address = function () {
- var self = this
- if (!self.listening) return null
- return self.server
- ? self.server.address()
- : { port: 0, family: 'IPv4', address: '127.0.0.1' }
-}
-
-/**
- * Destroy the swarm, close all open peer connections, and do cleanup.
- * @param {function} onclose
- */
-Swarm.prototype.destroy = function (onclose) {
- var self = this
- if (self.destroyed) return
-
- self.destroyed = true
- self.listening = false
- self.paused = false
-
- if (onclose) self.once('close', onclose)
-
- debug('destroy')
-
- for (var id in self._peers) {
- self.removePeer(id)
- }
-
- if (typeof TCPPool === 'function') {
- TCPPool.removeSwarm(self, function () {
- // TODO: only emit when all peers are destroyed
- self.emit('close')
- })
- } else {
- process.nextTick(function () {
- self.emit('close')
- })
- }
-}
-
-/**
- * Pop a peer off the FIFO queue and connect to it. When _drain() gets called,
- * the queue will usually have only one peer in it, except when there are too
- * many peers (over `this.maxConns`) in which case they will just sit in the
- * queue until another connection closes.
- */
-Swarm.prototype._drain = function () {
- var self = this
- debug('_drain numConns %s maxConns %s', self.numConns, self.maxConns)
- if (typeof net.connect !== 'function' || self.destroyed || self.paused ||
- self.numConns >= self.maxConns) {
- return
- }
- debug('drain (%s queued, %s/%s peers)', self.numQueued, self.numPeers, self.maxConns)
-
- var peer = self._queue.shift()
- if (!peer) return // queue could be empty
-
- debug('tcp connect attempt to %s', peer.addr)
-
- var parts = addrToIPPort(peer.addr)
- var opts = {
- host: parts[0],
- port: parts[1]
- }
- if (self._hostname) opts.localAddress = self._hostname
-
- var conn = peer.conn = net.connect(opts)
-
- conn.once('connect', function () { peer.onConnect() })
- conn.once('error', function (err) { peer.destroy(err) })
- peer.startConnectTimeout()
-
- // When connection closes, attempt reconnect after timeout (with exponential backoff)
- conn.on('close', function () {
- if (self.destroyed) return
-
- // TODO: If torrent is done, do not try to reconnect after a timeout
-
- if (peer.retries >= RECONNECT_WAIT.length) {
- debug(
- 'conn %s closed: will not re-add (max %s attempts)',
- peer.addr, RECONNECT_WAIT.length
- )
- return
- }
-
- var ms = RECONNECT_WAIT[peer.retries]
- debug(
- 'conn %s closed: will re-add to queue in %sms (attempt %s)',
- peer.addr, ms, peer.retries + 1
- )
-
- var reconnectTimeout = setTimeout(function reconnectTimeout () {
- var newPeer = self._addPeer(peer.addr)
- if (newPeer) newPeer.retries = peer.retries + 1
- }, ms)
- if (reconnectTimeout.unref) reconnectTimeout.unref()
- })
-}
-
-Swarm.prototype._onError = function (err) {
- var self = this
- self.emit('error', err)
- self.destroy()
-}
-
-/**
- * Returns `true` if string is valid IPv4/6 address, and is not the address of this swarm.
- * @param {string} addr
- * @return {boolean}
- */
-Swarm.prototype._validAddr = function (addr) {
- var self = this
- var parts
- try {
- parts = addrToIPPort(addr)
- } catch (e) {
- return false
- }
- var host = parts[0]
- var port = parts[1]
- return port > 0 && port < 65535 && !(host === '127.0.0.1' && port === self._port)
-}
diff --git a/lib/tcp-pool.js b/lib/tcp-pool.js
index 7e95f6c..5e15d0a 100644
--- a/lib/tcp-pool.js
+++ b/lib/tcp-pool.js
@@ -7,12 +7,6 @@ var net = require('net') // browser exclude
var Peer = require('./peer')
/**
- * Shared TCP pools; shared among all swarms
- * @type {Object} port: number -> pool: TCPPool
- */
-var tcpPools = {}
-
-/**
* TCPPool
*
* A "TCP pool" allows multiple swarms to listen on the same TCP port and determines
@@ -20,107 +14,35 @@ var tcpPools = {}
* handshake that the remote peer sends.
*
* @param {number} port
- * @param {string} hostname
*/
-function TCPPool (port, hostname) {
+function TCPPool (client) {
var self = this
-
- self.port = port
- self.listening = false
- self.swarms = {} // infoHash (hex) -> Swarm
-
- debug('new TCPPool (port: %s, hostname: %s)', port, hostname)
-
- // Save incoming conns so they can be destroyed if server is closed before the conn is
- // passed off to a Swarm.
- self.pendingConns = []
+ debug('create tcp pool (port %s)', client.torrentPort)
self.server = net.createServer()
- self.server.on('connection', function (conn) { self._onConnection(conn) })
- self.server.on('error', function (err) { self._onError(err) })
- self.server.on('listening', function () { self._onListening() })
- self.server.listen(self.port, hostname)
-}
-
-/**
- * STATIC METHOD
- * Add a swarm to a pool, creating a new pool if necessary.
- * @param {Swarm} swarm
- */
-TCPPool.addSwarm = function (swarm) {
- var pool = tcpPools[swarm._port]
- if (!pool) pool = tcpPools[swarm._port] = new TCPPool(swarm._port, swarm._hostname)
- pool.addSwarm(swarm)
- return pool
-}
-
-/**
- * STATIC METHOD
- * Remove a swarm from its pool.
- * @param {Swarm} swarm
- */
-TCPPool.removeSwarm = function (swarm, cb) {
- var pool = tcpPools[swarm._port]
- if (!pool) return cb()
- pool.removeSwarm(swarm)
+ self._client = client
- if (Object.keys(pool.swarms).length === 0) pool.destroy(cb)
- else process.nextTick(cb)
-}
+ // Temporarily store incoming connections so they can be destroyed if the server is
+ // closed before the connection is passed off to a Torrent.
+ self._pendingConns = []
-/**
- * STATIC METHOD
- * When `Swarm.prototype.listen` is called without specifying a port, a reasonable
- * default port must be chosen. If there already exists an active TCP pool, then return
- * that pool's port so that TCP server can be re-used. Otherwise, return 0 so node will
- * pick a free port.
- *
- * @return {number} port
- */
-TCPPool.getDefaultListenPort = function (infoHash) {
- for (var port in tcpPools) {
- var pool = tcpPools[port]
- if (!pool.swarms[infoHash]) return pool.port
+ self._onConnectionBound = function (conn) {
+ self._onConnection(conn)
}
- return 0
-}
-
-/**
- * Add a swarm to this TCP pool.
- * @param {Swarm} swarm
- */
-TCPPool.prototype.addSwarm = function (swarm) {
- var self = this
- if (self.swarms[swarm.infoHash]) {
- process.nextTick(function () {
- swarm._onError(new Error(
- 'There is already a swarm with info hash ' + swarm.infoHash + ' ' +
- 'listening on port ' + swarm._port
- ))
- })
- return
+ self._onListening = function () {
+ self._client._onListening()
}
- self.swarms[swarm.infoHash] = swarm
-
- if (self.listening) {
- process.nextTick(function () {
- swarm._onListening(self.port)
- })
+ self._onError = function (err) {
+ self._client._destroy(err)
}
- debug('add swarm %s to tcp pool %s', swarm.infoHash, self.port)
-}
+ self.server.on('connection', self._onConnectionBound)
+ self.server.on('listening', self._onListening)
+ self.server.on('error', self._onError)
-/**
- * Remove a swarm from this TCP pool.
- * @param {Swarm} swarm
- */
-TCPPool.prototype.removeSwarm = function (swarm) {
- var self = this
- debug('remove swarm %s from tcp pool %s', swarm.infoHash, self.port)
- delete self.swarms[swarm.infoHash]
+ self.server.listen(client.torrentPort)
}
/**
@@ -129,47 +51,28 @@ TCPPool.prototype.removeSwarm = function (swarm) {
*/
TCPPool.prototype.destroy = function (cb) {
var self = this
- debug('destroy tcp pool %s', self.port)
+ debug('destroy tcp pool')
- self.listening = false
+ self.server.removeListener('connection', self._onConnectionBound)
+ self.server.removeListener('listening', self._onListening)
+ self.server.removeListener('error', self._onError)
// Destroy all open connection objects so server can close gracefully without waiting
// for connection timeout or remote peer to disconnect.
- self.pendingConns.forEach(function (conn) {
+ self._pendingConns.forEach(function (conn) {
+ conn.on('error', noop)
conn.destroy()
})
- delete tcpPools[self.port]
-
try {
self.server.close(cb)
} catch (err) {
if (cb) process.nextTick(cb)
}
-}
-
-TCPPool.prototype._onListening = function () {
- var self = this
-
- // Fix for Docker Node image. Sometimes server.address() returns `null`.
- // See issue: https://github.com/feross/bittorrent-swarm/pull/18
- var address = self.server.address() || { port: 0 }
- var port = address.port
-
- debug('tcp pool listening on %s', port)
- if (port !== self.port) {
- // `port` was 0 when `listen` was called; update to the port that node selected
- delete tcpPools[self.port]
- self.port = port
- tcpPools[self.port] = self
- }
-
- self.listening = true
-
- for (var infoHash in self.swarms) {
- self.swarms[infoHash]._onListening(self.port)
- }
+ self.server = null
+ self._client = null
+ self._pendingConns = null
}
/**
@@ -189,39 +92,34 @@ TCPPool.prototype._onConnection = function (conn) {
return
}
- self.pendingConns.push(conn)
- conn.once('close', removePendingConn)
-
- function removePendingConn () {
- arrayRemove(self.pendingConns, self.pendingConns.indexOf(conn))
- }
+ self._pendingConns.push(conn)
+ conn.once('close', cleanupPending)
var peer = Peer.createTCPIncomingPeer(conn)
- peer.wire.once('handshake', function (infoHash, peerId) {
- removePendingConn()
- conn.removeListener('close', removePendingConn)
+ var wire = peer.wire
+ wire.once('handshake', onHandshake)
- var swarm = self.swarms[infoHash]
- if (swarm) {
- peer.swarm = swarm
- swarm._addIncomingPeer(peer)
+ function onHandshake (infoHash, peerId) {
+ cleanupPending()
+
+ var torrent = self._client.get(infoHash)
+ if (torrent) {
+ peer.swarm = torrent
+ torrent._addIncomingPeer(peer)
peer.onHandshake(infoHash, peerId)
} else {
- var err = new Error('Unexpected info hash ' + infoHash + ' from incoming peer ' +
- peer.id + ': destroying peer')
+ var err = new Error(
+ 'Unexpected info hash ' + infoHash + ' from incoming peer ' + peer.id
+ )
peer.destroy(err)
}
- })
-}
+ }
-TCPPool.prototype._onError = function (err) {
- var self = this
- self.destroy()
- for (var infoHash in self.swarms) {
- var swarm = self.swarms[infoHash]
- self.removeSwarm(swarm)
- swarm._onError(err)
+ function cleanupPending () {
+ conn.removeListener('close', cleanupPending)
+ wire.removeListener('handshake', onHandshake)
+ arrayRemove(self._pendingConns, self._pendingConns.indexOf(conn))
}
}
diff --git a/lib/torrent.js b/lib/torrent.js
index 7681da0..a5dc4c2 100644
--- a/lib/torrent.js
+++ b/lib/torrent.js
@@ -1,3 +1,7 @@
+// TODO: remove _onError, add _destroy(err, cb)
+// TODO: cleanup event listeners
+// TODO: Remove all inline docs, and move to docs/api.md
+
/* global URL, Blob */
module.exports = Torrent
@@ -15,6 +19,7 @@ var FSChunkStore = require('fs-chunk-store') // browser: `memory-chunk-store`
var ImmediateChunkStore = require('immediate-chunk-store')
var inherits = require('inherits')
var MultiStream = require('multistream')
+var net = require('net') // browser exclude
var os = require('os') // browser exclude
var parallel = require('run-parallel')
var parallelLimit = require('run-parallel-limit')
@@ -25,14 +30,15 @@ var Piece = require('torrent-piece')
var pump = require('pump')
var randomIterate = require('random-iterate')
var sha1 = require('simple-sha1')
+var speedometer = require('speedometer')
var uniq = require('uniq')
var ut_metadata = require('ut_metadata')
var ut_pex = require('ut_pex') // browser exclude
var File = require('./file')
+var Peer = require('./peer')
var RarityMap = require('./rarity-map')
var Server = require('./server') // browser exclude
-var Swarm = require('./swarm')
var MAX_BLOCK_LENGTH = 128 * 1024
var PIECE_TIMEOUT = 30000
@@ -47,6 +53,8 @@ var RECHOKE_OPTIMISTIC_DURATION = 2 // 30 seconds
var FILESYSTEM_CONCURRENCY = 2
+var RECONNECT_WAIT = [ 1000, 5000, 15000 ]
+
var TMP = typeof pathExists.sync === 'function'
? path.join(pathExists.sync('/tmp') ? '/tmp' : os.tmpDir(), 'webtorrent')
: '/tmp/webtorrent'
@@ -86,20 +94,34 @@ function Torrent (torrentId, client, opts) {
this.ready = false
this.destroyed = false
+ this.paused = false
+ this.done = false
+
this.metadata = null
this.store = null
- this.numBlockedPeers = 0
this.files = null
- this.done = false
this._amInterested = false
this.pieces = []
this._selections = []
this._critical = []
+ this.wires = [] // open wires (added *after* handshake)
+
+ this._queue = [] // queue of outgoing tcp peers to connect to
+ this._peers = {} // connected peers (addr/peerId -> Peer)
+ this._peersLength = 0 // number of elements in `this._peers` (cache, for perf)
+
+ // stats
+ this.received = 0
+ this.uploaded = 0
+ this._downloadSpeed = speedometer()
+ this._uploadSpeed = speedometer()
+
// for cleanup
this._servers = []
+ // TODO: remove this and expose a hook instead
// optimization: don't recheck every file if it hasn't changed
this._fileModtimes = opts.fileModtimes
@@ -130,14 +152,6 @@ Object.defineProperty(Torrent.prototype, 'downloaded', {
}
})
-Object.defineProperty(Torrent.prototype, 'received', {
- get: function () { return this.swarm ? this.swarm.downloaded : 0 }
-})
-
-Object.defineProperty(Torrent.prototype, 'uploaded', {
- get: function () { return this.swarm ? this.swarm.uploaded : 0 }
-})
-
// The number of missing pieces. Used to implement 'end game' mode.
// Object.defineProperty(Storage.prototype, 'numMissing', {
// get: function () {
@@ -151,11 +165,11 @@ Object.defineProperty(Torrent.prototype, 'uploaded', {
// })
Object.defineProperty(Torrent.prototype, 'downloadSpeed', {
- get: function () { return this.swarm ? this.swarm.downloadSpeed() : 0 }
+ get: function () { return this._downloadSpeed() }
})
Object.defineProperty(Torrent.prototype, 'uploadSpeed', {
- get: function () { return this.swarm ? this.swarm.uploadSpeed() : 0 }
+ get: function () { return this._uploadSpeed() }
})
Object.defineProperty(Torrent.prototype, 'progress', {
@@ -163,14 +177,14 @@ Object.defineProperty(Torrent.prototype, 'progress', {
})
Object.defineProperty(Torrent.prototype, 'ratio', {
- get: function () { return this.uploaded / (this.downloaded || 1) }
+ get: function () { return this.uploaded / (this.received || 1) }
})
Object.defineProperty(Torrent.prototype, 'numPeers', {
- get: function () { return this.swarm ? this.swarm.numPeers : 0 }
+ get: function () { return this.wires.length }
})
-// TODO: remove this
+// TODO: remove this (and file.getBlobURL?)
// Torrent file as a blob url
Object.defineProperty(Torrent.prototype, 'torrentFileBlobURL', {
get: function () {
@@ -182,13 +196,36 @@ Object.defineProperty(Torrent.prototype, 'torrentFileBlobURL', {
}
})
+Object.defineProperty(Torrent.prototype, '_numQueued', {
+ get: function () {
+ return this._queue.length + (this._peersLength - this._numConns)
+ }
+})
+
+Object.defineProperty(Torrent.prototype, '_numConns', {
+ get: function () {
+ var self = this
+ var numConns = 0
+ for (var id in self._peers) {
+ if (self._peers[id].connected) numConns += 1
+ }
+ return numConns
+ }
+})
+
+// TODO: remove in v2
+Object.defineProperty(Torrent.prototype, 'swarm', {
+ get: function () {
+ console.log('WebTorrent: `torrent.swarm` is deprecated. Use `torrent` directly instead.')
+ }
+})
+
Torrent.prototype._onTorrentId = function (torrentId) {
var self = this
if (self.destroyed) return
var parsedTorrent
try { parsedTorrent = parseTorrent(torrentId) } catch (err) {}
-
if (parsedTorrent) {
// Attempt to set infoHash property synchronously
self.infoHash = parsedTorrent.infoHash
@@ -210,6 +247,7 @@ Torrent.prototype._onTorrentId = function (torrentId) {
Torrent.prototype._onParsedTorrent = function (parsedTorrent) {
var self = this
if (self.destroyed) return
+ console.log('on parsed torrent')
self._processParsedTorrent(parsedTorrent)
@@ -219,44 +257,20 @@ Torrent.prototype._onParsedTorrent = function (parsedTorrent) {
if (!self.path) self.path = path.join(TMP, self.infoHash)
- // create swarm
- self.swarm = new Swarm(self.infoHash, self.client.peerId, {
- handshake: {
- dht: self.private ? false : !!self.client.dht
- },
- maxConns: self.client.maxConns
- })
- self.swarm.on('error', function (err) {
- self._onError(err)
- })
- self.swarm.on('wire', function (wire, addr) {
- self._onWire(wire, addr)
- })
-
- self.swarm.on('download', function (downloaded) {
- self.client._downloadSpeed(downloaded) // update overall client stats
- self.client.emit('download', downloaded)
- self.emit('download', downloaded)
- })
-
- self.swarm.on('upload', function (uploaded) {
- self.client._uploadSpeed(uploaded) // update overall client stats
- self.client.emit('upload', uploaded)
- self.emit('upload', uploaded)
- })
-
- // listen for peers (note: in the browser, this is a no-op and callback is called on
- // next tick)
- self.swarm.listen(self.client.torrentPort, function () {
- self._onSwarmListening()
- })
-
self._rechokeIntervalId = setInterval(function () {
self._rechoke()
}, RECHOKE_INTERVAL)
if (self._rechokeIntervalId.unref) self._rechokeIntervalId.unref()
self.emit('infoHash', self.infoHash)
+
+ if (self.client.listening) {
+ self._onListening()
+ } else {
+ self.client.once('listening', function () {
+ self._onListening()
+ })
+ }
}
Torrent.prototype._processParsedTorrent = function (parsedTorrent) {
@@ -282,14 +296,13 @@ Torrent.prototype._processParsedTorrent = function (parsedTorrent) {
this.magnetURI = parseTorrent.toMagnetURI(parsedTorrent)
this.torrentFile = parseTorrent.toTorrentFile(parsedTorrent)
+ console.log('process done')
}
-Torrent.prototype._onSwarmListening = function () {
+Torrent.prototype._onListening = function () {
var self = this
- if (self.destroyed) return
-
- if (self.swarm.server) self.client.torrentPort = self.swarm.address().port
-
+ if (self.discovery || self.destroyed) return
+ console.log('on listening')
var trackerOpts = {
rtcConfig: self.client._rtcConfig,
wrtc: self.client._wrtc,
@@ -336,8 +349,6 @@ Torrent.prototype._onSwarmListening = function () {
// if full metadata was included in initial torrent id, use it immediately. Otherwise,
// wait for torrent-discovery to find peers and ut_metadata to get the metadata.
if (self.info) self._onMetadata(self)
-
- self.emit('listening', self.client.torrentPort)
}
/**
@@ -368,7 +379,7 @@ Torrent.prototype._onMetadata = function (metadata) {
self.addWebSeed(url)
})
- self.rarityMap = new RarityMap(self.swarm, self.pieces.length)
+ self._rarityMap = new RarityMap(self)
self.store = new ImmediateChunkStore(
new self._store(self.pieceLength, {
@@ -403,7 +414,7 @@ Torrent.prototype._onMetadata = function (metadata) {
self.bitfield = new BitField(self.pieces.length)
- self.swarm.wires.forEach(function (wire) {
+ self.wires.forEach(function (wire) {
// If we didn't have the metadata at the time ut_metadata was initialized for this
// wire, we still want to make it available to the peer in case they request it.
if (wire.ut_metadata) wire.ut_metadata.setMetadata(self.metadata)
@@ -492,7 +503,7 @@ Torrent.prototype._markVerified = function (index) {
}
/**
- * Called when the metadata, swarm, and underlying chunk store is initialized.
+ * Called when the metadata, listening server, and underlying chunk store is initialized.
*/
Torrent.prototype._onStore = function () {
var self = this
@@ -525,36 +536,46 @@ Torrent.prototype.destroy = function (cb) {
clearInterval(self._rechokeIntervalId)
- var tasks = []
+ if (self._rarityMap) {
+ self._rarityMap.destroy()
+ }
+
+ for (var id in self._peers) {
+ self.removePeer(id)
+ }
- self._servers.forEach(function (server) {
- tasks.push(function (cb) { server.destroy(cb) })
+ var tasks = self._servers.map(function (server) {
+ return function (cb) {
+ server.destroy(cb)
+ }
})
- if (self.swarm) tasks.push(function (cb) { self.swarm.destroy(cb) })
- if (self.discovery) tasks.push(function (cb) { self.discovery.destroy(cb) })
- if (self.store) tasks.push(function (cb) { self.store.close(cb) })
+ if (self.discovery) {
+ tasks.push(function (cb) {
+ self.discovery.destroy(cb)
+ })
+ }
+ if (self.store) {
+ tasks.push(function (cb) {
+ self.store.close(cb)
+ })
+ }
parallel(tasks, cb)
}
/**
- * Add a peer to the swarm
- * @param {string|SimplePeer} peer
+ * Add a peer to the torrent swarm
+ * @param {string|simple-peer} peer "ip:port" string or simple-peer instance
+ * @param {string} peer.id bittorrent peer id (when `peer` is simple-peer)
* @return {boolean} true if peer was added, false if peer was blocked
*/
Torrent.prototype.addPeer = function (peer) {
var self = this
if (self.destroyed) throw new Error('torrent is destroyed')
-
- function addPeer () {
- var wasAdded = self.swarm.addPeer(peer)
- if (wasAdded) {
- self.emit('peer', peer)
- } else {
- self.emit('invalidPeer', peer)
- }
- }
+ if (!self.infoHash) throw new Error('addPeer() must not be called before the `infoHash` event')
+ console.log('addPeer:', peer)
+ console.log(self.infoHash)
if (self.client.blocked) {
var host
@@ -563,6 +584,7 @@ Torrent.prototype.addPeer = function (peer) {
try {
parts = addrToIPPort(peer)
} catch (e) {
+ self._debug('ignoring peer: invalid %s', peer)
self.emit('invalidPeer', peer)
return false
}
@@ -572,25 +594,143 @@ Torrent.prototype.addPeer = function (peer) {
}
if (host && self.client.blocked.contains(host)) {
- self.numBlockedPeers += 1 // TODO: remove this. less api surface area
+ self._debug('ignoring peer: blocked %s', peer)
self.emit('blockedPeer', peer)
return false
}
}
- if (self.swarm) addPeer()
- else self.once('listening', addPeer)
- return true
+ var wasAdded = !!self._addPeer(peer)
+ if (wasAdded) {
+ self.emit('peer', peer)
+ } else {
+ self.emit('invalidPeer', peer)
+ }
+ return wasAdded
+}
+
+Torrent.prototype._addPeer = function (peer) {
+ var self = this
+ if (self.destroyed) {
+ if (typeof peer === 'string') {
+ self._debug('ignoring peer: torrent is destroyed')
+ } else {
+ peer.destroy(new Error('torrent is destroyed'))
+ }
+ return null
+ }
+ if (typeof peer === 'string' && !self._validAddr(peer)) {
+ self._debug('ignoring peer: invalid %s', peer)
+ return null
+ }
+
+ var id = (peer && peer.id) || peer
+ if (self._peers[id]) {
+ if (typeof peer === 'string') {
+ self._debug('ignoring peer: duplicate (%s)', id)
+ } else {
+ peer.destroy(new Error('duplicate peer ' + id))
+ }
+ return null
+ }
+
+ if (self.paused) {
+ if (typeof peer === 'string') {
+ self._debug('ignoring peer: torrent is paused')
+ } else {
+ peer.destroy(new Error('torrent is paused'))
+ }
+ return null
+ }
+
+ self._debug('add peer %s', id)
+
+ var newPeer
+ if (typeof peer === 'string') {
+ // `peer` is an addr ("ip:port" string)
+ newPeer = Peer.createTCPOutgoingPeer(peer, self)
+ } else {
+ // `peer` is a WebRTC connection (simple-peer)
+ newPeer = Peer.createWebRTCPeer(peer, self)
+ }
+
+ self._peers[newPeer.id] = newPeer
+ self._peersLength += 1
+
+ if (typeof peer === 'string') {
+ // `peer` is an addr ("ip:port" string)
+ self._queue.push(newPeer)
+ self._drain()
+ }
+
+ return newPeer
}
/**
- * Add a web seed to the swarm
+ * Add a web seed to the torrent swarm.
* @param {string} url web seed url
+ * @param {Object} parsedTorrent
*/
Torrent.prototype.addWebSeed = function (url) {
if (this.destroyed) throw new Error('torrent is destroyed')
+
+ if (!/^https?:\/\/.+/.test(url)) {
+ this._debug('ignoring invalid web seed %s', url)
+ this.emit('invalidPeer', url)
+ return
+ }
+
+ if (this._peers[url]) {
+ this._debug('ignoring duplicate web seed %s', url)
+ this.emit('invalidPeer', url)
+ return
+ }
+
this._debug('add web seed %s', url)
- this.swarm.addWebSeed(url, this)
+
+ var newPeer = Peer.createWebSeedPeer(url, this)
+ this._peers[newPeer.id] = newPeer
+ this._peersLength += 1
+
+ this.emit('peer', url)
+}
+
+/**
+ * Called whenever a new incoming TCP peer connects to this torrent swarm. Called with a
+ * peer that has already sent a handshake.
+ * @param {Peer} peer
+ */
+Torrent.prototype._addIncomingPeer = function (peer) {
+ var self = this
+ if (self.destroyed) return peer.destroy(new Error('torrent is destroyed'))
+ if (self.paused) return peer.destroy(new Error('torrent is paused'))
+
+ this._debug('add incoming peer %s', peer.id)
+
+ self._peers[peer.id] = peer
+ self._peersLength += 1
+}
+
+/**
+ * Remove a peer from the torrent swarm.
+ * @param {string} peer "ip:port" string, peerId string, or simple-peer instance
+ */
+Torrent.prototype.removePeer = function (peer) {
+ var self = this
+ var id = (peer && peer.id) || peer
+ peer = self._peers[id]
+
+ if (!peer) return
+
+ this._debug('removePeer %s', id)
+
+ delete self._peers[id]
+ self._peersLength -= 1
+
+ peer.destroy()
+
+ // If torrent swarm was at capacity before, try to open a new connection now
+ self._drain()
}
/**
@@ -675,6 +815,26 @@ Torrent.prototype._onWire = function (wire, addr) {
var self = this
self._debug('got wire %s (%s)', wire._debugId, addr || 'Unknown')
+ wire.on('download', function (downloaded) {
+ if (self.destroyed) return
+ self.received += downloaded
+ self._downloadSpeed(downloaded)
+ self.client._downloadSpeed(downloaded)
+ self.emit('download', downloaded)
+ self.client.emit('download', downloaded)
+ })
+
+ wire.on('upload', function (uploaded) {
+ if (self.destroyed) return
+ self.uploaded += uploaded
+ self._uploadSpeed(uploaded)
+ self.client._uploadSpeed(uploaded)
+ self.emit('upload', uploaded)
+ self.client.emit('upload', uploaded)
+ })
+
+ self.wires.push(wire)
+
if (addr) {
// Sometimes RTCPeerConnection.getStats() doesn't return an ip:port for peers
var parts = addrToIPPort(addr)
@@ -739,12 +899,12 @@ Torrent.prototype._onWire = function (wire, addr) {
})
wire.ut_pex.on('dropped', function (peer) {
- // the remote peer believes a given peer has been dropped from the swarm.
- // if we're not currently connected to it, then remove it from the swarm's queue.
- var peerObj = self.swarm._peers[peer]
+ // the remote peer believes a given peer has been dropped from the torrent swarm.
+ // if we're not currently connected to it, then remove it from the queue.
+ var peerObj = self._peers[peer]
if (peerObj && !peerObj.connected) {
self._debug('ut_pex: dropped peer: %s (from %s)', peer, addr)
- self.swarm.removePeer(peer)
+ self.removePeer(peer)
}
})
@@ -760,8 +920,8 @@ Torrent.prototype._onWire = function (wire, addr) {
if (self.metadata) {
process.nextTick(function () {
- // nextTick allows wire.handshake() to be called by `bittorrent-swarm`
- // first, before we send any other messages on the wire
+ // This allows wire.handshake() to be called (by Peer.onHandshake) before any
+ // messages get sent on the wire
self._onWireWithMetadata(wire)
})
}
@@ -774,7 +934,7 @@ Torrent.prototype._onWireWithMetadata = function (wire) {
function onChokeTimeout () {
if (self.destroyed || wire.destroyed) return
- if (self.swarm.numQueued > 2 * (self.swarm.numConns - self.swarm.numPeers) &&
+ if (self._numQueued > 2 * (self._numConns - self.numPeers) &&
wire.amInterested) {
wire.destroy()
} else {
@@ -897,7 +1057,7 @@ Torrent.prototype._updateInterest = function () {
var prev = self._amInterested
self._amInterested = !!self._selections.length
- self.swarm.wires.forEach(function (wire) {
+ self.wires.forEach(function (wire) {
// TODO: only call wire.interested if the wire has at least one piece we need
if (self._amInterested) wire.interested()
else wire.uninterested()
@@ -916,7 +1076,7 @@ Torrent.prototype._update = function () {
if (self.destroyed) return
// update wires in random order for better request distribution
- var ite = randomIterate(self.swarm.wires)
+ var ite = randomIterate(self.wires)
var wire
while ((wire = ite())) {
self._updateWire(wire)
@@ -961,7 +1121,7 @@ Torrent.prototype._updateWire = function (wire) {
var filter = genPieceFilterFunc(start, end, tried)
while (tries < len) {
- piece = self.rarityMap.getRarestPiece(filter)
+ piece = self._rarityMap.getRarestPiece(filter)
if (piece < 0) break
if (self._request(wire, piece, false)) return
tried[piece] = true
@@ -992,8 +1152,8 @@ Torrent.prototype._updateWire = function (wire) {
var missing = self.pieces[index].missing
- for (; ptr < self.swarm.wires.length; ptr++) {
- var otherWire = self.swarm.wires[ptr]
+ for (; ptr < self.wires.length; ptr++) {
+ var otherWire = self.wires[ptr]
var otherSpeed = otherWire.downloadSpeed()
if (otherSpeed < SPEED_THRESHOLD) continue
@@ -1036,7 +1196,7 @@ Torrent.prototype._updateWire = function (wire) {
var filter = genPieceFilterFunc(start, end, tried, rank)
while (tries < len) {
- piece = self.rarityMap.getRarestPiece(filter)
+ piece = self._rarityMap.getRarestPiece(filter)
if (piece < 0) break
// request all non-reserved blocks in this piece
@@ -1083,7 +1243,7 @@ Torrent.prototype._rechoke = function () {
var peers = []
- self.swarm.wires.forEach(function (wire) {
+ self.wires.forEach(function (wire) {
if (!wire.isSeeder && wire !== self._rechokeOptimisticWire) {
peers.push({
wire: wire,
@@ -1272,7 +1432,7 @@ Torrent.prototype._request = function (wire, index, hotswap) {
self.store.put(index, buf)
- self.swarm.wires.forEach(function (wire) {
+ self.wires.forEach(function (wire) {
wire.have(index)
})
@@ -1361,21 +1521,35 @@ Torrent.prototype.createServer = function (opts) {
return server
}
+/**
+ * Temporarily stop connecting to new peers. Note that this does not pause the streams
+ * of existing connections or their wires.
+ */
Torrent.prototype.pause = function () {
if (this.destroyed) return
- this.swarm.pause()
+ this._debug('pause')
+ this.paused = true
}
+/**
+ * Resume connecting to new peers.
+ */
Torrent.prototype.resume = function () {
if (this.destroyed) return
- this.swarm.resume()
+ this._debug('resume')
+ this.paused = false
+ this._drain()
}
Torrent.prototype._onError = function (err) {
var self = this
self._debug('torrent error: %s', err.message || err)
self.destroy()
- self.emit('error', err)
+ if (self.listenerCount('error') === 0) {
+ self.client.emit('error', err)
+ } else {
+ self.emit('error', err)
+ }
}
Torrent.prototype._debug = function () {
@@ -1400,3 +1574,81 @@ function randomInt (high) {
}
function noop () {}
+
+/**
+ * Pop a peer off the FIFO queue and connect to it. When _drain() gets called,
+ * the queue will usually have only one peer in it, except when there are too
+ * many peers (over `this.maxConns`) in which case they will just sit in the
+ * queue until another connection closes.
+ */
+Torrent.prototype._drain = function () {
+ var self = this
+ this._debug('_drain numConns %s maxConns %s', self._numConns, self.client.maxConns)
+ if (typeof net.connect !== 'function' || self.destroyed || self.paused ||
+ self._numConns >= self.client.maxConns) {
+ return
+ }
+ this._debug('drain (%s queued, %s/%s peers)', self._numQueued, self.numPeers, self.client.maxConns)
+
+ var peer = self._queue.shift()
+ if (!peer) return // queue could be empty
+
+ this._debug('tcp connect attempt to %s', peer.addr)
+
+ var parts = addrToIPPort(peer.addr)
+ var opts = {
+ host: parts[0],
+ port: parts[1]
+ }
+
+ var conn = peer.conn = net.connect(opts)
+
+ conn.once('connect', function () { peer.onConnect() })
+ conn.once('error', function (err) { peer.destroy(err) })
+ peer.startConnectTimeout()
+
+ // When connection closes, attempt reconnect after timeout (with exponential backoff)
+ conn.on('close', function () {
+ if (self.destroyed) return
+
+ // TODO: If torrent is done, do not try to reconnect after a timeout
+
+ if (peer.retries >= RECONNECT_WAIT.length) {
+ this._debug(
+ 'conn %s closed: will not re-add (max %s attempts)',
+ peer.addr, RECONNECT_WAIT.length
+ )
+ return
+ }
+
+ var ms = RECONNECT_WAIT[peer.retries]
+ this._debug(
+ 'conn %s closed: will re-add to queue in %sms (attempt %s)',
+ peer.addr, ms, peer.retries + 1
+ )
+
+ var reconnectTimeout = setTimeout(function reconnectTimeout () {
+ var newPeer = self._addPeer(peer.addr)
+ if (newPeer) newPeer.retries = peer.retries + 1
+ }, ms)
+ if (reconnectTimeout.unref) reconnectTimeout.unref()
+ })
+}
+
+/**
+ * Returns `true` if string is valid IPv4/6 address.
+ * @param {string} addr
+ * @return {boolean}
+ */
+Torrent.prototype._validAddr = function (addr) {
+ var parts
+ try {
+ parts = addrToIPPort(addr)
+ } catch (e) {
+ return false
+ }
+ var host = parts[0]
+ var port = parts[1]
+ return port > 0 && port < 65535 &&
+ !(host === '127.0.0.1' && port === this.client.torrentPort)
+}
diff --git a/lib/webconn.js b/lib/webconn.js
index ca6760f..945edd4 100644
--- a/lib/webconn.js
+++ b/lib/webconn.js
@@ -1,3 +1,6 @@
+// TODO: cleanup events
+// TODO: cleanup reference to parsedTorrent (i.e. Torrent object)
+
module.exports = WebConn
var BitField = require('bitfield')