diff options
author | Feross Aboukhadijeh <feross@feross.org> | 2016-03-29 03:56:02 +0300 |
---|---|---|
committer | Feross Aboukhadijeh <feross@feross.org> | 2016-03-29 04:02:08 +0300 |
commit | 4c21d2d154a4775e3f706426acc4625f7db0a53a (patch) | |
tree | b01b7144dcc6fb580a8ff97180dec5e8ac9b9a38 | |
parent | 0ee69bc16de506d64ec6c5af657a012c1edba43b (diff) |
Deprecate bittorrent-swarm, inline into webtorrent
bittorrent-swarm and the Torrent object in webtorrent are very coupled.
It doesn't make much sense to publish them separately, as
bittorrent-swarm can't be used independently.
-rw-r--r-- | lib/peer.js | 253 | ||||
-rw-r--r-- | lib/swarm.js | 412 | ||||
-rw-r--r-- | lib/tcp-pool.js | 235 | ||||
-rw-r--r-- | lib/torrent.js | 2 | ||||
-rw-r--r-- | lib/webconn.js | 135 | ||||
-rw-r--r-- | package.json | 14 | ||||
-rw-r--r-- | test/node/swarm-basic.js | 77 | ||||
-rw-r--r-- | test/node/swarm-reconnect.js | 62 | ||||
-rw-r--r-- | test/node/swarm-timeout.js | 50 | ||||
-rw-r--r-- | test/swarm.js | 82 |
10 files changed, 1318 insertions, 4 deletions
diff --git a/lib/peer.js b/lib/peer.js new file mode 100644 index 0000000..f4667b5 --- /dev/null +++ b/lib/peer.js @@ -0,0 +1,253 @@ +var debug = require('debug')('webtorrent:peer') +var Wire = require('bittorrent-protocol') + +var WebConn = require('./webconn') + +var CONNECT_TIMEOUT = 25000 +var HANDSHAKE_TIMEOUT = 25000 + +/** + * WebRTC peer connections start out connected, because WebRTC peers require an + * "introduction" (i.e. WebRTC signaling), and there's no equivalent to an IP address + * that lets you refer to a WebRTC endpoint. + */ +exports.createWebRTCPeer = function (conn, swarm) { + var peer = new Peer(conn.id, 'webrtc') + peer.conn = conn + peer.swarm = swarm + + if (peer.conn.connected) { + peer.onConnect() + } else { + peer.conn.once('connect', function () { peer.onConnect() }) + peer.conn.once('error', function (err) { peer.destroy(err) }) + peer.setConnectTimeout() + } + + return peer +} + +/** + * Incoming TCP peers start out connected, because the remote peer connected to the + * listening port of the TCP server. Until the remote peer sends a handshake, we don't + * know what swarm the connection is intended for. + */ +exports.createTCPIncomingPeer = function (conn) { + var addr = conn.remoteAddress + ':' + conn.remotePort + var peer = new Peer(addr, 'tcpIncoming') + peer.conn = conn + peer.addr = addr + + peer.onConnect() + + return peer +} + +/** + * Outgoing TCP peers start out with just an IP address. At some point (when there is an + * available connection), the client can attempt to connect to the address. + */ +exports.createTCPOutgoingPeer = function (addr, swarm) { + var peer = new Peer(addr, 'tcpOutgoing') + peer.addr = addr + peer.swarm = swarm + + return peer +} + +/** + * Peer that represents a Web Seed (BEP17 / BEP19). 
+ */ +exports.createWebSeedPeer = function (url, parsedTorrent, swarm) { + var peer = new Peer(url, 'webSeed') + peer.swarm = swarm + peer.conn = new WebConn(url, parsedTorrent) + + peer.onConnect() + + return peer +} + +/** + * Peer. Represents a peer in the Swarm. + * + * @param {string} id "ip:port" string, peer id (for WebRTC peers), or url (for Web Seeds) + * @param {string} type the type of the peer + */ +function Peer (id, type) { + var self = this + self.id = id + self.type = type + + debug('new Peer %s', id) + + self.addr = null + self.conn = null + self.swarm = null + self.wire = null + + self.connected = false + self.destroyed = false + self.timeout = null // handshake timeout + self.retries = 0 // outgoing TCP connection retry count + + self.sentHandshake = false +} + +/** + * Called once the peer is connected (i.e. fired 'connect' event) + * @param {Socket} conn + */ +Peer.prototype.onConnect = function () { + var self = this + if (self.destroyed) return + self.connected = true + + debug('Peer %s connected', self.id) + + clearTimeout(self.connectTimeout) + + var conn = self.conn + conn.once('end', function () { + self.destroy() + }) + conn.once('close', function () { + self.destroy() + }) + conn.once('finish', function () { + self.destroy() + }) + conn.once('error', function (err) { + self.destroy(err) + }) + + var wire = self.wire = new Wire() + wire.type = self.type + wire.once('end', function () { + self.destroy() + }) + wire.once('close', function () { + self.destroy() + }) + wire.once('finish', function () { + self.destroy() + }) + wire.once('error', function (err) { + self.destroy(err) + }) + + wire.once('handshake', function (infoHash, peerId) { + self.onHandshake(infoHash, peerId) + }) + self.setHandshakeTimeout() + + conn.pipe(wire).pipe(conn) + if (self.swarm && !self.sentHandshake) self.handshake() +} + +/** + * Called when handshake is received from remote peer. 
+ * @param {string} infoHash + * @param {string} peerId + */ +Peer.prototype.onHandshake = function (infoHash, peerId) { + var self = this + if (!self.swarm) return // `self.swarm` not set yet, so do nothing + + if (self.swarm.destroyed) return self.destroy(new Error('swarm already destroyed')) + if (infoHash !== self.swarm.infoHash) { + return self.destroy(new Error('unexpected handshake info hash for this swarm')) + } + if (peerId === self.swarm.peerId) { + return self.destroy(new Error('refusing to handshake with self')) + } + + debug('Peer %s got handshake %s', self.id, infoHash) + + clearTimeout(self.handshakeTimeout) + + self.retries = 0 + + self.wire.on('download', function (downloaded) { + if (self.destroyed) return + self.swarm.downloaded += downloaded + self.swarm.downloadSpeed(downloaded) + self.swarm.emit('download', downloaded) + }) + + self.wire.on('upload', function (uploaded) { + if (self.destroyed) return + self.swarm.uploaded += uploaded + self.swarm.uploadSpeed(uploaded) + self.swarm.emit('upload', uploaded) + }) + + self.swarm.wires.push(self.wire) + + var addr = self.addr + if (!addr && self.conn.remoteAddress) { + addr = self.conn.remoteAddress + ':' + self.conn.remotePort + } + self.swarm.emit('wire', self.wire, addr) + // swarm could be destroyed in user's 'wire' event handler + if (!self.swarm || self.swarm.destroyed) return + + if (!self.sentHandshake) self.handshake() +} + +Peer.prototype.handshake = function () { + var self = this + self.wire.handshake(self.swarm.infoHash, self.swarm.peerId, self.swarm.handshakeOpts) + self.sentHandshake = true +} + +Peer.prototype.setConnectTimeout = function () { + var self = this + clearTimeout(self.connectTimeout) + self.connectTimeout = setTimeout(function () { + self.destroy(new Error('connect timeout')) + }, CONNECT_TIMEOUT) + if (self.connectTimeout.unref) self.connectTimeout.unref() +} + +Peer.prototype.setHandshakeTimeout = function () { + var self = this + clearTimeout(self.handshakeTimeout) + 
self.handshakeTimeout = setTimeout(function () { + self.destroy(new Error('handshake timeout')) + }, HANDSHAKE_TIMEOUT) + if (self.handshakeTimeout.unref) self.handshakeTimeout.unref() +} + +Peer.prototype.destroy = function (err) { + var self = this + if (self.destroyed) return + self.destroyed = true + self.connected = false + + debug('destroy %s (error: %s)', self.id, err && (err.message || err)) + + clearTimeout(self.connectTimeout) + clearTimeout(self.handshakeTimeout) + + var swarm = self.swarm + var conn = self.conn + var wire = self.wire + + self.conn = null + self.swarm = null + self.wire = null + + if (swarm && wire) { + var index = swarm.wires.indexOf(wire) + if (index >= 0) swarm.wires.splice(index, 1) + } + if (conn) { + conn.on('error', noop) + conn.destroy() + } + if (wire) wire.destroy() + if (swarm) swarm.removePeer(self.id) +} + +function noop () {} diff --git a/lib/swarm.js b/lib/swarm.js new file mode 100644 index 0000000..c648b15 --- /dev/null +++ b/lib/swarm.js @@ -0,0 +1,412 @@ +module.exports = Swarm + +var addrToIPPort = require('addr-to-ip-port') +var debug = require('debug')('webtorrent:swarm') +var EventEmitter = require('events').EventEmitter +var inherits = require('inherits') +var net = require('net') // browser exclude +var speedometer = require('speedometer') + +var Peer = require('./peer') +var TCPPool = require('./tcp-pool') // browser-exclude + +var MAX_CONNS = 55 +var RECONNECT_WAIT = [ 1000, 5000, 15000 ] + +inherits(Swarm, EventEmitter) + +/** + * BitTorrent Swarm + * + * Abstraction of a BitTorrent "swarm", which is handy for managing all peer + * connections for a given torrent download. This handles connecting to peers, + * listening for incoming connections, and doing the initial peer wire protocol + * handshake with peers. It also tracks total data uploaded/downloaded to/from + * the swarm. 
+ * + * @param {Buffer|string} infoHash + * @param {Buffer|string} peerId + * @param {Object} opts + * @param {Object} opts.handshake handshake options (passed to bittorrent-protocol) + * @param {number} opts.maxConns maximum number of connections in swarm + */ +function Swarm (infoHash, peerId, opts) { + var self = this + if (!(self instanceof Swarm)) return new Swarm(infoHash, peerId, opts) + EventEmitter.call(self) + + self.infoHash = typeof infoHash === 'string' + ? infoHash + : infoHash.toString('hex') + self.infoHashBuffer = new Buffer(self.infoHash, 'hex') + + self.peerId = typeof peerId === 'string' + ? peerId + : peerId.toString('hex') + self.peerIdBuffer = new Buffer(self.peerId, 'hex') + + if (!opts) opts = {} + + debug('new swarm (i %s p %s)', self.infoHash, self.peerId) + + self.handshakeOpts = opts.handshake // handshake extensions (optional) + self.maxConns = Number(opts.maxConns) || MAX_CONNS + + self.destroyed = false + self.listening = false + self.paused = false + + self.server = null // tcp listening socket + self.wires = [] // open wires (added *after* handshake) + + self._queue = [] // queue of outgoing tcp peers to connect to + self._peers = {} // connected peers (addr/peerId -> Peer) + self._peersLength = 0 // number of elements in `self._peers` (cache, for perf) + self._port = 0 // tcp listening port (cache, for perf) + + // track stats + self.downloaded = 0 + self.uploaded = 0 + self.downloadSpeed = speedometer() + self.uploadSpeed = speedometer() +} + +Object.defineProperty(Swarm.prototype, 'ratio', { + get: function () { + var self = this + return (self.uploaded / self.downloaded) || 0 + } +}) + +Object.defineProperty(Swarm.prototype, 'numQueued', { + get: function () { + var self = this + return self._queue.length + (self._peersLength - self.numConns) + } +}) + +Object.defineProperty(Swarm.prototype, 'numConns', { + get: function () { + var self = this + var numConns = 0 + for (var id in self._peers) { + var peer = self._peers[id] + if 
(peer && peer.connected) numConns += 1 + } + return numConns + } +}) + +Object.defineProperty(Swarm.prototype, 'numPeers', { + get: function () { + var self = this + return self.wires.length + } +}) + +/** + * Add a peer to the swarm. + * @param {string|simple-peer} peer "ip:port" string or simple-peer instance + * @param {string} peer.id bittorrent peer id (when `peer` is simple-peer) + * @return {boolean} true if peer was added, false if peer was invalid + + */ +Swarm.prototype.addPeer = function (peer) { + var self = this + var newPeer = self._addPeer(peer) + return !!newPeer // don't expose private Peer instance in return value +} + +Swarm.prototype._addPeer = function (peer) { + var self = this + if (self.destroyed) { + debug('ignoring added peer: swarm already destroyed') + if (typeof peer !== 'string') peer.destroy() + return null + } + if (typeof peer === 'string' && !self._validAddr(peer)) { + debug('ignoring added peer: invalid address %s', peer) + return null + } + + var id = (peer && peer.id) || peer + if (self._peers[id]) { + debug('ignoring added peer: duplicate peer id') + if (typeof peer !== 'string') peer.destroy() + return null + } + + if (self.paused) { + debug('ignoring added peer: swarm paused') + if (typeof peer !== 'string') peer.destroy() + return null + } + + debug('addPeer %s', id) + + var newPeer + if (typeof peer === 'string') { + // `peer` is an addr ("ip:port" string) + newPeer = Peer.createTCPOutgoingPeer(peer, self) + } else { + // `peer` is a WebRTC connection (simple-peer) + newPeer = Peer.createWebRTCPeer(peer, self) + } + + self._peers[newPeer.id] = newPeer + self._peersLength += 1 + + if (typeof peer === 'string') { + // `peer` is an addr ("ip:port" string) + self._queue.push(newPeer) + self._drain() + } + + return newPeer +} + +/** + * Add a web seed to the swarm. 
+ * @param {string} url web seed url + * @param {Object} parsedTorrent + */ +Swarm.prototype.addWebSeed = function (url, parsedTorrent) { + var self = this + if (self.destroyed) return + + if (!/^https?:\/\/.+/.test(url)) { + debug('ignoring invalid web seed %s (from swarm.addWebSeed)', url) + return + } + + if (self._peers[url]) return + + debug('addWebSeed %s', url) + + var newPeer = Peer.createWebSeedPeer(url, parsedTorrent, self) + self._peers[newPeer.id] = newPeer + self._peersLength += 1 +} + +/** + * Called whenever a new incoming TCP peer connects to this swarm. Called with a peer + * that has already sent a handshake. + * @param {Peer} peer + */ +Swarm.prototype._addIncomingPeer = function (peer) { + var self = this + if (self.destroyed) return peer.destroy(new Error('swarm already destroyed')) + if (self.paused) return peer.destroy(new Error('swarm paused')) + + if (!self._validAddr(peer.addr)) { + return peer.destroy(new Error('invalid addr ' + peer.addr + ' (from incoming)')) + } + debug('_addIncomingPeer %s', peer.id) + + self._peers[peer.id] = peer + self._peersLength += 1 +} + +/** + * Remove a peer from the swarm. + * @param {string} id for tcp peers, "ip:port" string; for webrtc peers, peerId + */ +Swarm.prototype.removePeer = function (id) { + var self = this + var peer = self._peers[id] + if (!peer) return + + debug('removePeer %s', id) + + self._peers[id] = null + self._peersLength -= 1 + + peer.destroy() + + // If swarm was at capacity before, try to open a new connection now + self._drain() +} + +/** + * Temporarily stop connecting to new peers. Note that this does not pause the streams + * of existing connections or their wires. + */ +Swarm.prototype.pause = function () { + var self = this + if (self.destroyed) return + debug('pause') + self.paused = true +} + +/** + * Resume connecting to new peers. 
+ */ +Swarm.prototype.resume = function () { + var self = this + if (self.destroyed) return + debug('resume') + self.paused = false + self._drain() +} + +/** + * Listen on the given port for peer connections. + * @param {number} port + * @param {string=} hostname + * @param {function=} onlistening + */ +Swarm.prototype.listen = function (port, hostname, onlistening) { + var self = this + if (typeof hostname === 'function') { + onlistening = hostname + hostname = undefined + } + if (self.listening) throw new Error('swarm already listening') + if (onlistening) self.once('listening', onlistening) + + if (typeof TCPPool === 'function') { + self._port = port || TCPPool.getDefaultListenPort(self.infoHash) + self._hostname = hostname + + debug('listen %s', port) + + var pool = TCPPool.addSwarm(self) + self.server = pool.server + } else { + // In browser, listen() is no-op, but still fire 'listening' event so that + // same code works in node and the browser. + process.nextTick(function () { + self._onListening(0) + }) + } +} + +Swarm.prototype._onListening = function (port) { + var self = this + self._port = port + self.listening = true + self.emit('listening') +} + +Swarm.prototype.address = function () { + var self = this + if (!self.listening) return null + return self.server + ? self.server.address() + : { port: 0, family: 'IPv4', address: '127.0.0.1' } +} + +/** + * Destroy the swarm, close all open peer connections, and do cleanup. 
+ * @param {function} onclose + */ +Swarm.prototype.destroy = function (onclose) { + var self = this + if (self.destroyed) return + + self.destroyed = true + self.listening = false + self.paused = false + + if (onclose) self.once('close', onclose) + + debug('destroy') + + for (var id in self._peers) { + self.removePeer(id) + } + + if (typeof TCPPool === 'function') { + TCPPool.removeSwarm(self, function () { + // TODO: only emit when all peers are destroyed + self.emit('close') + }) + } else { + process.nextTick(function () { + self.emit('close') + }) + } +} + +/** + * Pop a peer off the FIFO queue and connect to it. When _drain() gets called, + * the queue will usually have only one peer in it, except when there are too + * many peers (over `this.maxConns`) in which case they will just sit in the + * queue until another connection closes. + */ +Swarm.prototype._drain = function () { + var self = this + debug('_drain numConns %s maxConns %s', self.numConns, self.maxConns) + if (typeof net.connect !== 'function' || self.destroyed || self.paused || + self.numConns >= self.maxConns) { + return + } + debug('drain (%s queued, %s/%s peers)', self.numQueued, self.numPeers, self.maxConns) + + var peer = self._queue.shift() + if (!peer) return // queue could be empty + + debug('tcp connect attempt to %s', peer.addr) + + var parts = addrToIPPort(peer.addr) + var opts = { + host: parts[0], + port: parts[1] + } + if (self._hostname) opts.localAddress = self._hostname + + var conn = peer.conn = net.connect(opts) + + conn.once('connect', function () { peer.onConnect() }) + conn.once('error', function (err) { peer.destroy(err) }) + peer.setConnectTimeout() + + // When connection closes, attempt reconnect after timeout (with exponential backoff) + conn.on('close', function () { + if (self.destroyed) return + + if (peer.retries >= RECONNECT_WAIT.length) { + debug( + 'conn %s closed: will not re-add (max %s attempts)', + peer.addr, RECONNECT_WAIT.length + ) + return + } + + var ms = 
RECONNECT_WAIT[peer.retries] + debug( + 'conn %s closed: will re-add to queue in %sms (attempt %s)', + peer.addr, ms, peer.retries + 1 + ) + + var reconnectTimeout = setTimeout(function reconnectTimeout () { + var newPeer = self._addPeer(peer.addr) + if (newPeer) newPeer.retries = peer.retries + 1 + }, ms) + if (reconnectTimeout.unref) reconnectTimeout.unref() + }) +} + +Swarm.prototype._onError = function (err) { + var self = this + self.emit('error', err) + self.destroy() +} + +/** + * Returns `true` if string is valid IPv4/6 address, and is not the address of this swarm. + * @param {string} addr + * @return {boolean} + */ +Swarm.prototype._validAddr = function (addr) { + var self = this + var parts + try { + parts = addrToIPPort(addr) + } catch (e) { + return false + } + var host = parts[0] + var port = parts[1] + return port > 0 && port < 65535 && !(host === '127.0.0.1' && port === self._port) +} diff --git a/lib/tcp-pool.js b/lib/tcp-pool.js new file mode 100644 index 0000000..02148e3 --- /dev/null +++ b/lib/tcp-pool.js @@ -0,0 +1,235 @@ +module.exports = TCPPool + +var debug = require('debug')('webtorrent:tcp-pool') +var net = require('net') // browser exclude + +var Peer = require('./peer') + +/** + * Shared TCP pools; shared among all swarms + * @type {Object} port: number -> pool: TCPPool + */ +var tcpPools = {} + +/** + * TCPPool + * + * A "TCP pool" allows multiple swarms to listen on the same TCP port and determines + * which swarm incoming connections are intended for by inspecting the bittorrent + * handshake that the remote peer sends. + * + * @param {number} port + * @param {string} hostname + */ +function TCPPool (port, hostname) { + var self = this + + self.port = port + self.listening = false + self.swarms = {} // infoHash (hex) -> Swarm + + debug('new TCPPool (port: %s, hostname: %s)', port, hostname) + + // Save incoming conns so they can be destroyed if server is closed before the conn is + // passed off to a Swarm. 
+ self.pendingConns = [] + + self.server = net.createServer() + self.server.on('connection', function (conn) { self._onConnection(conn) }) + self.server.on('error', function (err) { self._onError(err) }) + self.server.on('listening', function () { self._onListening() }) + self.server.listen(self.port, hostname) +} + +/** + * STATIC METHOD + * Add a swarm to a pool, creating a new pool if necessary. + * @param {Swarm} swarm + */ +TCPPool.addSwarm = function (swarm) { + var pool = tcpPools[swarm._port] + if (!pool) pool = tcpPools[swarm._port] = new TCPPool(swarm._port, swarm._hostname) + pool.addSwarm(swarm) + return pool +} + +/** + * STATIC METHOD + * Remove a swarm from its pool. + * @param {Swarm} swarm + */ +TCPPool.removeSwarm = function (swarm, cb) { + var pool = tcpPools[swarm._port] + if (!pool) return cb() + pool.removeSwarm(swarm) + + var numSwarms = 0 + for (var infoHash in pool.swarms) { + var s = pool.swarms[infoHash] + if (s) numSwarms += 1 + } + if (numSwarms === 0) pool.destroy(cb) + else process.nextTick(cb) +} + +/** + * STATIC METHOD + * When `Swarm.prototype.listen` is called without specifying a port, a reasonable + * default port must be chosen. If there already exists an active TCP pool, then return + * that pool's port so that TCP server can be re-used. Otherwise, return 0 so node will + * pick a free port. + * + * @return {number} port + */ +TCPPool.getDefaultListenPort = function (infoHash) { + for (var port in tcpPools) { + var pool = tcpPools[port] + if (pool && !pool.swarms[infoHash]) return pool.port + } + return 0 +} + +/** + * Add a swarm to this TCP pool. 
+ * @param {Swarm} swarm + */ +TCPPool.prototype.addSwarm = function (swarm) { + var self = this + + if (self.swarms[swarm.infoHash]) { + process.nextTick(function () { + swarm._onError(new Error( + 'There is already a swarm with info hash ' + swarm.infoHash + ' ' + + 'listening on port ' + swarm._port + )) + }) + return + } + + self.swarms[swarm.infoHash] = swarm + + if (self.listening) { + process.nextTick(function () { + swarm._onListening(self.port) + }) + } + + debug('add swarm %s to tcp pool %s', swarm.infoHash, self.port) +} + +/** + * Remove a swarm from this TCP pool. + * @param {Swarm} swarm + */ +TCPPool.prototype.removeSwarm = function (swarm) { + var self = this + debug('remove swarm %s from tcp pool %s', swarm.infoHash, self.port) + self.swarms[swarm.infoHash] = null +} + +/** + * Destroy this TCP pool. + * @param {function} cb + */ +TCPPool.prototype.destroy = function (cb) { + var self = this + debug('destroy tcp pool %s', self.port) + + self.listening = false + + // Destroy all open connection objects so server can close gracefully without waiting + // for connection timeout or remote peer to disconnect. + self.pendingConns.forEach(function (conn) { + conn.destroy() + }) + + tcpPools[self.port] = null + + try { + self.server.close(cb) + } catch (err) { + if (cb) process.nextTick(cb) + } +} + +TCPPool.prototype._onListening = function () { + var self = this + + // Fix for Docker Node image. Sometimes server.address() returns `null`. 
+  // See issue: https://github.com/feross/bittorrent-swarm/pull/18
+  var address = self.server.address() || { port: 0 }
+  var port = address.port
+
+  debug('tcp pool listening on %s', port)
+
+  if (port !== self.port) {
+    // `port` was 0 when `listen` was called; update to the port that node selected
+    tcpPools[self.port] = null
+    self.port = port
+    tcpPools[self.port] = self
+  }
+
+  self.listening = true
+
+  for (var infoHash in self.swarms) {
+    var swarm = self.swarms[infoHash]
+    if (swarm) swarm._onListening(self.port)
+  }
+}
+
+/**
+ * On incoming connections, we expect the remote peer to send a handshake first. Based
+ * on the infoHash in that handshake, route the peer to the right swarm.
+ *
+ * Until the handshake arrives (or the connection closes), the connection is tracked
+ * in `self.pendingConns` so it can be destroyed if the pool is destroyed first.
+ *
+ * @param {Socket} conn incoming connection from the server's 'connection' event
+ */
+TCPPool.prototype._onConnection = function (conn) {
+  var self = this
+
+  // If the connection has already been closed before the `connect` event is fired,
+  // then `remoteAddress` will not be available, and we can't use this connection.
+  // - Node.js issue: https://github.com/nodejs/node-v0.x-archive/issues/7566
+  // - WebTorrent issue: https://github.com/feross/webtorrent/issues/398
+  if (!conn.remoteAddress) {
+    conn.on('error', noop)
+    conn.destroy()
+    return
+  }
+
+  self.pendingConns.push(conn)
+  conn.once('close', removePendingConn)
+
+  function removePendingConn () {
+    // Remove only this connection. `splice(index)` without a delete count would also
+    // drop every connection queued after it, and an index of -1 (already removed)
+    // would drop the last entry instead.
+    var index = self.pendingConns.indexOf(conn)
+    if (index >= 0) self.pendingConns.splice(index, 1)
+  }
+
+  var peer = Peer.createTCPIncomingPeer(conn)
+
+  peer.wire.once('handshake', function (infoHash, peerId) {
+    // The connection is about to be handed to a swarm (or destroyed), so it is no
+    // longer pending.
+    removePendingConn()
+    conn.removeListener('close', removePendingConn)
+
+    var swarm = self.swarms[infoHash]
+    if (swarm) {
+      peer.swarm = swarm
+      swarm._addIncomingPeer(peer)
+      peer.onHandshake(infoHash, peerId)
+    } else {
+      var err = new Error('Unexpected info hash ' + infoHash + ' from incoming peer ' +
+        peer.id + ': destroying peer')
+      peer.destroy(err)
+    }
+  })
+}
+
+TCPPool.prototype._onError = function (err) {
+  var self = this
+  self.destroy()
+  for (var infoHash in self.swarms) {
+    var swarm = self.swarms[infoHash]
+ if (swarm) { + self.removeSwarm(swarm) + swarm._onError(err) + } + } +} + +function noop () {} diff --git a/lib/torrent.js b/lib/torrent.js index ed47e5b..79db2ab 100644 --- a/lib/torrent.js +++ b/lib/torrent.js @@ -2,7 +2,7 @@ module.exports = Torrent -var addrToIPPort = require('addr-to-ip-port') // browser exclude +var addrToIPPort = require('addr-to-ip-port') var BitField = require('bitfield') var ChunkStoreWriteStream = require('chunk-store-stream/write') var cpus = require('cpus') diff --git a/lib/webconn.js b/lib/webconn.js new file mode 100644 index 0000000..52176f9 --- /dev/null +++ b/lib/webconn.js @@ -0,0 +1,135 @@ +module.exports = WebConn + +var BitField = require('bitfield') +var debug = require('debug')('bittorrent-swarm:webconn') +var get = require('simple-get') +var inherits = require('inherits') +var sha1 = require('simple-sha1') +var Wire = require('bittorrent-protocol') + +inherits(WebConn, Wire) + +/** + * Converts requests for torrent blocks into http range requests. 
+ * @param {string} url web seed url
+ * @param {Object} parsedTorrent torrent metadata; `pieces`, `pieceLength` and `files` are read
+ */
+function WebConn (url, parsedTorrent) {
+  var self = this
+  Wire.call(this)
+
+  self.url = url
+  self.webPeerId = sha1.sync(url)
+  self.parsedTorrent = parsedTorrent
+
+  self.setKeepAlive(true)
+
+  self.on('handshake', function (infoHash, peerId) {
+    self.handshake(infoHash, self.webPeerId)
+    var numPieces = self.parsedTorrent.pieces.length
+    var bitfield = new BitField(numPieces)
+    // Advertise every piece. Valid piece indexes are 0..numPieces-1; setting index
+    // `numPieces` would mark a piece that does not exist.
+    for (var i = 0; i < numPieces; i++) {
+      bitfield.set(i, true)
+    }
+    self.bitfield(bitfield)
+  })
+
+  self.on('choke', function () { debug('choke') })
+  self.on('unchoke', function () { debug('unchoke') })
+
+  self.once('interested', function () {
+    debug('interested')
+    self.unchoke()
+  })
+  self.on('uninterested', function () { debug('uninterested') })
+
+  self.on('bitfield', function () { debug('bitfield') })
+
+  self.on('request', function (pieceIndex, offset, length, callback) {
+    debug('request pieceIndex=%d offset=%d length=%d', pieceIndex, offset, length)
+    self.httpRequest(pieceIndex, offset, length, callback)
+  })
+}
+
+/**
+ * Load a block of a piece from the web seed using HTTP range requests.
+ * @param {number} pieceIndex index of the piece
+ * @param {number} offset offset of the block within the piece
+ * @param {number} length length of the block in bytes
+ * @param {function} cb called with (err) on failure or (null, data) on success
+ */
+WebConn.prototype.httpRequest = function (pieceIndex, offset, length, cb) {
+  var self = this
+  var pieceOffset = pieceIndex * self.parsedTorrent.pieceLength
+  var rangeStart = pieceOffset + offset /* offset within whole torrent */
+  var rangeEnd = rangeStart + length - 1
+
+  // Web seed URL format
+  // For single-file torrents, you just make HTTP range requests directly to the web seed URL
+  // For multi-file torrents, you have to add the torrent folder and file name to the URL
+  var files = self.parsedTorrent.files
+  var requests
+  if (files.length <= 1) {
+    requests = [{
+      url: self.url,
+      start: rangeStart,
+      end: rangeEnd
+    }]
+  } else {
+    // Keep only the files that overlap the requested byte range
+    var requestedFiles = files.filter(function (file) {
+      return file.offset <= rangeEnd && (file.offset + file.length) > rangeStart
+    })
+    if (requestedFiles.length < 1) return cb(new Error('Could not find file corresponding to web seed range 
request'))
+
+    requests = requestedFiles.map(function (requestedFile) {
+      var url = self.url +
+        (self.url[self.url.length - 1] === '/' ? '' : '/') +
+        requestedFile.path
+      return {
+        url: url,
+        // Where this file's data starts inside the requested block
+        fileOffsetInRange: Math.max(requestedFile.offset - rangeStart, 0),
+        // `start` and `end` are byte offsets relative to the start of the file, so
+        // `end` is clamped to the file's last byte (length - 1), not to its absolute
+        // position within the torrent.
+        start: Math.max(rangeStart - requestedFile.offset, 0),
+        end: Math.min(requestedFile.length - 1, rangeEnd - requestedFile.offset)
+      }
+    })
+  }
+
+  // Now make all the HTTP requests we need in order to load this piece
+  // Usually that's one request, but sometimes it will be multiple
+  // Send requests in parallel and wait for them all to come back
+  var numRequestsSucceeded = 0
+  var hasError = false
+  if (requests.length > 1) var ret = new Buffer(length)
+  requests.forEach(function (request) {
+    var url = request.url
+    var start = request.start
+    var end = request.end
+    debug(
+      'Requesting url=%s pieceIndex=%d offset=%d length=%d start=%d end=%d',
+      url, pieceIndex, offset, length, start, end
+    )
+    var opts = {
+      url: url,
+      method: 'GET',
+      headers: {
+        'user-agent': 'WebTorrent (http://webtorrent.io)',
+        'range': 'bytes=' + start + '-' + end
+      }
+    }
+    get.concat(opts, function (err, res, data) {
+      // Once one request has failed, `cb` has already been called; ignore the rest
+      if (hasError) return
+      if (err) {
+        hasError = true
+        return cb(err)
+      }
+      if (res.statusCode < 200 || res.statusCode >= 300) {
+        hasError = true
+        return cb(new Error('Unexpected HTTP status code ' + res.statusCode))
+      }
+      debug('Got data of length %d', data.length)
+      if (requests.length === 1) {
+        // Common case: fetch piece in a single HTTP request, return directly
+        return cb(null, data)
+      }
+      // Rare case: reconstruct multiple HTTP requests across 2+ files into one piece buffer
+      data.copy(ret, request.fileOffsetInRange)
+      if (++numRequestsSucceeded === requests.length) {
+        cb(null, ret)
+      }
+    })
+  })
+}
diff --git a/package.json b/package.json
index 53da356..ff586d7 100644
--- a/package.json
+++ b/package.json
@@ -9,9 +9,11 @@
   },
   "browser": {
"./lib/server.js": false, + "./lib/tcp-pool.js": false, "bittorrent-dht/client": false, "fs-chunk-store": "memory-chunk-store", "load-ip-set": false, + "net": false, "os": false, "path-exists": false, "ut_pex": false @@ -28,14 +30,14 @@ "addr-to-ip-port": "^1.0.1", "bitfield": "^1.0.2", "bittorrent-dht": "^7.0.0", - "bittorrent-swarm": "^7.1.0", + "bittorrent-protocol": "^2.0.0", "chunk-store-stream": "^2.0.0", "cpus": "^1.0.0", "create-torrent": "^3.4.0", "debug": "^2.1.0", "end-of-stream": "^1.0.0", "fs-chunk-store": "^1.3.4", - "hat": "0.0.3", + "hat": "^0.0.3", "immediate-chunk-store": "^1.0.7", "inherits": "^2.0.1", "load-ip-set": "^1.0.3", @@ -54,6 +56,7 @@ "render-media": "^2.0.0", "run-parallel": "^1.0.0", "run-parallel-limit": "^1.0.2", + "simple-get": "^2.0.0", "simple-peer": "^6.0.0", "simple-sha1": "^2.0.0", "speedometer": "^1.0.0", @@ -75,9 +78,9 @@ "electron-prebuilt": "^0.37.2", "finalhandler": "^0.4.0", "network-address": "^1.1.0", + "portfinder": "^1.0.0", "run-series": "^1.0.2", "serve-static": "^1.9.3", - "simple-get": "^2.0.0", "standard": "^6.0.1", "tape": "^4.0.0", "uglify-js": "^2.4.15", @@ -90,8 +93,13 @@ "bittorrent client", "download", "mad science", + "p2p", + "peer-to-peer", + "peers", "streaming", + "swarm", "torrent", + "web torrent", "webrtc", "webrtc data", "webtorrent" diff --git a/test/node/swarm-basic.js b/test/node/swarm-basic.js new file mode 100644 index 0000000..9e38d0a --- /dev/null +++ b/test/node/swarm-basic.js @@ -0,0 +1,77 @@ +var hat = require('hat') +var portfinder = require('portfinder') +var Swarm = require('../../lib/swarm') +var test = require('tape') + +var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36' +var infoHash2 = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa37' +var peerId = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') +var peerId2 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') + +test('swarm listen (explicit port)', function (t) { + t.plan(1) + + var swarm = new Swarm(infoHash, 
peerId) + portfinder.getPort(function (err, port) { + if (err) throw err + swarm.listen(port) + swarm.on('listening', function () { + t.equal(port, swarm.address().port) + swarm.destroy() + }) + }) +}) + +test('two swarms listen on same port (explicit)', function (t) { + t.plan(2) + + var swarm1 = new Swarm(infoHash, peerId) + portfinder.getPort(function (err, port) { + if (err) throw err + swarm1.listen(port, function () { + t.equal(swarm1.address().port, port, 'listened on requested port') + + var swarm2 = new Swarm(infoHash2, peerId) + swarm2.listen(port, function () { + t.equal(swarm2.address().port, port, 'listened on requested port') + swarm1.destroy() + swarm2.destroy() + }) + }) + }) +}) + +test('swarm join', function (t) { + t.plan(10) + + var swarm1 = new Swarm(infoHash, peerId) + swarm1.listen(0, function () { + var swarm2 = new Swarm(infoHash, peerId2) + + t.equal(swarm1.wires.length, 0) + t.equal(swarm2.wires.length, 0) + + swarm2.addPeer('127.0.0.1:' + swarm1.address().port) + + swarm1.on('wire', function (wire, addr) { + t.ok(wire, 'Peer join our swarm via listening port') + + t.equal(swarm1.wires.length, 1) + t.ok(/127\.0\.0\.1:\d{1,5}/.test(addr)) + t.equal(wire.peerId.toString('hex'), peerId2) + }) + + swarm2.on('wire', function (wire, addr) { + t.ok(wire, 'Joined swarm, got wire') + + t.equal(swarm2.wires.length, 1) + t.ok(/127\.0\.0\.1:\d{1,5}/.test(addr)) + t.equal(wire.peerId.toString('hex'), peerId) + }) + + t.on('end', function () { + swarm1.destroy() + swarm2.destroy() + }) + }) +}) diff --git a/test/node/swarm-reconnect.js b/test/node/swarm-reconnect.js new file mode 100644 index 0000000..4fad7d0 --- /dev/null +++ b/test/node/swarm-reconnect.js @@ -0,0 +1,62 @@ +var hat = require('hat') +var Swarm = require('../../lib/swarm') +var test = require('tape') + +var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36' +var peerId1 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') +var peerId2 = new Buffer('-WW0001-' + hat(48), 
'utf8').toString('hex') + +test('reconnect when peer disconnects', function (t) { + t.plan(10) + + var swarm1 = new Swarm(infoHash, peerId1) + swarm1.listen(0, function () { + var swarm2 = new Swarm(infoHash, peerId2) + + var time1 = 0 + swarm1.on('wire', function (wire) { + if (time1 === 0) { + t.ok(wire, 'Peer joined via listening port') + t.equal(swarm1.wires.length, 1) + + // at some point in future, end wire + setTimeout(function () { + wire.destroy() + }, 100) + + // ...and prevent reconnect + swarm1._drain = function () {} + } else if (time1 === 1) { + t.ok(wire, 'Remote peer reconnected') + t.equal(swarm1.wires.length, 1) + } else { + throw new Error('too many wire events (1)') + } + time1 += 1 + }) + + var time2 = 0 + swarm2.on('wire', function (wire) { + if (time2 === 0) { + t.ok(wire, 'Joined swarm, got wire') + t.equal(swarm2.wires.length, 1) + + wire.on('end', function () { + t.pass('Wire ended by remote peer') + t.equal(swarm1.wires.length, 0) + }) + } else if (time2 === 1) { + t.ok(wire, 'Reconnected to remote peer') + t.equal(swarm2.wires.length, 1) + + swarm1.destroy() + swarm2.destroy() + } else { + throw new Error('too many wire events (2)') + } + time2 += 1 + }) + + swarm2.addPeer('127.0.0.1:' + swarm1.address().port) + }) +}) diff --git a/test/node/swarm-timeout.js b/test/node/swarm-timeout.js new file mode 100644 index 0000000..4341465 --- /dev/null +++ b/test/node/swarm-timeout.js @@ -0,0 +1,50 @@ +var hat = require('hat') +var Swarm = require('../../lib/swarm') +var test = require('tape') + +var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36' +var peerId1 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') +var peerId2 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') + +test('timeout if no handshake in 25 seconds', function (t) { + t.plan(4) + + var swarm1 = new Swarm(infoHash, peerId1) + + var _addIncomingPeer = swarm1._addIncomingPeer + swarm1._addIncomingPeer = function (peer) { + // Nuke the handshake function on 
swarm1's peer to test swarm2's + // handshake timeout code + peer.wire.handshake = function () {} + _addIncomingPeer.call(swarm1, peer) + } + + swarm1.listen(0, function () { + var swarm2 = new Swarm(infoHash, peerId2) + + var numWires = 0 + swarm1.on('wire', function (wire) { + numWires += 1 + if (numWires === 1) { + t.ok(wire, 'Got wire via listening port') + t.equal(swarm1.wires.length, 1) + + // swarm2 should never get a wire since swarm1 refuses to send it a + // handshake + t.equal(swarm2.wires.length, 0) + } else if (numWires === 2) { + t.pass('swarm2 reconnected after timeout') + swarm1.destroy() + swarm2.destroy() + } else { + t.fail('got wire after destroy') + } + }) + + swarm2.on('wire', function (wire) { + t.fail('Should not get a wire because peer did not handshake') + }) + + swarm2.addPeer('127.0.0.1:' + swarm1.address().port) + }) +}) diff --git a/test/swarm.js b/test/swarm.js new file mode 100644 index 0000000..9662d4d --- /dev/null +++ b/test/swarm.js @@ -0,0 +1,82 @@ +var hat = require('hat') +var Swarm = require('../lib/swarm') +var test = require('tape') + +var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36' +var infoHash2 = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa37' +var peerId = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') +var peerId2 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex') + +test('create swarm, check invariants', function (t) { + var swarm = new Swarm(infoHash, peerId) + + t.equal(swarm.infoHash.toString('hex'), infoHash) + t.equal(swarm.peerId.toString('hex'), peerId) + t.equal(swarm.downloaded, 0) + t.equal(swarm.uploaded, 0) + t.ok(Array.isArray(swarm.wires)) + t.equal(swarm.wires.length, 0) + t.end() +}) + +test('swarm listen(0) selects free port', function (t) { + t.plan(2) + + var swarm = new Swarm(infoHash, peerId) + swarm.listen(0) + swarm.on('listening', function () { + var port = swarm.address().port + t.equal(typeof port, 'number', 'port is a number') + if (process.browser) { + t.equal(port, 
0, 'port number is 0') + } else { + t.ok(port > 0 && port < 65535, 'valid port number') + } + swarm.destroy() + }) +}) + +test('two swarms listen on same port (implicit)', function (t) { + t.plan(5) + + // When no port is specified and listen is called twice, they should get assigned the same port. + + var swarm1 = new Swarm(infoHash, peerId) + var swarm2 = new Swarm(infoHash2, peerId2) + + var swarm1Port + var swarm2Port + + function maybeDone () { + if (swarm1.listening && swarm2.listening) { + t.equal(swarm1Port, swarm2Port, 'swarms were given same port') + + t.equal(typeof swarm1Port, 'number', 'port is a number') + if (process.browser) { + t.equal(swarm1Port, 0, 'port number is 0') + } else { + t.ok(swarm1Port > 0 && swarm1Port < 65535, 'valid port number') + } + + t.equal(typeof swarm2Port, 'number', 'port is a number') + if (process.browser) { + t.equal(swarm2Port, 0, 'port number is 0') + } else { + t.ok(swarm2Port > 0 && swarm2Port < 65535, 'valid port number') + } + + swarm1.destroy() + swarm2.destroy() + } + } + + swarm1.listen(0, function () { + swarm1Port = swarm1.address().port + maybeDone() + }) + + swarm2.listen(0, function (port2) { + swarm2Port = swarm2.address().port + maybeDone() + }) +}) |