Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/webtorrent/webtorrent.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFeross Aboukhadijeh <feross@feross.org>2016-03-29 03:56:02 +0300
committerFeross Aboukhadijeh <feross@feross.org>2016-03-29 04:02:08 +0300
commit4c21d2d154a4775e3f706426acc4625f7db0a53a (patch)
treeb01b7144dcc6fb580a8ff97180dec5e8ac9b9a38
parent0ee69bc16de506d64ec6c5af657a012c1edba43b (diff)
Deprecate bittorrent-swarm, inline into webtorrent
bittorrent-swarm and the Torrent object in webtorrent are very coupled. It doesn't make much sense to publish them separately, as bittorrent-swarm can't be used independently.
-rw-r--r--lib/peer.js253
-rw-r--r--lib/swarm.js412
-rw-r--r--lib/tcp-pool.js235
-rw-r--r--lib/torrent.js2
-rw-r--r--lib/webconn.js135
-rw-r--r--package.json14
-rw-r--r--test/node/swarm-basic.js77
-rw-r--r--test/node/swarm-reconnect.js62
-rw-r--r--test/node/swarm-timeout.js50
-rw-r--r--test/swarm.js82
10 files changed, 1318 insertions, 4 deletions
diff --git a/lib/peer.js b/lib/peer.js
new file mode 100644
index 0000000..f4667b5
--- /dev/null
+++ b/lib/peer.js
@@ -0,0 +1,253 @@
+var debug = require('debug')('webtorrent:peer')
+var Wire = require('bittorrent-protocol')
+
+var WebConn = require('./webconn')
+
+var CONNECT_TIMEOUT = 25000
+var HANDSHAKE_TIMEOUT = 25000
+
+/**
+ * WebRTC peer connections start out connected, because WebRTC peers require an
+ * "introduction" (i.e. WebRTC signaling), and there's no equivalent to an IP address
+ * that lets you refer to a WebRTC endpoint.
+ */
+exports.createWebRTCPeer = function (conn, swarm) {
+ var peer = new Peer(conn.id, 'webrtc')
+ peer.conn = conn
+ peer.swarm = swarm
+
+ if (peer.conn.connected) {
+ peer.onConnect()
+ } else {
+ peer.conn.once('connect', function () { peer.onConnect() })
+ peer.conn.once('error', function (err) { peer.destroy(err) })
+ peer.setConnectTimeout()
+ }
+
+ return peer
+}
+
+/**
+ * Incoming TCP peers start out connected, because the remote peer connected to the
+ * listening port of the TCP server. Until the remote peer sends a handshake, we don't
+ * know what swarm the connection is intended for.
+ */
+exports.createTCPIncomingPeer = function (conn) {
+ var addr = conn.remoteAddress + ':' + conn.remotePort
+ var peer = new Peer(addr, 'tcpIncoming')
+ peer.conn = conn
+ peer.addr = addr
+
+ peer.onConnect()
+
+ return peer
+}
+
+/**
+ * Outgoing TCP peers start out with just an IP address. At some point (when there is an
+ * available connection), the client can attempt to connect to the address.
+ */
+exports.createTCPOutgoingPeer = function (addr, swarm) {
+ var peer = new Peer(addr, 'tcpOutgoing')
+ peer.addr = addr
+ peer.swarm = swarm
+
+ return peer
+}
+
+/**
+ * Peer that represents a Web Seed (BEP17 / BEP19).
+ */
+exports.createWebSeedPeer = function (url, parsedTorrent, swarm) {
+ var peer = new Peer(url, 'webSeed')
+ peer.swarm = swarm
+ peer.conn = new WebConn(url, parsedTorrent)
+
+ peer.onConnect()
+
+ return peer
+}
+
+/**
+ * Peer. Represents a peer in the Swarm.
+ *
+ * @param {string} id "ip:port" string, peer id (for WebRTC peers), or url (for Web Seeds)
+ * @param {string} type the type of the peer
+ */
+function Peer (id, type) {
+ var self = this
+ self.id = id
+ self.type = type
+
+ debug('new Peer %s', id)
+
+ self.addr = null
+ self.conn = null
+ self.swarm = null
+ self.wire = null
+
+ self.connected = false
+ self.destroyed = false
+ self.timeout = null // handshake timeout
+ self.retries = 0 // outgoing TCP connection retry count
+
+ self.sentHandshake = false
+}
+
+/**
+ * Called once the peer is connected (i.e. fired 'connect' event)
+ * @param {Socket} conn
+ */
+Peer.prototype.onConnect = function () {
+ var self = this
+ if (self.destroyed) return
+ self.connected = true
+
+ debug('Peer %s connected', self.id)
+
+ clearTimeout(self.connectTimeout)
+
+ var conn = self.conn
+ conn.once('end', function () {
+ self.destroy()
+ })
+ conn.once('close', function () {
+ self.destroy()
+ })
+ conn.once('finish', function () {
+ self.destroy()
+ })
+ conn.once('error', function (err) {
+ self.destroy(err)
+ })
+
+ var wire = self.wire = new Wire()
+ wire.type = self.type
+ wire.once('end', function () {
+ self.destroy()
+ })
+ wire.once('close', function () {
+ self.destroy()
+ })
+ wire.once('finish', function () {
+ self.destroy()
+ })
+ wire.once('error', function (err) {
+ self.destroy(err)
+ })
+
+ wire.once('handshake', function (infoHash, peerId) {
+ self.onHandshake(infoHash, peerId)
+ })
+ self.setHandshakeTimeout()
+
+ conn.pipe(wire).pipe(conn)
+ if (self.swarm && !self.sentHandshake) self.handshake()
+}
+
+/**
+ * Called when handshake is received from remote peer.
+ * @param {string} infoHash
+ * @param {string} peerId
+ */
+Peer.prototype.onHandshake = function (infoHash, peerId) {
+ var self = this
+ if (!self.swarm) return // `self.swarm` not set yet, so do nothing
+
+ if (self.swarm.destroyed) return self.destroy(new Error('swarm already destroyed'))
+ if (infoHash !== self.swarm.infoHash) {
+ return self.destroy(new Error('unexpected handshake info hash for this swarm'))
+ }
+ if (peerId === self.swarm.peerId) {
+ return self.destroy(new Error('refusing to handshake with self'))
+ }
+
+ debug('Peer %s got handshake %s', self.id, infoHash)
+
+ clearTimeout(self.handshakeTimeout)
+
+ self.retries = 0
+
+ self.wire.on('download', function (downloaded) {
+ if (self.destroyed) return
+ self.swarm.downloaded += downloaded
+ self.swarm.downloadSpeed(downloaded)
+ self.swarm.emit('download', downloaded)
+ })
+
+ self.wire.on('upload', function (uploaded) {
+ if (self.destroyed) return
+ self.swarm.uploaded += uploaded
+ self.swarm.uploadSpeed(uploaded)
+ self.swarm.emit('upload', uploaded)
+ })
+
+ self.swarm.wires.push(self.wire)
+
+ var addr = self.addr
+ if (!addr && self.conn.remoteAddress) {
+ addr = self.conn.remoteAddress + ':' + self.conn.remotePort
+ }
+ self.swarm.emit('wire', self.wire, addr)
+ // swarm could be destroyed in user's 'wire' event handler
+ if (!self.swarm || self.swarm.destroyed) return
+
+ if (!self.sentHandshake) self.handshake()
+}
+
+Peer.prototype.handshake = function () {
+ var self = this
+ self.wire.handshake(self.swarm.infoHash, self.swarm.peerId, self.swarm.handshakeOpts)
+ self.sentHandshake = true
+}
+
+Peer.prototype.setConnectTimeout = function () {
+ var self = this
+ clearTimeout(self.connectTimeout)
+ self.connectTimeout = setTimeout(function () {
+ self.destroy(new Error('connect timeout'))
+ }, CONNECT_TIMEOUT)
+ if (self.connectTimeout.unref) self.connectTimeout.unref()
+}
+
+Peer.prototype.setHandshakeTimeout = function () {
+ var self = this
+ clearTimeout(self.handshakeTimeout)
+ self.handshakeTimeout = setTimeout(function () {
+ self.destroy(new Error('handshake timeout'))
+ }, HANDSHAKE_TIMEOUT)
+ if (self.handshakeTimeout.unref) self.handshakeTimeout.unref()
+}
+
+Peer.prototype.destroy = function (err) {
+ var self = this
+ if (self.destroyed) return
+ self.destroyed = true
+ self.connected = false
+
+ debug('destroy %s (error: %s)', self.id, err && (err.message || err))
+
+ clearTimeout(self.connectTimeout)
+ clearTimeout(self.handshakeTimeout)
+
+ var swarm = self.swarm
+ var conn = self.conn
+ var wire = self.wire
+
+ self.conn = null
+ self.swarm = null
+ self.wire = null
+
+ if (swarm && wire) {
+ var index = swarm.wires.indexOf(wire)
+ if (index >= 0) swarm.wires.splice(index, 1)
+ }
+ if (conn) {
+ conn.on('error', noop)
+ conn.destroy()
+ }
+ if (wire) wire.destroy()
+ if (swarm) swarm.removePeer(self.id)
+}
+
+function noop () {}
diff --git a/lib/swarm.js b/lib/swarm.js
new file mode 100644
index 0000000..c648b15
--- /dev/null
+++ b/lib/swarm.js
@@ -0,0 +1,412 @@
+module.exports = Swarm
+
+var addrToIPPort = require('addr-to-ip-port')
+var debug = require('debug')('webtorrent:swarm')
+var EventEmitter = require('events').EventEmitter
+var inherits = require('inherits')
+var net = require('net') // browser exclude
+var speedometer = require('speedometer')
+
+var Peer = require('./peer')
+var TCPPool = require('./tcp-pool') // browser-exclude
+
+var MAX_CONNS = 55
+var RECONNECT_WAIT = [ 1000, 5000, 15000 ]
+
+inherits(Swarm, EventEmitter)
+
+/**
+ * BitTorrent Swarm
+ *
+ * Abstraction of a BitTorrent "swarm", which is handy for managing all peer
+ * connections for a given torrent download. This handles connecting to peers,
+ * listening for incoming connections, and doing the initial peer wire protocol
+ * handshake with peers. It also tracks total data uploaded/downloaded to/from
+ * the swarm.
+ *
+ * @param {Buffer|string} infoHash
+ * @param {Buffer|string} peerId
+ * @param {Object} opts
+ * @param {Object} opts.handshake handshake options (passed to bittorrent-protocol)
+ * @param {number} opts.maxConns maximum number of connections in swarm
+ */
+function Swarm (infoHash, peerId, opts) {
+ var self = this
+ if (!(self instanceof Swarm)) return new Swarm(infoHash, peerId, opts)
+ EventEmitter.call(self)
+
+ self.infoHash = typeof infoHash === 'string'
+ ? infoHash
+ : infoHash.toString('hex')
+ self.infoHashBuffer = new Buffer(self.infoHash, 'hex')
+
+ self.peerId = typeof peerId === 'string'
+ ? peerId
+ : peerId.toString('hex')
+ self.peerIdBuffer = new Buffer(self.peerId, 'hex')
+
+ if (!opts) opts = {}
+
+ debug('new swarm (i %s p %s)', self.infoHash, self.peerId)
+
+ self.handshakeOpts = opts.handshake // handshake extensions (optional)
+ self.maxConns = Number(opts.maxConns) || MAX_CONNS
+
+ self.destroyed = false
+ self.listening = false
+ self.paused = false
+
+ self.server = null // tcp listening socket
+ self.wires = [] // open wires (added *after* handshake)
+
+ self._queue = [] // queue of outgoing tcp peers to connect to
+ self._peers = {} // connected peers (addr/peerId -> Peer)
+ self._peersLength = 0 // number of elements in `self._peers` (cache, for perf)
+ self._port = 0 // tcp listening port (cache, for perf)
+
+ // track stats
+ self.downloaded = 0
+ self.uploaded = 0
+ self.downloadSpeed = speedometer()
+ self.uploadSpeed = speedometer()
+}
+
+Object.defineProperty(Swarm.prototype, 'ratio', {
+ get: function () {
+ var self = this
+ return (self.uploaded / self.downloaded) || 0
+ }
+})
+
+Object.defineProperty(Swarm.prototype, 'numQueued', {
+ get: function () {
+ var self = this
+ return self._queue.length + (self._peersLength - self.numConns)
+ }
+})
+
+Object.defineProperty(Swarm.prototype, 'numConns', {
+ get: function () {
+ var self = this
+ var numConns = 0
+ for (var id in self._peers) {
+ var peer = self._peers[id]
+ if (peer && peer.connected) numConns += 1
+ }
+ return numConns
+ }
+})
+
+Object.defineProperty(Swarm.prototype, 'numPeers', {
+ get: function () {
+ var self = this
+ return self.wires.length
+ }
+})
+
+/**
+ * Add a peer to the swarm.
+ * @param {string|simple-peer} peer "ip:port" string or simple-peer instance
+ * @param {string} peer.id bittorrent peer id (when `peer` is simple-peer)
+ * @return {boolean} true if peer was added, false if peer was invalid
+
+ */
+Swarm.prototype.addPeer = function (peer) {
+ var self = this
+ var newPeer = self._addPeer(peer)
+ return !!newPeer // don't expose private Peer instance in return value
+}
+
+Swarm.prototype._addPeer = function (peer) {
+ var self = this
+ if (self.destroyed) {
+ debug('ignoring added peer: swarm already destroyed')
+ if (typeof peer !== 'string') peer.destroy()
+ return null
+ }
+ if (typeof peer === 'string' && !self._validAddr(peer)) {
+ debug('ignoring added peer: invalid address %s', peer)
+ return null
+ }
+
+ var id = (peer && peer.id) || peer
+ if (self._peers[id]) {
+ debug('ignoring added peer: duplicate peer id')
+ if (typeof peer !== 'string') peer.destroy()
+ return null
+ }
+
+ if (self.paused) {
+ debug('ignoring added peer: swarm paused')
+ if (typeof peer !== 'string') peer.destroy()
+ return null
+ }
+
+ debug('addPeer %s', id)
+
+ var newPeer
+ if (typeof peer === 'string') {
+ // `peer` is an addr ("ip:port" string)
+ newPeer = Peer.createTCPOutgoingPeer(peer, self)
+ } else {
+ // `peer` is a WebRTC connection (simple-peer)
+ newPeer = Peer.createWebRTCPeer(peer, self)
+ }
+
+ self._peers[newPeer.id] = newPeer
+ self._peersLength += 1
+
+ if (typeof peer === 'string') {
+ // `peer` is an addr ("ip:port" string)
+ self._queue.push(newPeer)
+ self._drain()
+ }
+
+ return newPeer
+}
+
+/**
+ * Add a web seed to the swarm.
+ * @param {string} url web seed url
+ * @param {Object} parsedTorrent
+ */
+Swarm.prototype.addWebSeed = function (url, parsedTorrent) {
+ var self = this
+ if (self.destroyed) return
+
+ if (!/^https?:\/\/.+/.test(url)) {
+ debug('ignoring invalid web seed %s (from swarm.addWebSeed)', url)
+ return
+ }
+
+ if (self._peers[url]) return
+
+ debug('addWebSeed %s', url)
+
+ var newPeer = Peer.createWebSeedPeer(url, parsedTorrent, self)
+ self._peers[newPeer.id] = newPeer
+ self._peersLength += 1
+}
+
+/**
+ * Called whenever a new incoming TCP peer connects to this swarm. Called with a peer
+ * that has already sent a handshake.
+ * @param {Peer} peer
+ */
+Swarm.prototype._addIncomingPeer = function (peer) {
+ var self = this
+ if (self.destroyed) return peer.destroy(new Error('swarm already destroyed'))
+ if (self.paused) return peer.destroy(new Error('swarm paused'))
+
+ if (!self._validAddr(peer.addr)) {
+ return peer.destroy(new Error('invalid addr ' + peer.addr + ' (from incoming)'))
+ }
+ debug('_addIncomingPeer %s', peer.id)
+
+ self._peers[peer.id] = peer
+ self._peersLength += 1
+}
+
+/**
+ * Remove a peer from the swarm.
+ * @param {string} id for tcp peers, "ip:port" string; for webrtc peers, peerId
+ */
+Swarm.prototype.removePeer = function (id) {
+ var self = this
+ var peer = self._peers[id]
+ if (!peer) return
+
+ debug('removePeer %s', id)
+
+ self._peers[id] = null
+ self._peersLength -= 1
+
+ peer.destroy()
+
+ // If swarm was at capacity before, try to open a new connection now
+ self._drain()
+}
+
+/**
+ * Temporarily stop connecting to new peers. Note that this does not pause the streams
+ * of existing connections or their wires.
+ */
+Swarm.prototype.pause = function () {
+ var self = this
+ if (self.destroyed) return
+ debug('pause')
+ self.paused = true
+}
+
+/**
+ * Resume connecting to new peers.
+ */
+Swarm.prototype.resume = function () {
+ var self = this
+ if (self.destroyed) return
+ debug('resume')
+ self.paused = false
+ self._drain()
+}
+
+/**
+ * Listen on the given port for peer connections.
+ * @param {number} port
+ * @param {string=} hostname
+ * @param {function=} onlistening
+ */
+Swarm.prototype.listen = function (port, hostname, onlistening) {
+ var self = this
+ if (typeof hostname === 'function') {
+ onlistening = hostname
+ hostname = undefined
+ }
+ if (self.listening) throw new Error('swarm already listening')
+ if (onlistening) self.once('listening', onlistening)
+
+ if (typeof TCPPool === 'function') {
+ self._port = port || TCPPool.getDefaultListenPort(self.infoHash)
+ self._hostname = hostname
+
+ debug('listen %s', port)
+
+ var pool = TCPPool.addSwarm(self)
+ self.server = pool.server
+ } else {
+ // In browser, listen() is no-op, but still fire 'listening' event so that
+ // same code works in node and the browser.
+ process.nextTick(function () {
+ self._onListening(0)
+ })
+ }
+}
+
+Swarm.prototype._onListening = function (port) {
+ var self = this
+ self._port = port
+ self.listening = true
+ self.emit('listening')
+}
+
+Swarm.prototype.address = function () {
+ var self = this
+ if (!self.listening) return null
+ return self.server
+ ? self.server.address()
+ : { port: 0, family: 'IPv4', address: '127.0.0.1' }
+}
+
+/**
+ * Destroy the swarm, close all open peer connections, and do cleanup.
+ * @param {function} onclose
+ */
+Swarm.prototype.destroy = function (onclose) {
+ var self = this
+ if (self.destroyed) return
+
+ self.destroyed = true
+ self.listening = false
+ self.paused = false
+
+ if (onclose) self.once('close', onclose)
+
+ debug('destroy')
+
+ for (var id in self._peers) {
+ self.removePeer(id)
+ }
+
+ if (typeof TCPPool === 'function') {
+ TCPPool.removeSwarm(self, function () {
+ // TODO: only emit when all peers are destroyed
+ self.emit('close')
+ })
+ } else {
+ process.nextTick(function () {
+ self.emit('close')
+ })
+ }
+}
+
+/**
+ * Pop a peer off the FIFO queue and connect to it. When _drain() gets called,
+ * the queue will usually have only one peer in it, except when there are too
+ * many peers (over `this.maxConns`) in which case they will just sit in the
+ * queue until another connection closes.
+ */
+Swarm.prototype._drain = function () {
+ var self = this
+ debug('_drain numConns %s maxConns %s', self.numConns, self.maxConns)
+ if (typeof net.connect !== 'function' || self.destroyed || self.paused ||
+ self.numConns >= self.maxConns) {
+ return
+ }
+ debug('drain (%s queued, %s/%s peers)', self.numQueued, self.numPeers, self.maxConns)
+
+ var peer = self._queue.shift()
+ if (!peer) return // queue could be empty
+
+ debug('tcp connect attempt to %s', peer.addr)
+
+ var parts = addrToIPPort(peer.addr)
+ var opts = {
+ host: parts[0],
+ port: parts[1]
+ }
+ if (self._hostname) opts.localAddress = self._hostname
+
+ var conn = peer.conn = net.connect(opts)
+
+ conn.once('connect', function () { peer.onConnect() })
+ conn.once('error', function (err) { peer.destroy(err) })
+ peer.setConnectTimeout()
+
+ // When connection closes, attempt reconnect after timeout (with exponential backoff)
+ conn.on('close', function () {
+ if (self.destroyed) return
+
+ if (peer.retries >= RECONNECT_WAIT.length) {
+ debug(
+ 'conn %s closed: will not re-add (max %s attempts)',
+ peer.addr, RECONNECT_WAIT.length
+ )
+ return
+ }
+
+ var ms = RECONNECT_WAIT[peer.retries]
+ debug(
+ 'conn %s closed: will re-add to queue in %sms (attempt %s)',
+ peer.addr, ms, peer.retries + 1
+ )
+
+ var reconnectTimeout = setTimeout(function reconnectTimeout () {
+ var newPeer = self._addPeer(peer.addr)
+ if (newPeer) newPeer.retries = peer.retries + 1
+ }, ms)
+ if (reconnectTimeout.unref) reconnectTimeout.unref()
+ })
+}
+
+Swarm.prototype._onError = function (err) {
+ var self = this
+ self.emit('error', err)
+ self.destroy()
+}
+
+/**
+ * Returns `true` if string is valid IPv4/6 address, and is not the address of this swarm.
+ * @param {string} addr
+ * @return {boolean}
+ */
+Swarm.prototype._validAddr = function (addr) {
+ var self = this
+ var parts
+ try {
+ parts = addrToIPPort(addr)
+ } catch (e) {
+ return false
+ }
+ var host = parts[0]
+ var port = parts[1]
+ return port > 0 && port < 65535 && !(host === '127.0.0.1' && port === self._port)
+}
diff --git a/lib/tcp-pool.js b/lib/tcp-pool.js
new file mode 100644
index 0000000..02148e3
--- /dev/null
+++ b/lib/tcp-pool.js
@@ -0,0 +1,235 @@
+module.exports = TCPPool
+
+var debug = require('debug')('webtorrent:tcp-pool')
+var net = require('net') // browser exclude
+
+var Peer = require('./peer')
+
+/**
+ * Shared TCP pools; shared among all swarms
+ * @type {Object} port: number -> pool: TCPPool
+ */
+var tcpPools = {}
+
+/**
+ * TCPPool
+ *
+ * A "TCP pool" allows multiple swarms to listen on the same TCP port and determines
+ * which swarm incoming connections are intended for by inspecting the bittorrent
+ * handshake that the remote peer sends.
+ *
+ * @param {number} port
+ * @param {string} hostname
+ */
+function TCPPool (port, hostname) {
+ var self = this
+
+ self.port = port
+ self.listening = false
+ self.swarms = {} // infoHash (hex) -> Swarm
+
+ debug('new TCPPool (port: %s, hostname: %s)', port, hostname)
+
+ // Save incoming conns so they can be destroyed if server is closed before the conn is
+ // passed off to a Swarm.
+ self.pendingConns = []
+
+ self.server = net.createServer()
+ self.server.on('connection', function (conn) { self._onConnection(conn) })
+ self.server.on('error', function (err) { self._onError(err) })
+ self.server.on('listening', function () { self._onListening() })
+ self.server.listen(self.port, hostname)
+}
+
+/**
+ * STATIC METHOD
+ * Add a swarm to a pool, creating a new pool if necessary.
+ * @param {Swarm} swarm
+ */
+TCPPool.addSwarm = function (swarm) {
+ var pool = tcpPools[swarm._port]
+ if (!pool) pool = tcpPools[swarm._port] = new TCPPool(swarm._port, swarm._hostname)
+ pool.addSwarm(swarm)
+ return pool
+}
+
+/**
+ * STATIC METHOD
+ * Remove a swarm from its pool.
+ * @param {Swarm} swarm
+ */
+TCPPool.removeSwarm = function (swarm, cb) {
+ var pool = tcpPools[swarm._port]
+ if (!pool) return cb()
+ pool.removeSwarm(swarm)
+
+ var numSwarms = 0
+ for (var infoHash in pool.swarms) {
+ var s = pool.swarms[infoHash]
+ if (s) numSwarms += 1
+ }
+ if (numSwarms === 0) pool.destroy(cb)
+ else process.nextTick(cb)
+}
+
+/**
+ * STATIC METHOD
+ * When `Swarm.prototype.listen` is called without specifying a port, a reasonable
+ * default port must be chosen. If there already exists an active TCP pool, then return
+ * that pool's port so that TCP server can be re-used. Otherwise, return 0 so node will
+ * pick a free port.
+ *
+ * @return {number} port
+ */
+TCPPool.getDefaultListenPort = function (infoHash) {
+ for (var port in tcpPools) {
+ var pool = tcpPools[port]
+ if (pool && !pool.swarms[infoHash]) return pool.port
+ }
+ return 0
+}
+
+/**
+ * Add a swarm to this TCP pool.
+ * @param {Swarm} swarm
+ */
+TCPPool.prototype.addSwarm = function (swarm) {
+ var self = this
+
+ if (self.swarms[swarm.infoHash]) {
+ process.nextTick(function () {
+ swarm._onError(new Error(
+ 'There is already a swarm with info hash ' + swarm.infoHash + ' ' +
+ 'listening on port ' + swarm._port
+ ))
+ })
+ return
+ }
+
+ self.swarms[swarm.infoHash] = swarm
+
+ if (self.listening) {
+ process.nextTick(function () {
+ swarm._onListening(self.port)
+ })
+ }
+
+ debug('add swarm %s to tcp pool %s', swarm.infoHash, self.port)
+}
+
+/**
+ * Remove a swarm from this TCP pool.
+ * @param {Swarm} swarm
+ */
+TCPPool.prototype.removeSwarm = function (swarm) {
+ var self = this
+ debug('remove swarm %s from tcp pool %s', swarm.infoHash, self.port)
+ self.swarms[swarm.infoHash] = null
+}
+
+/**
+ * Destroy this TCP pool.
+ * @param {function} cb
+ */
+TCPPool.prototype.destroy = function (cb) {
+ var self = this
+ debug('destroy tcp pool %s', self.port)
+
+ self.listening = false
+
+ // Destroy all open connection objects so server can close gracefully without waiting
+ // for connection timeout or remote peer to disconnect.
+ self.pendingConns.forEach(function (conn) {
+ conn.destroy()
+ })
+
+ tcpPools[self.port] = null
+
+ try {
+ self.server.close(cb)
+ } catch (err) {
+ if (cb) process.nextTick(cb)
+ }
+}
+
+TCPPool.prototype._onListening = function () {
+ var self = this
+
+ // Fix for Docker Node image. Sometimes server.address() returns `null`.
+ // See issue: https://github.com/feross/bittorrent-swarm/pull/18
+ var address = self.server.address() || { port: 0 }
+ var port = address.port
+
+ debug('tcp pool listening on %s', port)
+
+ if (port !== self.port) {
+ // `port` was 0 when `listen` was called; update to the port that node selected
+ tcpPools[self.port] = null
+ self.port = port
+ tcpPools[self.port] = self
+ }
+
+ self.listening = true
+
+ for (var infoHash in self.swarms) {
+ var swarm = self.swarms[infoHash]
+ if (swarm) swarm._onListening(self.port)
+ }
+}
+
+/**
+ * On incoming connections, we expect the remote peer to send a handshake first. Based
+ * on the infoHash in that handshake, route the peer to the right swarm.
+ */
+TCPPool.prototype._onConnection = function (conn) {
+ var self = this
+
+ // If the connection has already been closed before the `connect` event is fired,
+ // then `remoteAddress` will not be available, and we can't use this connection.
+ // - Node.js issue: https://github.com/nodejs/node-v0.x-archive/issues/7566
+ // - WebTorrent issue: https://github.com/feross/webtorrent/issues/398
+ if (!conn.remoteAddress) {
+ conn.on('error', noop)
+ conn.destroy()
+ return
+ }
+
+ self.pendingConns.push(conn)
+ conn.once('close', removePendingConn)
+
+ function removePendingConn () {
+ self.pendingConns.splice(self.pendingConns.indexOf(conn))
+ }
+
+ var peer = Peer.createTCPIncomingPeer(conn)
+
+ peer.wire.once('handshake', function (infoHash, peerId) {
+ removePendingConn()
+ conn.removeListener('close', removePendingConn)
+
+ var swarm = self.swarms[infoHash]
+ if (swarm) {
+ peer.swarm = swarm
+ swarm._addIncomingPeer(peer)
+ peer.onHandshake(infoHash, peerId)
+ } else {
+ var err = new Error('Unexpected info hash ' + infoHash + ' from incoming peer ' +
+ peer.id + ': destroying peer')
+ peer.destroy(err)
+ }
+ })
+}
+
+TCPPool.prototype._onError = function (err) {
+ var self = this
+ self.destroy()
+ for (var infoHash in self.swarms) {
+ var swarm = self.swarms[infoHash]
+ if (swarm) {
+ self.removeSwarm(swarm)
+ swarm._onError(err)
+ }
+ }
+}
+
+function noop () {}
diff --git a/lib/torrent.js b/lib/torrent.js
index ed47e5b..79db2ab 100644
--- a/lib/torrent.js
+++ b/lib/torrent.js
@@ -2,7 +2,7 @@
module.exports = Torrent
-var addrToIPPort = require('addr-to-ip-port') // browser exclude
+var addrToIPPort = require('addr-to-ip-port')
var BitField = require('bitfield')
var ChunkStoreWriteStream = require('chunk-store-stream/write')
var cpus = require('cpus')
diff --git a/lib/webconn.js b/lib/webconn.js
new file mode 100644
index 0000000..52176f9
--- /dev/null
+++ b/lib/webconn.js
@@ -0,0 +1,135 @@
+module.exports = WebConn
+
+var BitField = require('bitfield')
+var debug = require('debug')('bittorrent-swarm:webconn')
+var get = require('simple-get')
+var inherits = require('inherits')
+var sha1 = require('simple-sha1')
+var Wire = require('bittorrent-protocol')
+
+inherits(WebConn, Wire)
+
+/**
+ * Converts requests for torrent blocks into http range requests.
+ * @param {string} url web seed url
+ * @param {Object} parsedTorrent
+ */
+function WebConn (url, parsedTorrent) {
+ var self = this
+ Wire.call(this)
+
+ self.url = url
+ self.webPeerId = sha1.sync(url)
+ self.parsedTorrent = parsedTorrent
+
+ self.setKeepAlive(true)
+
+ self.on('handshake', function (infoHash, peerId) {
+ self.handshake(infoHash, self.webPeerId)
+ var numPieces = self.parsedTorrent.pieces.length
+ var bitfield = new BitField(numPieces)
+ for (var i = 0; i <= numPieces; i++) {
+ bitfield.set(i, true)
+ }
+ self.bitfield(bitfield)
+ })
+
+ self.on('choke', function () { debug('choke') })
+ self.on('unchoke', function () { debug('unchoke') })
+
+ self.once('interested', function () {
+ debug('interested')
+ self.unchoke()
+ })
+ self.on('uninterested', function () { debug('uninterested') })
+
+ self.on('bitfield', function () { debug('bitfield') })
+
+ self.on('request', function (pieceIndex, offset, length, callback) {
+ debug('request pieceIndex=%d offset=%d length=%d', pieceIndex, offset, length)
+ self.httpRequest(pieceIndex, offset, length, callback)
+ })
+}
+
+WebConn.prototype.httpRequest = function (pieceIndex, offset, length, cb) {
+ var self = this
+ var pieceOffset = pieceIndex * self.parsedTorrent.pieceLength
+ var rangeStart = pieceOffset + offset /* offset within whole torrent */
+ var rangeEnd = rangeStart + length - 1
+
+ // Web seed URL format
+ // For single-file torrents, you just make HTTP range requests directly to the web seed URL
+ // For multi-file torrents, you have to add the torrent folder and file name to the URL
+ var files = self.parsedTorrent.files
+ var requests
+ if (files.length <= 1) {
+ requests = [{
+ url: self.url,
+ start: rangeStart,
+ end: rangeEnd
+ }]
+ } else {
+ var requestedFiles = files.filter(function (file) {
+ return file.offset <= rangeEnd && (file.offset + file.length) > rangeStart
+ })
+ if (requestedFiles.length < 1) return cb(new Error('Could not find file corresponnding to web seed range request'))
+
+ requests = requestedFiles.map(function (requestedFile) {
+ var fileEnd = requestedFile.offset + requestedFile.length - 1
+ var url = self.url +
+ (self.url[self.url.length - 1] === '/' ? '' : '/') +
+ requestedFile.path
+ return {
+ url: url,
+ fileOffsetInRange: Math.max(requestedFile.offset - rangeStart, 0),
+ start: Math.max(rangeStart - requestedFile.offset, 0),
+ end: Math.min(fileEnd, rangeEnd - requestedFile.offset)
+ }
+ })
+ }
+
+ // Now make all the HTTP requests we need in order to load this piece
+ // Usually that's one requests, but sometimes it will be multiple
+ // Send requests in parallel and wait for them all to come back
+ var numRequestsSucceeded = 0
+ var hasError = false
+ if (requests.length > 1) var ret = new Buffer(length)
+ requests.forEach(function (request) {
+ var url = request.url
+ var start = request.start
+ var end = request.end
+ debug(
+ 'Requesting url=%s pieceIndex=%d offset=%d length=%d start=%d end=%d',
+ url, pieceIndex, offset, length, start, end
+ )
+ var opts = {
+ url: url,
+ method: 'GET',
+ headers: {
+ 'user-agent': 'WebTorrent (http://webtorrent.io)',
+ 'range': 'bytes=' + start + '-' + end
+ }
+ }
+ get.concat(opts, function (err, res, data) {
+ if (hasError) return
+ if (err) {
+ hasError = true
+ return cb(err)
+ }
+ if (res.statusCode < 200 || res.statusCode >= 300) {
+ hasError = true
+ return cb(new Error('Unexpected HTTP status code ' + res.statusCode))
+ }
+ debug('Got data of length %d', data.length)
+ if (requests.length === 1) {
+ // Common case: fetch piece in a single HTTP request, return directly
+ return cb(null, data)
+ }
+ // Rare case: reconstruct multiple HTTP requests across 2+ files into one piece buffer
+ data.copy(ret, request.fileOffsetInRange)
+ if (++numRequestsSucceeded === requests.length) {
+ cb(null, ret)
+ }
+ })
+ })
+}
diff --git a/package.json b/package.json
index 53da356..ff586d7 100644
--- a/package.json
+++ b/package.json
@@ -9,9 +9,11 @@
},
"browser": {
"./lib/server.js": false,
+ "./lib/tcp-pool.js": false,
"bittorrent-dht/client": false,
"fs-chunk-store": "memory-chunk-store",
"load-ip-set": false,
+ "net": false,
"os": false,
"path-exists": false,
"ut_pex": false
@@ -28,14 +30,14 @@
"addr-to-ip-port": "^1.0.1",
"bitfield": "^1.0.2",
"bittorrent-dht": "^7.0.0",
- "bittorrent-swarm": "^7.1.0",
+ "bittorrent-protocol": "^2.0.0",
"chunk-store-stream": "^2.0.0",
"cpus": "^1.0.0",
"create-torrent": "^3.4.0",
"debug": "^2.1.0",
"end-of-stream": "^1.0.0",
"fs-chunk-store": "^1.3.4",
- "hat": "0.0.3",
+ "hat": "^0.0.3",
"immediate-chunk-store": "^1.0.7",
"inherits": "^2.0.1",
"load-ip-set": "^1.0.3",
@@ -54,6 +56,7 @@
"render-media": "^2.0.0",
"run-parallel": "^1.0.0",
"run-parallel-limit": "^1.0.2",
+ "simple-get": "^2.0.0",
"simple-peer": "^6.0.0",
"simple-sha1": "^2.0.0",
"speedometer": "^1.0.0",
@@ -75,9 +78,9 @@
"electron-prebuilt": "^0.37.2",
"finalhandler": "^0.4.0",
"network-address": "^1.1.0",
+ "portfinder": "^1.0.0",
"run-series": "^1.0.2",
"serve-static": "^1.9.3",
- "simple-get": "^2.0.0",
"standard": "^6.0.1",
"tape": "^4.0.0",
"uglify-js": "^2.4.15",
@@ -90,8 +93,13 @@
"bittorrent client",
"download",
"mad science",
+ "p2p",
+ "peer-to-peer",
+ "peers",
"streaming",
+ "swarm",
"torrent",
+ "web torrent",
"webrtc",
"webrtc data",
"webtorrent"
diff --git a/test/node/swarm-basic.js b/test/node/swarm-basic.js
new file mode 100644
index 0000000..9e38d0a
--- /dev/null
+++ b/test/node/swarm-basic.js
@@ -0,0 +1,77 @@
+var hat = require('hat')
+var portfinder = require('portfinder')
+var Swarm = require('../../lib/swarm')
+var test = require('tape')
+
+var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36'
+var infoHash2 = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa37'
+var peerId = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex')
+var peerId2 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex')
+
+test('swarm listen (explicit port)', function (t) {
+ t.plan(1)
+
+ var swarm = new Swarm(infoHash, peerId)
+ portfinder.getPort(function (err, port) {
+ if (err) throw err
+ swarm.listen(port)
+ swarm.on('listening', function () {
+ t.equal(port, swarm.address().port)
+ swarm.destroy()
+ })
+ })
+})
+
+test('two swarms listen on same port (explicit)', function (t) {
+ t.plan(2)
+
+ var swarm1 = new Swarm(infoHash, peerId)
+ portfinder.getPort(function (err, port) {
+ if (err) throw err
+ swarm1.listen(port, function () {
+ t.equal(swarm1.address().port, port, 'listened on requested port')
+
+ var swarm2 = new Swarm(infoHash2, peerId)
+ swarm2.listen(port, function () {
+ t.equal(swarm2.address().port, port, 'listened on requested port')
+ swarm1.destroy()
+ swarm2.destroy()
+ })
+ })
+ })
+})
+
+test('swarm join', function (t) {
+ t.plan(10)
+
+ var swarm1 = new Swarm(infoHash, peerId)
+ swarm1.listen(0, function () {
+ var swarm2 = new Swarm(infoHash, peerId2)
+
+ t.equal(swarm1.wires.length, 0)
+ t.equal(swarm2.wires.length, 0)
+
+ swarm2.addPeer('127.0.0.1:' + swarm1.address().port)
+
+ swarm1.on('wire', function (wire, addr) {
+ t.ok(wire, 'Peer join our swarm via listening port')
+
+ t.equal(swarm1.wires.length, 1)
+ t.ok(/127\.0\.0\.1:\d{1,5}/.test(addr))
+ t.equal(wire.peerId.toString('hex'), peerId2)
+ })
+
+ swarm2.on('wire', function (wire, addr) {
+ t.ok(wire, 'Joined swarm, got wire')
+
+ t.equal(swarm2.wires.length, 1)
+ t.ok(/127\.0\.0\.1:\d{1,5}/.test(addr))
+ t.equal(wire.peerId.toString('hex'), peerId)
+ })
+
+ t.on('end', function () {
+ swarm1.destroy()
+ swarm2.destroy()
+ })
+ })
+})
diff --git a/test/node/swarm-reconnect.js b/test/node/swarm-reconnect.js
new file mode 100644
index 0000000..4fad7d0
--- /dev/null
+++ b/test/node/swarm-reconnect.js
@@ -0,0 +1,62 @@
+var hat = require('hat')
+var Swarm = require('../../lib/swarm')
+var test = require('tape')
+
+var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36'
+var peerId1 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex')
+var peerId2 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex')
+
+test('reconnect when peer disconnects', function (t) {
+ t.plan(10)
+
+ var swarm1 = new Swarm(infoHash, peerId1)
+ swarm1.listen(0, function () {
+ var swarm2 = new Swarm(infoHash, peerId2)
+
+ var time1 = 0
+ swarm1.on('wire', function (wire) {
+ if (time1 === 0) {
+ t.ok(wire, 'Peer joined via listening port')
+ t.equal(swarm1.wires.length, 1)
+
+ // at some point in future, end wire
+ setTimeout(function () {
+ wire.destroy()
+ }, 100)
+
+ // ...and prevent reconnect
+ swarm1._drain = function () {}
+ } else if (time1 === 1) {
+ t.ok(wire, 'Remote peer reconnected')
+ t.equal(swarm1.wires.length, 1)
+ } else {
+ throw new Error('too many wire events (1)')
+ }
+ time1 += 1
+ })
+
+ var time2 = 0
+ swarm2.on('wire', function (wire) {
+ if (time2 === 0) {
+ t.ok(wire, 'Joined swarm, got wire')
+ t.equal(swarm2.wires.length, 1)
+
+ wire.on('end', function () {
+ t.pass('Wire ended by remote peer')
+ t.equal(swarm1.wires.length, 0)
+ })
+ } else if (time2 === 1) {
+ t.ok(wire, 'Reconnected to remote peer')
+ t.equal(swarm2.wires.length, 1)
+
+ swarm1.destroy()
+ swarm2.destroy()
+ } else {
+ throw new Error('too many wire events (2)')
+ }
+ time2 += 1
+ })
+
+ swarm2.addPeer('127.0.0.1:' + swarm1.address().port)
+ })
+})
diff --git a/test/node/swarm-timeout.js b/test/node/swarm-timeout.js
new file mode 100644
index 0000000..4341465
--- /dev/null
+++ b/test/node/swarm-timeout.js
@@ -0,0 +1,50 @@
+var hat = require('hat')
+var Swarm = require('../../lib/swarm')
+var test = require('tape')
+
+var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36'
+var peerId1 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex')
+var peerId2 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex')
+
+test('timeout if no handshake in 25 seconds', function (t) {
+ t.plan(4)
+
+ var swarm1 = new Swarm(infoHash, peerId1)
+
+ var _addIncomingPeer = swarm1._addIncomingPeer
+ swarm1._addIncomingPeer = function (peer) {
+ // Nuke the handshake function on swarm1's peer to test swarm2's
+ // handshake timeout code
+ peer.wire.handshake = function () {}
+ _addIncomingPeer.call(swarm1, peer)
+ }
+
+ swarm1.listen(0, function () {
+ var swarm2 = new Swarm(infoHash, peerId2)
+
+ var numWires = 0
+ swarm1.on('wire', function (wire) {
+ numWires += 1
+ if (numWires === 1) {
+ t.ok(wire, 'Got wire via listening port')
+ t.equal(swarm1.wires.length, 1)
+
+ // swarm2 should never get a wire since swarm1 refuses to send it a
+ // handshake
+ t.equal(swarm2.wires.length, 0)
+ } else if (numWires === 2) {
+ t.pass('swarm2 reconnected after timeout')
+ swarm1.destroy()
+ swarm2.destroy()
+ } else {
+ t.fail('got wire after destroy')
+ }
+ })
+
+ swarm2.on('wire', function (wire) {
+ t.fail('Should not get a wire because peer did not handshake')
+ })
+
+ swarm2.addPeer('127.0.0.1:' + swarm1.address().port)
+ })
+})
diff --git a/test/swarm.js b/test/swarm.js
new file mode 100644
index 0000000..9662d4d
--- /dev/null
+++ b/test/swarm.js
@@ -0,0 +1,82 @@
+var hat = require('hat')
+var Swarm = require('../lib/swarm')
+var test = require('tape')
+
+var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36'
+var infoHash2 = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa37'
+var peerId = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex')
+var peerId2 = new Buffer('-WW0001-' + hat(48), 'utf8').toString('hex')
+
+test('create swarm, check invariants', function (t) {
+ var swarm = new Swarm(infoHash, peerId)
+
+ t.equal(swarm.infoHash.toString('hex'), infoHash)
+ t.equal(swarm.peerId.toString('hex'), peerId)
+ t.equal(swarm.downloaded, 0)
+ t.equal(swarm.uploaded, 0)
+ t.ok(Array.isArray(swarm.wires))
+ t.equal(swarm.wires.length, 0)
+ t.end()
+})
+
+test('swarm listen(0) selects free port', function (t) {
+ t.plan(2)
+
+ var swarm = new Swarm(infoHash, peerId)
+ swarm.listen(0)
+ swarm.on('listening', function () {
+ var port = swarm.address().port
+ t.equal(typeof port, 'number', 'port is a number')
+ if (process.browser) {
+ t.equal(port, 0, 'port number is 0')
+ } else {
+ t.ok(port > 0 && port < 65535, 'valid port number')
+ }
+ swarm.destroy()
+ })
+})
+
+test('two swarms listen on same port (implicit)', function (t) {
+ t.plan(5)
+
+ // When no port is specified and listen is called twice, they should get assigned the same port.
+
+ var swarm1 = new Swarm(infoHash, peerId)
+ var swarm2 = new Swarm(infoHash2, peerId2)
+
+ var swarm1Port
+ var swarm2Port
+
+ function maybeDone () {
+ if (swarm1.listening && swarm2.listening) {
+ t.equal(swarm1Port, swarm2Port, 'swarms were given same port')
+
+ t.equal(typeof swarm1Port, 'number', 'port is a number')
+ if (process.browser) {
+ t.equal(swarm1Port, 0, 'port number is 0')
+ } else {
+ t.ok(swarm1Port > 0 && swarm1Port < 65535, 'valid port number')
+ }
+
+ t.equal(typeof swarm2Port, 'number', 'port is a number')
+ if (process.browser) {
+ t.equal(swarm2Port, 0, 'port number is 0')
+ } else {
+ t.ok(swarm2Port > 0 && swarm2Port < 65535, 'valid port number')
+ }
+
+ swarm1.destroy()
+ swarm2.destroy()
+ }
+ }
+
+ swarm1.listen(0, function () {
+ swarm1Port = swarm1.address().port
+ maybeDone()
+ })
+
+ swarm2.listen(0, function (port2) {
+ swarm2Port = swarm2.address().port
+ maybeDone()
+ })
+})