diff options
author | Feross Aboukhadijeh <feross@feross.org> | 2016-04-21 09:10:32 +0300 |
---|---|---|
committer | Feross Aboukhadijeh <feross@feross.org> | 2016-04-21 09:10:32 +0300 |
commit | 3daee2c66cbf752b9e6e49b99492b8c1914a4a58 (patch) | |
tree | fbdd6c10f6a64674268dc77d1b3ccb4a37c44a65 /lib | |
parent | 7a7c4a8b8c49f5c92b7c20ff439bc8614f7d607e (diff) |
BREAKING: Major cleanup
### Added
- `client.listening` property to signal whether TCP server is listening
for incoming
connections.
### Changed
- Merged `Swarm` class into `Torrent` object. Properties on
`torrent.swarm` (like
`torrent.swarm.wires`) now exist on `torrent` (e.g. `torrent.wires`).
- `torrent.addPeer` can no longer be called before the `infoHash` event
has been
emitted.
- Remove `torrent.on('listening')` event. Use `client.on('listening')`
instead.
- Remove support from `TCPPool` for listening on multiple ports. This
was not used by
WebTorrent and just added complexity. There is now a single `TCPPool`
instance for the
whole WebTorrent client.
- Deprecate: Do not use `client.download()` anymore. Use `client.add()`
instead.
- Deprecate: Do not use `torrent.swarm` anymore. Use `torrent` instead.
### Fixed
- When there is a `torrent.on('error')` listener, don't also emit
`client.on('error')`.
- Do not return existing torrent object when duplicate torrent is
added. Fire an
`'error'` event instead.
- Memory leak of `Torrent` object caused by `RarityMap`
- Memory leak of `Torrent` object caused by `TCPPool`
- `client.ratio` and `torrent.ratio` are now calculated as `uploaded /
received` instead
of `uploaded / downloaded`.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/peer.js | 34 | ||||
-rw-r--r-- | lib/rarity-map.js | 127 | ||||
-rw-r--r-- | lib/swarm.js | 413 | ||||
-rw-r--r-- | lib/tcp-pool.js | 190 | ||||
-rw-r--r-- | lib/torrent.js | 452 | ||||
-rw-r--r-- | lib/webconn.js | 3 |
6 files changed, 491 insertions, 728 deletions
diff --git a/lib/peer.js b/lib/peer.js index 06786c5..fcd296d 100644 --- a/lib/peer.js +++ b/lib/peer.js @@ -60,10 +60,10 @@ exports.createTCPOutgoingPeer = function (addr, swarm) { /** * Peer that represents a Web Seed (BEP17 / BEP19). */ -exports.createWebSeedPeer = function (url, parsedTorrent, swarm) { +exports.createWebSeedPeer = function (url, swarm) { var peer = new Peer(url, 'webSeed') peer.swarm = swarm - peer.conn = new WebConn(url, parsedTorrent) + peer.conn = new WebConn(url, swarm) peer.onConnect() @@ -71,7 +71,7 @@ exports.createWebSeedPeer = function (url, parsedTorrent, swarm) { } /** - * Peer. Represents a peer in the Swarm. + * Peer. Represents a peer in the torrent swarm. * * @param {string} id "ip:port" string, peer id (for WebRTC peers), or url (for Web Seeds) * @param {string} type the type of the peer @@ -161,7 +161,7 @@ Peer.prototype.onHandshake = function (infoHash, peerId) { return self.destroy(new Error('unexpected handshake info hash for this swarm')) } if (peerId === self.swarm.peerId) { - return self.destroy(new Error('refusing to handshake with self')) + return self.destroy(new Error('refusing to connect to ourselves')) } debug('Peer %s got handshake %s', self.id, infoHash) @@ -170,27 +170,12 @@ Peer.prototype.onHandshake = function (infoHash, peerId) { self.retries = 0 - self.wire.on('download', function (downloaded) { - if (self.destroyed) return - self.swarm.downloaded += downloaded - self.swarm.downloadSpeed(downloaded) - self.swarm.emit('download', downloaded) - }) - - self.wire.on('upload', function (uploaded) { - if (self.destroyed) return - self.swarm.uploaded += uploaded - self.swarm.uploadSpeed(uploaded) - self.swarm.emit('upload', uploaded) - }) - - self.swarm.wires.push(self.wire) - var addr = self.addr if (!addr && self.conn.remoteAddress) { addr = self.conn.remoteAddress + ':' + self.conn.remotePort } - self.swarm.emit('wire', self.wire, addr) + self.swarm._onWire(self.wire, addr) + // swarm could be destroyed in user's 'wire' event handler if (!self.swarm || self.swarm.destroyed) return @@ -199,7 +184,10 @@ Peer.prototype.onHandshake = function (infoHash, peerId) { Peer.prototype.handshake = function () { var self = this - self.wire.handshake(self.swarm.infoHash, self.swarm.peerId, self.swarm.handshakeOpts) + var opts = { + dht: self.swarm.private ? false : !!self.swarm.client.dht + } + self.wire.handshake(self.swarm.infoHash, self.swarm.client.peerId, opts) self.sentHandshake = true } @@ -236,8 +224,8 @@ Peer.prototype.destroy = function (err) { var conn = self.conn var wire = self.wire - self.conn = null self.swarm = null + self.conn = null self.wire = null if (swarm && wire) { diff --git a/lib/rarity-map.js b/lib/rarity-map.js index 0b1d147..c362077 100644 --- a/lib/rarity-map.js +++ b/lib/rarity-map.js @@ -1,57 +1,34 @@ module.exports = RarityMap /** - * Mapping of torrent pieces to their respective availability in the swarm. Used by - * the torrent manager for implementing the rarest piece first selection strategy. + * Mapping of torrent pieces to their respective availability in the torrent swarm. Used + * by the torrent manager for implementing the rarest piece first selection strategy. * - * @param {Swarm} swarm bittorrent-swarm to track availability - * @param {number} numPieces number of pieces in the torrent + * @param {Torrent} torrent */ -function RarityMap (swarm, numPieces) { +function RarityMap (torrent) { var self = this - self.pieces = [] - self.swarm = swarm - self.numPieces = numPieces - - function initWire (wire) { - wire.on('have', function (index) { - self.pieces[index] += 1 - }) - wire.on('bitfield', function () { - self.recalculate() - }) - wire.on('close', function () { - for (var i = 0; i < self.numPieces; ++i) { - self.pieces[i] -= wire.peerPieces.get(i) - } - }) - } + self._torrent = torrent + self._numPieces = torrent.pieces.length + self._pieces = [] - self.swarm.wires.forEach(initWire) - self.swarm.on('wire', function (wire) { + self._onWire = function (wire) { + self.recalculate() + self._initWire(wire) + } + self._onWireHave = function (index) { + self._pieces[index] += 1 + } + self._onWireBitfield = function () { self.recalculate() - initWire(wire) - }) - - self.recalculate() -} - -/** - * Recalculates piece availability across all peers in the swarm. - */ -RarityMap.prototype.recalculate = function () { - var self = this - - for (var i = 0; i < self.numPieces; ++i) { - self.pieces[i] = 0 } - self.swarm.wires.forEach(function (wire) { - for (var i = 0; i < self.numPieces; ++i) { - self.pieces[i] += wire.peerPieces.get(i) - } + self._torrent.wires.forEach(function (wire) { + self._initWire(wire) }) + self._torrent.on('wire', self._onWire) + self.recalculate() } /** @@ -62,15 +39,15 @@ RarityMap.prototype.recalculate = function () { * @return {number} index of rarest piece, or -1 */ RarityMap.prototype.getRarestPiece = function (pieceFilterFunc) { - var self = this + if (!pieceFilterFunc) pieceFilterFunc = trueFn + var candidates = [] var min = Infinity - pieceFilterFunc = pieceFilterFunc || function () { return true } - for (var i = 0; i < self.numPieces; ++i) { + for (var i = 0; i < this._numPieces; ++i) { if (!pieceFilterFunc(i)) continue - var availability = self.pieces[i] + var availability = this._pieces[i] if (availability === min) { candidates.push(i) } else if (availability < min) { @@ -86,3 +63,61 @@ RarityMap.prototype.getRarestPiece = function (pieceFilterFunc) { return -1 } } + +RarityMap.prototype.destroy = function () { + var self = this + self._torrent.removeListener('wire', self._onWire) + self._torrent.wires.forEach(function (wire) { + self._cleanupWireEvents(wire) + }) + self._torrent = null + self._pieces = null + + self._onWire = null + self._onWireHave = null + self._onWireBitfield = null +} + +RarityMap.prototype._initWire = function (wire) { + var self = this + + wire._onClose = function () { + self._cleanupWireEvents(wire) + for (var i = 0; i < this._numPieces; ++i) { + self._pieces[i] -= wire.peerPieces.get(i) + } + } + + wire.on('have', self._onWireHave) + wire.on('bitfield', self._onWireBitfield) + wire.once('close', wire._onClose) +} + +/** + * Recalculates piece availability across all peers in the torrent. + */ +RarityMap.prototype.recalculate = function () { + var i + for (i = 0; i < this._numPieces; ++i) { + this._pieces[i] = 0 + } + + var numWires = this._torrent.wires.length + for (i = 0; i < numWires; ++i) { + var wire = this._torrent.wires[i] + for (var j = 0; j < this._numPieces; ++j) { + this._pieces[j] += wire.peerPieces.get(j) + } + } +} + +RarityMap.prototype._cleanupWireEvents = function (wire) { + wire.removeListener('have', this._onWireHave) + wire.removeListener('bitfield', this._onWireBitfield) + if (wire._onClose) wire.removeListener('close', wire._onClose) + wire._onClose = null +} + +function trueFn () { + return true +} diff --git a/lib/swarm.js b/lib/swarm.js deleted file mode 100644 index 7edee82..0000000 --- a/lib/swarm.js +++ /dev/null @@ -1,413 +0,0 @@ -module.exports = Swarm - -var addrToIPPort = require('addr-to-ip-port') -var debug = require('debug')('webtorrent:swarm') -var EventEmitter = require('events').EventEmitter -var inherits = require('inherits') -var net = require('net') // browser exclude -var speedometer = require('speedometer') - -var Peer = require('./peer') -var TCPPool = require('./tcp-pool') // browser-exclude - -var MAX_CONNS = 55 -var RECONNECT_WAIT = [ 1000, 5000, 15000 ] - -inherits(Swarm, EventEmitter) - -/** - * BitTorrent Swarm - * - * Abstraction of a BitTorrent "swarm", which is handy for managing all peer - * connections for a given torrent download. This handles connecting to peers, - * listening for incoming connections, and doing the initial peer wire protocol - * handshake with peers. It also tracks total data uploaded/downloaded to/from - * the swarm. - * - * @param {Buffer|string} infoHash - * @param {Buffer|string} peerId - * @param {Object} opts - * @param {Object} opts.handshake handshake options (passed to bittorrent-protocol) - * @param {number} opts.maxConns maximum number of connections in swarm - */ -function Swarm (infoHash, peerId, opts) { - var self = this - if (!(self instanceof Swarm)) return new Swarm(infoHash, peerId, opts) - EventEmitter.call(self) - - self.infoHash = typeof infoHash === 'string' - ? infoHash - : infoHash.toString('hex') - self.infoHashBuffer = new Buffer(self.infoHash, 'hex') - - self.peerId = typeof peerId === 'string' - ? peerId - : peerId.toString('hex') - self.peerIdBuffer = new Buffer(self.peerId, 'hex') - - if (!opts) opts = {} - - debug('new swarm (i %s p %s)', self.infoHash, self.peerId) - - self.handshakeOpts = opts.handshake // handshake extensions (optional) - self.maxConns = Number(opts.maxConns) || MAX_CONNS - - self.destroyed = false - self.listening = false - self.paused = false - - self.server = null // tcp listening socket - self.wires = [] // open wires (added *after* handshake) - - self._queue = [] // queue of outgoing tcp peers to connect to - self._peers = {} // connected peers (addr/peerId -> Peer) - self._peersLength = 0 // number of elements in `self._peers` (cache, for perf) - self._port = 0 // tcp listening port (cache, for perf) - - // track stats - self.downloaded = 0 - self.uploaded = 0 - self.downloadSpeed = speedometer() - self.uploadSpeed = speedometer() -} - -Object.defineProperty(Swarm.prototype, 'ratio', { - get: function () { - var self = this - return (self.uploaded / self.downloaded) || 0 - } -}) - -Object.defineProperty(Swarm.prototype, 'numQueued', { - get: function () { - var self = this - return self._queue.length + (self._peersLength - self.numConns) - } -}) - -Object.defineProperty(Swarm.prototype, 'numConns', { - get: function () { - var self = this - var numConns = 0 - for (var id in self._peers) { - if (self._peers[id].connected) numConns += 1 - } - return numConns - } -}) - -Object.defineProperty(Swarm.prototype, 'numPeers', { - get: function () { - var self = this - return self.wires.length - } -}) - -/** - * Add a peer to the swarm. - * @param {string|simple-peer} peer "ip:port" string or simple-peer instance - * @param {string} peer.id bittorrent peer id (when `peer` is simple-peer) - * @return {boolean} true if peer was added, false if peer was invalid - - */ -Swarm.prototype.addPeer = function (peer) { - var self = this - var newPeer = self._addPeer(peer) - return !!newPeer // don't expose private Peer instance in return value -} - -Swarm.prototype._addPeer = function (peer) { - var self = this - if (self.destroyed) { - debug('ignoring added peer: swarm already destroyed') - if (typeof peer !== 'string') peer.destroy() - return null - } - if (typeof peer === 'string' && !self._validAddr(peer)) { - debug('ignoring added peer: invalid address %s', peer) - return null - } - - var id = (peer && peer.id) || peer - if (self._peers[id]) { - debug('ignoring added peer: duplicate peer id') - if (typeof peer !== 'string') peer.destroy() - return null - } - - if (self.paused) { - debug('ignoring added peer: swarm paused') - if (typeof peer !== 'string') peer.destroy() - return null - } - - debug('addPeer %s', id) - - var newPeer - if (typeof peer === 'string') { - // `peer` is an addr ("ip:port" string) - newPeer = Peer.createTCPOutgoingPeer(peer, self) - } else { - // `peer` is a WebRTC connection (simple-peer) - newPeer = Peer.createWebRTCPeer(peer, self) - } - - self._peers[newPeer.id] = newPeer - self._peersLength += 1 - - if (typeof peer === 'string') { - // `peer` is an addr ("ip:port" string) - self._queue.push(newPeer) - self._drain() - } - - return newPeer -} - -/** - * Add a web seed to the swarm. - * @param {string} url web seed url - * @param {Object} parsedTorrent - */ -Swarm.prototype.addWebSeed = function (url, parsedTorrent) { - var self = this - if (self.destroyed) return - - if (!/^https?:\/\/.+/.test(url)) { - debug('ignoring invalid web seed %s (from swarm.addWebSeed)', url) - return - } - - if (self._peers[url]) return - - debug('addWebSeed %s', url) - - var newPeer = Peer.createWebSeedPeer(url, parsedTorrent, self) - self._peers[newPeer.id] = newPeer - self._peersLength += 1 -} - -/** - * Called whenever a new incoming TCP peer connects to this swarm. Called with a peer - * that has already sent a handshake. - * @param {Peer} peer - */ -Swarm.prototype._addIncomingPeer = function (peer) { - var self = this - if (self.destroyed) return peer.destroy(new Error('swarm already destroyed')) - if (self.paused) return peer.destroy(new Error('swarm paused')) - - if (!self._validAddr(peer.addr)) { - return peer.destroy(new Error('invalid addr ' + peer.addr + ' (from incoming)')) - } - debug('_addIncomingPeer %s', peer.id) - - self._peers[peer.id] = peer - self._peersLength += 1 -} - -/** - * Remove a peer from the swarm. - * @param {string} id for tcp peers, "ip:port" string; for webrtc peers, peerId - */ -Swarm.prototype.removePeer = function (id) { - var self = this - var peer = self._peers[id] - if (!peer) return - - debug('removePeer %s', id) - - delete self._peers[id] - self._peersLength -= 1 - - peer.destroy() - - // If swarm was at capacity before, try to open a new connection now - self._drain() -} - -/** - * Temporarily stop connecting to new peers. Note that this does not pause the streams - * of existing connections or their wires. - */ -Swarm.prototype.pause = function () { - var self = this - if (self.destroyed) return - debug('pause') - self.paused = true -} - -/** - * Resume connecting to new peers. - */ -Swarm.prototype.resume = function () { - var self = this - if (self.destroyed) return - debug('resume') - self.paused = false - self._drain() -} - -/** - * Listen on the given port for peer connections. - * @param {number} port - * @param {string=} hostname - * @param {function=} onlistening - */ -Swarm.prototype.listen = function (port, hostname, onlistening) { - var self = this - if (typeof hostname === 'function') { - onlistening = hostname - hostname = undefined - } - if (self.listening) throw new Error('swarm already listening') - if (onlistening) self.once('listening', onlistening) - - if (typeof TCPPool === 'function') { - self._port = port || TCPPool.getDefaultListenPort(self.infoHash) - self._hostname = hostname - - debug('listen %s', port) - - var pool = TCPPool.addSwarm(self) - self.server = pool.server - } else { - // In browser, listen() is no-op, but still fire 'listening' event so that - // same code works in node and the browser. - process.nextTick(function () { - self._onListening(0) - }) - } -} - -Swarm.prototype._onListening = function (port) { - var self = this - self._port = port - self.listening = true - self.emit('listening') -} - -Swarm.prototype.address = function () { - var self = this - if (!self.listening) return null - return self.server - ? self.server.address() - : { port: 0, family: 'IPv4', address: '127.0.0.1' } -} - -/** - * Destroy the swarm, close all open peer connections, and do cleanup. - * @param {function} onclose - */ -Swarm.prototype.destroy = function (onclose) { - var self = this - if (self.destroyed) return - - self.destroyed = true - self.listening = false - self.paused = false - - if (onclose) self.once('close', onclose) - - debug('destroy') - - for (var id in self._peers) { - self.removePeer(id) - } - - if (typeof TCPPool === 'function') { - TCPPool.removeSwarm(self, function () { - // TODO: only emit when all peers are destroyed - self.emit('close') - }) - } else { - process.nextTick(function () { - self.emit('close') - }) - } -} - -/** - * Pop a peer off the FIFO queue and connect to it. When _drain() gets called, - * the queue will usually have only one peer in it, except when there are too - * many peers (over `this.maxConns`) in which case they will just sit in the - * queue until another connection closes. - */ -Swarm.prototype._drain = function () { - var self = this - debug('_drain numConns %s maxConns %s', self.numConns, self.maxConns) - if (typeof net.connect !== 'function' || self.destroyed || self.paused || - self.numConns >= self.maxConns) { - return - } - debug('drain (%s queued, %s/%s peers)', self.numQueued, self.numPeers, self.maxConns) - - var peer = self._queue.shift() - if (!peer) return // queue could be empty - - debug('tcp connect attempt to %s', peer.addr) - - var parts = addrToIPPort(peer.addr) - var opts = { - host: parts[0], - port: parts[1] - } - if (self._hostname) opts.localAddress = self._hostname - - var conn = peer.conn = net.connect(opts) - - conn.once('connect', function () { peer.onConnect() }) - conn.once('error', function (err) { peer.destroy(err) }) - peer.startConnectTimeout() - - // When connection closes, attempt reconnect after timeout (with exponential backoff) - conn.on('close', function () { - if (self.destroyed) return - - // TODO: If torrent is done, do not try to reconnect after a timeout - - if (peer.retries >= RECONNECT_WAIT.length) { - debug( - 'conn %s closed: will not re-add (max %s attempts)', - peer.addr, RECONNECT_WAIT.length - ) - return - } - - var ms = RECONNECT_WAIT[peer.retries] - debug( - 'conn %s closed: will re-add to queue in %sms (attempt %s)', - peer.addr, ms, peer.retries + 1 - ) - - var reconnectTimeout = setTimeout(function reconnectTimeout () { - var newPeer = self._addPeer(peer.addr) - if (newPeer) newPeer.retries = peer.retries + 1 - }, ms) - if (reconnectTimeout.unref) reconnectTimeout.unref() - }) -} - -Swarm.prototype._onError = function (err) { - var self = this - self.emit('error', err) - self.destroy() -} - -/** - * Returns `true` if string is valid IPv4/6 address, and is not the address of this swarm. - * @param {string} addr - * @return {boolean} - */ -Swarm.prototype._validAddr = function (addr) { - var self = this - var parts - try { - parts = addrToIPPort(addr) - } catch (e) { - return false - } - var host = parts[0] - var port = parts[1] - return port > 0 && port < 65535 && !(host === '127.0.0.1' && port === self._port) -} diff --git a/lib/tcp-pool.js b/lib/tcp-pool.js index 7e95f6c..5e15d0a 100644 --- a/lib/tcp-pool.js +++ b/lib/tcp-pool.js @@ -7,12 +7,6 @@ var net = require('net') // browser exclude var Peer = require('./peer') /** - * Shared TCP pools; shared among all swarms - * @type {Object} port: number -> pool: TCPPool - */ -var tcpPools = {} - -/** * TCPPool * * A "TCP pool" allows multiple swarms to listen on the same TCP port and determines @@ -20,107 +14,35 @@ var tcpPools = {} * handshake that the remote peer sends. * * @param {number} port - * @param {string} hostname */ -function TCPPool (port, hostname) { +function TCPPool (client) { var self = this - - self.port = port - self.listening = false - self.swarms = {} // infoHash (hex) -> Swarm - - debug('new TCPPool (port: %s, hostname: %s)', port, hostname) - - // Save incoming conns so they can be destroyed if server is closed before the conn is - // passed off to a Swarm. - self.pendingConns = [] + debug('create tcp pool (port %s)', client.torrentPort) self.server = net.createServer() - self.server.on('connection', function (conn) { self._onConnection(conn) }) - self.server.on('error', function (err) { self._onError(err) }) - self.server.on('listening', function () { self._onListening() }) - self.server.listen(self.port, hostname) -} - -/** - * STATIC METHOD - * Add a swarm to a pool, creating a new pool if necessary. - * @param {Swarm} swarm - */ -TCPPool.addSwarm = function (swarm) { - var pool = tcpPools[swarm._port] - if (!pool) pool = tcpPools[swarm._port] = new TCPPool(swarm._port, swarm._hostname) - pool.addSwarm(swarm) - return pool -} - -/** - * STATIC METHOD - * Remove a swarm from its pool. - * @param {Swarm} swarm - */ -TCPPool.removeSwarm = function (swarm, cb) { - var pool = tcpPools[swarm._port] - if (!pool) return cb() - pool.removeSwarm(swarm) + self._client = client - if (Object.keys(pool.swarms).length === 0) pool.destroy(cb) - else process.nextTick(cb) -} + // Temporarily store incoming connections so they can be destroyed if the server is + // closed before the connection is passed off to a Torrent. + self._pendingConns = [] -/** - * STATIC METHOD - * When `Swarm.prototype.listen` is called without specifying a port, a reasonable - * default port must be chosen. If there already exists an active TCP pool, then return - * that pool's port so that TCP server can be re-used. Otherwise, return 0 so node will - * pick a free port. - * - * @return {number} port - */ -TCPPool.getDefaultListenPort = function (infoHash) { - for (var port in tcpPools) { - var pool = tcpPools[port] - if (!pool.swarms[infoHash]) return pool.port + self._onConnectionBound = function (conn) { + self._onConnection(conn) } - return 0 -} - -/** - * Add a swarm to this TCP pool. - * @param {Swarm} swarm - */ -TCPPool.prototype.addSwarm = function (swarm) { - var self = this - if (self.swarms[swarm.infoHash]) { - process.nextTick(function () { - swarm._onError(new Error( - 'There is already a swarm with info hash ' + swarm.infoHash + ' ' + - 'listening on port ' + swarm._port - )) - }) - return + self._onListening = function () { + self._client._onListening() } - self.swarms[swarm.infoHash] = swarm - - if (self.listening) { - process.nextTick(function () { - swarm._onListening(self.port) - }) + self._onError = function (err) { + self._client._destroy(err) } - debug('add swarm %s to tcp pool %s', swarm.infoHash, self.port) -} + self.server.on('connection', self._onConnectionBound) + self.server.on('listening', self._onListening) + self.server.on('error', self._onError) -/** - * Remove a swarm from this TCP pool. - * @param {Swarm} swarm - */ -TCPPool.prototype.removeSwarm = function (swarm) { - var self = this - debug('remove swarm %s from tcp pool %s', swarm.infoHash, self.port) - delete self.swarms[swarm.infoHash] + self.server.listen(client.torrentPort) } /** @@ -129,47 +51,28 @@ TCPPool.prototype.removeSwarm = function (swarm) { */ TCPPool.prototype.destroy = function (cb) { var self = this - debug('destroy tcp pool %s', self.port) + debug('destroy tcp pool') - self.listening = false + self.server.removeListener('connection', self._onConnectionBound) + self.server.removeListener('listening', self._onListening) + self.server.removeListener('error', self._onError) // Destroy all open connection objects so server can close gracefully without waiting // for connection timeout or remote peer to disconnect. - self.pendingConns.forEach(function (conn) { + self._pendingConns.forEach(function (conn) { + conn.on('error', noop) conn.destroy() }) - delete tcpPools[self.port] - try { self.server.close(cb) } catch (err) { if (cb) process.nextTick(cb) } -} - -TCPPool.prototype._onListening = function () { - var self = this - - // Fix for Docker Node image. Sometimes server.address() returns `null`. - // See issue: https://github.com/feross/bittorrent-swarm/pull/18 - var address = self.server.address() || { port: 0 } - var port = address.port - - debug('tcp pool listening on %s', port) - if (port !== self.port) { - // `port` was 0 when `listen` was called; update to the port that node selected - delete tcpPools[self.port] - self.port = port - tcpPools[self.port] = self - } - - self.listening = true - - for (var infoHash in self.swarms) { - self.swarms[infoHash]._onListening(self.port) - } + self.server = null + self._client = null + self._pendingConns = null } /** @@ -189,39 +92,34 @@ TCPPool.prototype._onConnection = function (conn) { return } - self.pendingConns.push(conn) - conn.once('close', removePendingConn) - - function removePendingConn () { - arrayRemove(self.pendingConns, self.pendingConns.indexOf(conn)) - } + self._pendingConns.push(conn) + conn.once('close', cleanupPending) var peer = Peer.createTCPIncomingPeer(conn) - peer.wire.once('handshake', function (infoHash, peerId) { - removePendingConn() - conn.removeListener('close', removePendingConn) + var wire = peer.wire + wire.once('handshake', onHandshake) - var swarm = self.swarms[infoHash] - if (swarm) { - peer.swarm = swarm - swarm._addIncomingPeer(peer) + function onHandshake (infoHash, peerId) { + cleanupPending() + + var torrent = self._client.get(infoHash) + if (torrent) { + peer.swarm = torrent + torrent._addIncomingPeer(peer) peer.onHandshake(infoHash, peerId) } else { - var err = new Error('Unexpected info hash ' + infoHash + ' from incoming peer ' + - peer.id + ': destroying peer') + var err = new Error( + 'Unexpected info hash ' + infoHash + ' from incoming peer ' + peer.id + ) peer.destroy(err) } - }) -} + } -TCPPool.prototype._onError = function (err) { - var self = this - self.destroy() - for (var infoHash in self.swarms) { - var swarm = self.swarms[infoHash] - self.removeSwarm(swarm) - swarm._onError(err) + function cleanupPending () { + conn.removeListener('close', cleanupPending) + wire.removeListener('handshake', onHandshake) + arrayRemove(self._pendingConns, self._pendingConns.indexOf(conn)) } } diff --git a/lib/torrent.js b/lib/torrent.js index 7681da0..a5dc4c2 100644 --- a/lib/torrent.js +++ b/lib/torrent.js @@ -1,3 +1,7 @@ +// TODO: remove _onError, add _destroy(err, cb) +// TODO: cleanup event listeners +// TODO: Remove all inline docs, and move to docs/api.md + /* global URL, Blob */ module.exports = Torrent @@ -15,6 +19,7 @@ var FSChunkStore = require('fs-chunk-store') // browser: `memory-chunk-store` var ImmediateChunkStore = require('immediate-chunk-store') var inherits = require('inherits') var MultiStream = require('multistream') +var net = require('net') // browser exclude var os = require('os') // browser exclude var parallel = require('run-parallel') var parallelLimit = require('run-parallel-limit') @@ -25,14 +30,15 @@ var Piece = require('torrent-piece') var pump = require('pump') var randomIterate = require('random-iterate') var sha1 = require('simple-sha1') +var speedometer = require('speedometer') var uniq = require('uniq') var ut_metadata = require('ut_metadata') var ut_pex = require('ut_pex') // browser exclude var File = require('./file') +var Peer = require('./peer') var RarityMap = require('./rarity-map') var Server = require('./server') // browser exclude -var Swarm = require('./swarm') var MAX_BLOCK_LENGTH = 128 * 1024 var PIECE_TIMEOUT = 30000 @@ -47,6 +53,8 @@ var RECHOKE_OPTIMISTIC_DURATION = 2 // 30 seconds var FILESYSTEM_CONCURRENCY = 2 +var RECONNECT_WAIT = [ 1000, 5000, 15000 ] + var TMP = typeof pathExists.sync === 'function' ? path.join(pathExists.sync('/tmp') ? '/tmp' : os.tmpDir(), 'webtorrent') : '/tmp/webtorrent' @@ -86,20 +94,34 @@ function Torrent (torrentId, client, opts) { this.ready = false this.destroyed = false + this.paused = false + this.done = false + this.metadata = null this.store = null - this.numBlockedPeers = 0 this.files = null - this.done = false this._amInterested = false this.pieces = [] this._selections = [] this._critical = [] + this.wires = [] // open wires (added *after* handshake) + + this._queue = [] // queue of outgoing tcp peers to connect to + this._peers = {} // connected peers (addr/peerId -> Peer) + this._peersLength = 0 // number of elements in `this._peers` (cache, for perf) + + // stats + this.received = 0 + this.uploaded = 0 + this._downloadSpeed = speedometer() + this._uploadSpeed = speedometer() + // for cleanup this._servers = [] + // TODO: remove this and expose a hook instead // optimization: don't recheck every file if it hasn't changed this._fileModtimes = opts.fileModtimes @@ -130,14 +152,6 @@ Object.defineProperty(Torrent.prototype, 'downloaded', { } }) -Object.defineProperty(Torrent.prototype, 'received', { - get: function () { return this.swarm ? this.swarm.downloaded : 0 } -}) - -Object.defineProperty(Torrent.prototype, 'uploaded', { - get: function () { return this.swarm ? this.swarm.uploaded : 0 } -}) - // The number of missing pieces. Used to implement 'end game' mode. // Object.defineProperty(Storage.prototype, 'numMissing', { // get: function () { @@ -151,11 +165,11 @@ Object.defineProperty(Torrent.prototype, 'uploaded', { // }) Object.defineProperty(Torrent.prototype, 'downloadSpeed', { - get: function () { return this.swarm ? this.swarm.downloadSpeed() : 0 } + get: function () { return this._downloadSpeed() } }) Object.defineProperty(Torrent.prototype, 'uploadSpeed', { - get: function () { return this.swarm ? this.swarm.uploadSpeed() : 0 } + get: function () { return this._uploadSpeed() } }) Object.defineProperty(Torrent.prototype, 'progress', { @@ -163,14 +177,14 @@ Object.defineProperty(Torrent.prototype, 'progress', { }) Object.defineProperty(Torrent.prototype, 'ratio', { - get: function () { return this.uploaded / (this.downloaded || 1) } + get: function () { return this.uploaded / (this.received || 1) } }) Object.defineProperty(Torrent.prototype, 'numPeers', { - get: function () { return this.swarm ? this.swarm.numPeers : 0 } + get: function () { return this.wires.length } }) -// TODO: remove this +// TODO: remove this (and file.getBlobURL?) // Torrent file as a blob url Object.defineProperty(Torrent.prototype, 'torrentFileBlobURL', { get: function () { @@ -182,13 +196,36 @@ Object.defineProperty(Torrent.prototype, 'torrentFileBlobURL', { } }) +Object.defineProperty(Torrent.prototype, '_numQueued', { + get: function () { + return this._queue.length + (this._peersLength - this._numConns) + } +}) + +Object.defineProperty(Torrent.prototype, '_numConns', { + get: function () { + var self = this + var numConns = 0 + for (var id in self._peers) { + if (self._peers[id].connected) numConns += 1 + } + return numConns + } +}) + +// TODO: remove in v2 +Object.defineProperty(Torrent.prototype, 'swarm', { + get: function () { + console.log('WebTorrent: `torrent.swarm` is deprecated. Use `torrent` directly instead.') + } +}) + Torrent.prototype._onTorrentId = function (torrentId) { var self = this if (self.destroyed) return var parsedTorrent try { parsedTorrent = parseTorrent(torrentId) } catch (err) {} - if (parsedTorrent) { // Attempt to set infoHash property synchronously self.infoHash = parsedTorrent.infoHash @@ -210,6 +247,7 @@ Torrent.prototype._onTorrentId = function (torrentId) { Torrent.prototype._onParsedTorrent = function (parsedTorrent) { var self = this if (self.destroyed) return + console.log('on parsed torrent') self._processParsedTorrent(parsedTorrent) @@ -219,44 +257,20 @@ Torrent.prototype._onParsedTorrent = function (parsedTorrent) { if (!self.path) self.path = path.join(TMP, self.infoHash) - // create swarm - self.swarm = new Swarm(self.infoHash, self.client.peerId, { - handshake: { - dht: self.private ? false : !!self.client.dht - }, - maxConns: self.client.maxConns - }) - self.swarm.on('error', function (err) { - self._onError(err) - }) - self.swarm.on('wire', function (wire, addr) { - self._onWire(wire, addr) - }) - - self.swarm.on('download', function (downloaded) { - self.client._downloadSpeed(downloaded) // update overall client stats - self.client.emit('download', downloaded) - self.emit('download', downloaded) - }) - - self.swarm.on('upload', function (uploaded) { - self.client._uploadSpeed(uploaded) // update overall client stats - self.client.emit('upload', uploaded) - self.emit('upload', uploaded) - }) - - // listen for peers (note: in the browser, this is a no-op and callback is called on - // next tick) - self.swarm.listen(self.client.torrentPort, function () { - self._onSwarmListening() - }) - self._rechokeIntervalId = setInterval(function () { self._rechoke() }, RECHOKE_INTERVAL) if (self._rechokeIntervalId.unref) self._rechokeIntervalId.unref() self.emit('infoHash', self.infoHash) + + if (self.client.listening) { + self._onListening() + } else { + self.client.once('listening', function () { + self._onListening() + }) + } } Torrent.prototype._processParsedTorrent = function (parsedTorrent) { @@ -282,14 +296,13 @@ Torrent.prototype._processParsedTorrent = function (parsedTorrent) { this.magnetURI = parseTorrent.toMagnetURI(parsedTorrent) this.torrentFile = parseTorrent.toTorrentFile(parsedTorrent) + console.log('process done') } -Torrent.prototype._onSwarmListening = function () { +Torrent.prototype._onListening = function () { var self = this - if (self.destroyed) return - - if (self.swarm.server) self.client.torrentPort = self.swarm.address().port - + if (self.discovery || self.destroyed) return + console.log('on listening') var trackerOpts = { rtcConfig: self.client._rtcConfig, wrtc: self.client._wrtc, @@ -336,8 +349,6 @@ Torrent.prototype._onSwarmListening = function () { // if full metadata was included in initial torrent id, use it immediately. Otherwise, // wait for torrent-discovery to find peers and ut_metadata to get the metadata. if (self.info) self._onMetadata(self) - - self.emit('listening', self.client.torrentPort) } /** @@ -368,7 +379,7 @@ Torrent.prototype._onMetadata = function (metadata) { self.addWebSeed(url) }) - self.rarityMap = new RarityMap(self.swarm, self.pieces.length) + self._rarityMap = new RarityMap(self) self.store = new ImmediateChunkStore( new self._store(self.pieceLength, { @@ -403,7 +414,7 @@ Torrent.prototype._onMetadata = function (metadata) { self.bitfield = new BitField(self.pieces.length) - self.swarm.wires.forEach(function (wire) { + self.wires.forEach(function (wire) { // If we didn't have the metadata at the time ut_metadata was initialized for this // wire, we still want to make it available to the peer in case they request it. if (wire.ut_metadata) wire.ut_metadata.setMetadata(self.metadata) @@ -492,7 +503,7 @@ Torrent.prototype._markVerified = function (index) { } /** - * Called when the metadata, swarm, and underlying chunk store is initialized. + * Called when the metadata, listening server, and underlying chunk store is initialized. */ Torrent.prototype._onStore = function () { var self = this @@ -525,36 +536,46 @@ Torrent.prototype.destroy = function (cb) { clearInterval(self._rechokeIntervalId) - var tasks = [] + if (self._rarityMap) { + self._rarityMap.destroy() + } + + for (var id in self._peers) { + self.removePeer(id) + } - self._servers.forEach(function (server) { - tasks.push(function (cb) { server.destroy(cb) }) + var tasks = self._servers.map(function (server) { + return function (cb) { + server.destroy(cb) + } }) - if (self.swarm) tasks.push(function (cb) { self.swarm.destroy(cb) }) - if (self.discovery) tasks.push(function (cb) { self.discovery.destroy(cb) }) - if (self.store) tasks.push(function (cb) { self.store.close(cb) }) + if (self.discovery) { + tasks.push(function (cb) { + self.discovery.destroy(cb) + }) + } + if (self.store) { + tasks.push(function (cb) { + self.store.close(cb) + }) + } parallel(tasks, cb) } /** - * Add a peer to the swarm - * @param {string|SimplePeer} peer + * Add a peer to the torrent swarm + * @param {string|simple-peer} peer "ip:port" string or simple-peer instance + * @param {string} peer.id bittorrent peer id (when `peer` is simple-peer) * @return {boolean} true if peer was added, false if peer was blocked */ Torrent.prototype.addPeer = function (peer) { var self = this if (self.destroyed) throw new Error('torrent is destroyed') - - function addPeer () { - var wasAdded = self.swarm.addPeer(peer) - if (wasAdded) { - self.emit('peer', peer) - } else { - self.emit('invalidPeer', peer) - } - } + if (!self.infoHash) throw new Error('addPeer() must not be called before the `infoHash` event') + console.log('addPeer:', peer) + console.log(self.infoHash) if (self.client.blocked) { var host @@ -563,6 +584,7 @@ Torrent.prototype.addPeer = function (peer) { try { parts = addrToIPPort(peer) } catch (e) { + self._debug('ignoring peer: invalid %s', peer) self.emit('invalidPeer', peer) return false } @@ -572,25 +594,143 @@ Torrent.prototype.addPeer = function (peer) { } if (host && self.client.blocked.contains(host)) { - self.numBlockedPeers += 1 // TODO: remove this. less api surface area + self._debug('ignoring peer: blocked %s', peer) self.emit('blockedPeer', peer) return false } } - if (self.swarm) addPeer() - else self.once('listening', addPeer) - return true + var wasAdded = !!self._addPeer(peer) + if (wasAdded) { + self.emit('peer', peer) + } else { + self.emit('invalidPeer', peer) + } + return wasAdded +} + +Torrent.prototype._addPeer = function (peer) { + var self = this + if (self.destroyed) { + if (typeof peer === 'string') { + self._debug('ignoring peer: torrent is destroyed') + } else { + peer.destroy(new Error('torrent is destroyed')) + } + return null + } + if (typeof peer === 'string' && !self._validAddr(peer)) { + self._debug('ignoring peer: invalid %s', peer) + return null + } + + var id = (peer && peer.id) || peer + if (self._peers[id]) { + if (typeof peer === 'string') { + self._debug('ignoring peer: duplicate (%s)', id) + } else { + peer.destroy(new Error('duplicate peer ' + id)) + } + return null + } + + if (self.paused) { + if (typeof peer === 'string') { + self._debug('ignoring peer: torrent is paused') + } else { + peer.destroy(new Error('torrent is paused')) + } + return null + } + + self._debug('add peer %s', id) + + var newPeer + if (typeof peer === 'string') { + // `peer` is an addr ("ip:port" string) + newPeer = Peer.createTCPOutgoingPeer(peer, self) + } else { + // `peer` is a WebRTC connection (simple-peer) + newPeer = Peer.createWebRTCPeer(peer, self) + } + + self._peers[newPeer.id] = newPeer + self._peersLength += 1 + + if (typeof peer === 'string') { + // `peer` is an addr ("ip:port" string) + self._queue.push(newPeer) + self._drain() + } + + return newPeer } /** - * Add a web seed to the swarm + * Add a web seed to the torrent swarm. * @param {string} url web seed url + * @param {Object} parsedTorrent */ Torrent.prototype.addWebSeed = function (url) { if (this.destroyed) throw new Error('torrent is destroyed') + + if (!/^https?:\/\/.+/.test(url)) { + this._debug('ignoring invalid web seed %s', url) + this.emit('invalidPeer', url) + return + } + + if (this._peers[url]) { + this._debug('ignoring duplicate web seed %s', url) + this.emit('invalidPeer', url) + return + } + this._debug('add web seed %s', url) - this.swarm.addWebSeed(url, this) + + var newPeer = Peer.createWebSeedPeer(url, this) + this._peers[newPeer.id] = newPeer + this._peersLength += 1 + + this.emit('peer', url) +} + +/** + * Called whenever a new incoming TCP peer connects to this torrent swarm. Called with a + * peer that has already sent a handshake. + * @param {Peer} peer + */ +Torrent.prototype._addIncomingPeer = function (peer) { + var self = this + if (self.destroyed) return peer.destroy(new Error('torrent is destroyed')) + if (self.paused) return peer.destroy(new Error('torrent is paused')) + + this._debug('add incoming peer %s', peer.id) + + self._peers[peer.id] = peer + self._peersLength += 1 +} + +/** + * Remove a peer from the torrent swarm. + * @param {string} peer "ip:port" string, peerId string, or simple-peer instance + */ +Torrent.prototype.removePeer = function (peer) { + var self = this + var id = (peer && peer.id) || peer + peer = self._peers[id] + + if (!peer) return + + this._debug('removePeer %s', id) + + delete self._peers[id] + self._peersLength -= 1 + + peer.destroy() + + // If torrent swarm was at capacity before, try to open a new connection now + self._drain() } /** @@ -675,6 +815,26 @@ Torrent.prototype._onWire = function (wire, addr) { var self = this self._debug('got wire %s (%s)', wire._debugId, addr || 'Unknown') + wire.on('download', function (downloaded) { + if (self.destroyed) return + self.received += downloaded + self._downloadSpeed(downloaded) + self.client._downloadSpeed(downloaded) + self.emit('download', downloaded) + self.client.emit('download', downloaded) + }) + + wire.on('upload', function (uploaded) { + if (self.destroyed) return + self.uploaded += uploaded + self._uploadSpeed(uploaded) + self.client._uploadSpeed(uploaded) + self.emit('upload', uploaded) + self.client.emit('upload', uploaded) + }) + + self.wires.push(wire) + if (addr) { // Sometimes RTCPeerConnection.getStats() doesn't return an ip:port for peers var parts = addrToIPPort(addr) @@ -739,12 +899,12 @@ Torrent.prototype._onWire = function (wire, addr) { }) wire.ut_pex.on('dropped', function (peer) { - // the remote peer believes a given peer has been dropped from the swarm. - // if we're not currently connected to it, then remove it from the swarm's queue. - var peerObj = self.swarm._peers[peer] + // the remote peer believes a given peer has been dropped from the torrent swarm. + // if we're not currently connected to it, then remove it from the queue. + var peerObj = self._peers[peer] if (peerObj && !peerObj.connected) { self._debug('ut_pex: dropped peer: %s (from %s)', peer, addr) - self.swarm.removePeer(peer) + self.removePeer(peer) } }) @@ -760,8 +920,8 @@ Torrent.prototype._onWire = function (wire, addr) { if (self.metadata) { process.nextTick(function () { - // nextTick allows wire.handshake() to be called by `bittorrent-swarm` - // first, before we send any other messages on the wire + // This allows wire.handshake() to be called (by Peer.onHandshake) before any + // messages get sent on the wire self._onWireWithMetadata(wire) }) } @@ -774,7 +934,7 @@ Torrent.prototype._onWireWithMetadata = function (wire) { function onChokeTimeout () { if (self.destroyed || wire.destroyed) return - if (self.swarm.numQueued > 2 * (self.swarm.numConns - self.swarm.numPeers) && + if (self._numQueued > 2 * (self._numConns - self.numPeers) && wire.amInterested) { wire.destroy() } else { @@ -897,7 +1057,7 @@ Torrent.prototype._updateInterest = function () { var prev = self._amInterested self._amInterested = !!self._selections.length - self.swarm.wires.forEach(function (wire) { + self.wires.forEach(function (wire) { // TODO: only call wire.interested if the wire has at least one piece we need if (self._amInterested) wire.interested() else wire.uninterested() @@ -916,7 +1076,7 @@ Torrent.prototype._update = function () { if (self.destroyed) return // update wires in random order for better request distribution - var ite = randomIterate(self.swarm.wires) + var ite = randomIterate(self.wires) var wire while ((wire = ite())) { self._updateWire(wire) @@ -961,7 +1121,7 @@ Torrent.prototype._updateWire = function (wire) { var filter = genPieceFilterFunc(start, end, tried) while (tries < len) { - piece = self.rarityMap.getRarestPiece(filter) + piece = self._rarityMap.getRarestPiece(filter) if (piece < 0) break if (self._request(wire, piece, false)) return tried[piece] = true @@ -992,8 +1152,8 @@ Torrent.prototype._updateWire = function (wire) { var missing = self.pieces[index].missing - for (; ptr < self.swarm.wires.length; ptr++) { - var otherWire = self.swarm.wires[ptr] + for (; ptr < self.wires.length; ptr++) { + var otherWire = self.wires[ptr] var otherSpeed = otherWire.downloadSpeed() if (otherSpeed < SPEED_THRESHOLD) continue @@ -1036,7 +1196,7 @@ Torrent.prototype._updateWire = function (wire) { var filter = genPieceFilterFunc(start, end, tried, rank) while (tries < len) { - piece = self.rarityMap.getRarestPiece(filter) + piece = self._rarityMap.getRarestPiece(filter) if (piece < 0) break // request all non-reserved blocks in this piece @@ -1083,7 +1243,7 @@ Torrent.prototype._rechoke = function () { var peers = [] - self.swarm.wires.forEach(function (wire) { + self.wires.forEach(function (wire) { if (!wire.isSeeder && wire !== self._rechokeOptimisticWire) { peers.push({ wire: wire, @@ -1272,7 +1432,7 @@ Torrent.prototype._request = function (wire, index, hotswap) { self.store.put(index, buf) - self.swarm.wires.forEach(function (wire) { + self.wires.forEach(function (wire) { wire.have(index) }) @@ -1361,21 +1521,35 @@ Torrent.prototype.createServer = function (opts) { return server } +/** + * Temporarily stop connecting to new peers. Note that this does not pause the streams + * of existing connections or their wires. + */ Torrent.prototype.pause = function () { if (this.destroyed) return - this.swarm.pause() + this._debug('pause') + this.paused = true } +/** + * Resume connecting to new peers. + */ Torrent.prototype.resume = function () { if (this.destroyed) return - this.swarm.resume() + this._debug('resume') + this.paused = false + this._drain() } Torrent.prototype._onError = function (err) { var self = this self._debug('torrent error: %s', err.message || err) self.destroy() - self.emit('error', err) + if (self.listenerCount('error') === 0) { + self.client.emit('error', err) + } else { + self.emit('error', err) + } } Torrent.prototype._debug = function () { @@ -1400,3 +1574,81 @@ function randomInt (high) { } function noop () {} + +/** + * Pop a peer off the FIFO queue and connect to it. When _drain() gets called, + * the queue will usually have only one peer in it, except when there are too + * many peers (over `this.maxConns`) in which case they will just sit in the + * queue until another connection closes. + */ +Torrent.prototype._drain = function () { + var self = this + this._debug('_drain numConns %s maxConns %s', self._numConns, self.client.maxConns) + if (typeof net.connect !== 'function' || self.destroyed || self.paused || + self._numConns >= self.client.maxConns) { + return + } + this._debug('drain (%s queued, %s/%s peers)', self._numQueued, self.numPeers, self.client.maxConns) + + var peer = self._queue.shift() + if (!peer) return // queue could be empty + + this._debug('tcp connect attempt to %s', peer.addr) + + var parts = addrToIPPort(peer.addr) + var opts = { + host: parts[0], + port: parts[1] + } + + var conn = peer.conn = net.connect(opts) + + conn.once('connect', function () { peer.onConnect() }) + conn.once('error', function (err) { peer.destroy(err) }) + peer.startConnectTimeout() + + // When connection closes, attempt reconnect after timeout (with exponential backoff) + conn.on('close', function () { + if (self.destroyed) return + + // TODO: If torrent is done, do not try to reconnect after a timeout + + if (peer.retries >= RECONNECT_WAIT.length) { + this._debug( + 'conn %s closed: will not re-add (max %s attempts)', + peer.addr, RECONNECT_WAIT.length + ) + return + } + + var ms = RECONNECT_WAIT[peer.retries] + this._debug( + 'conn %s closed: will re-add to queue in %sms (attempt %s)', + peer.addr, ms, peer.retries + 1 + ) + + var reconnectTimeout = setTimeout(function reconnectTimeout () { + var newPeer = self._addPeer(peer.addr) + if (newPeer) newPeer.retries = peer.retries + 1 + }, ms) + if (reconnectTimeout.unref) reconnectTimeout.unref() + }) +} + +/** + * Returns `true` if string is valid IPv4/6 address. + * @param {string} addr + * @return {boolean} + */ +Torrent.prototype._validAddr = function (addr) { + var parts + try { + parts = addrToIPPort(addr) + } catch (e) { + return false + } + var host = parts[0] + var port = parts[1] + return port > 0 && port < 65535 && + !(host === '127.0.0.1' && port === this.client.torrentPort) +} diff --git a/lib/webconn.js b/lib/webconn.js index ca6760f..945edd4 100644 --- a/lib/webconn.js +++ b/lib/webconn.js @@ -1,3 +1,6 @@ +// TODO: cleanup events +// TODO: cleanup reference to parsedTorrent (i.e. Torrent object) + module.exports = WebConn var BitField = require('bitfield') |