diff options
-rw-r--r-- | docs/api.md | 1 | ||||
-rw-r--r-- | index.js | 21 | ||||
-rw-r--r-- | lib/conn-pool.js (renamed from lib/tcp-pool.js) | 79 | ||||
-rw-r--r-- | lib/peer.js | 40 | ||||
-rw-r--r-- | lib/torrent.js | 34 | ||||
-rw-r--r-- | package.json | 3 | ||||
-rw-r--r-- | test/node/conn-pool.js | 205 |
7 files changed, 337 insertions, 46 deletions
diff --git a/docs/api.md b/docs/api.md index 1532a89..87bd5d3 100644 --- a/docs/api.md +++ b/docs/api.md @@ -59,6 +59,7 @@ If `opts` is specified, then the default options (shown below) will be overridde tracker: Boolean|Object, // Enable trackers (default=true), or options object for Tracker dht: Boolean|Object, // Enable DHT (default=true), or options object for DHT webSeeds: Boolean // Enable BEP19 web seeds (default=true) + utp: Boolean // Enable BEP29 uTorrent transport protocol (default=false) } ``` @@ -14,7 +14,7 @@ const Peer = require('simple-peer') const randombytes = require('randombytes') const speedometer = require('speedometer') -const TCPPool = require('./lib/tcp-pool') // browser exclude +const ConnPool = require('./lib/conn-pool') // browser exclude const Torrent = require('./lib/torrent') const VERSION = require('./package.json').version @@ -72,6 +72,7 @@ class WebTorrent extends EventEmitter { this.tracker = opts.tracker !== undefined ? opts.tracker : {} this.torrents = [] this.maxConns = Number(opts.maxConns) || 55 + this.utp = opts.utp === true this._debug( 'new webtorrent (peerId %s, nodeId %s, port %s)', @@ -95,8 +96,8 @@ class WebTorrent extends EventEmitter { } } - if (typeof TCPPool === 'function') { - this._tcpPool = new TCPPool(this) + if (typeof ConnPool === 'function') { + this._connPool = new ConnPool(this) } else { process.nextTick(() => { this._onListening() @@ -355,8 +356,8 @@ class WebTorrent extends EventEmitter { address () { if (!this.listening) return null - return this._tcpPool - ? this._tcpPool.server.address() + return this._connPool + ? this._connPool.tcpServer.address() : { address: '0.0.0.0', family: 'IPv4', port: 0 } } @@ -377,9 +378,9 @@ class WebTorrent extends EventEmitter { torrent.destroy(cb) }) - if (this._tcpPool) { + if (this._connPool) { tasks.push(cb => { - this._tcpPool.destroy(cb) + this._connPool.destroy(cb) }) } @@ -394,7 +395,7 @@ class WebTorrent extends EventEmitter { if (err) this.emit('error', err) this.torrents = [] - this._tcpPool = null + this._connPool = null this.dht = null } @@ -402,9 +403,9 @@ class WebTorrent extends EventEmitter { this._debug('listening') this.listening = true - if (this._tcpPool) { + if (this._connPool) { // Sometimes server.address() returns `null` in Docker. - const address = this._tcpPool.server.address() + const address = this._connPool.tcpServer.address() if (address) this.torrentPort = address.port } diff --git a/lib/tcp-pool.js b/lib/conn-pool.js index 1094c4f..ddc0672 100644 --- a/lib/tcp-pool.js +++ b/lib/conn-pool.js @@ -1,30 +1,30 @@ const arrayRemove = require('unordered-array-remove') -const debug = require('debug')('webtorrent:tcp-pool') +const debug = require('debug')('webtorrent:conn-pool') const net = require('net') // browser exclude +const utp = require('utp-native') // browser exclude const Peer = require('./peer') /** - * TCPPool + * Connection Pool * - * A "TCP pool" allows multiple swarms to listen on the same TCP port and determines + * A connection pool allows multiple swarms to listen on the same TCP/UDP port and determines * which swarm incoming connections are intended for by inspecting the bittorrent * handshake that the remote peer sends. * * @param {number} port */ -class TCPPool { +class ConnPool { constructor (client) { - debug('create tcp pool (port %s)', client.torrentPort) + debug('create pool (port %s)', client.torrentPort) - this.server = net.createServer() this._client = client // Temporarily store incoming connections so they can be destroyed if the server is // closed before the connection is passed off to a Torrent. this._pendingConns = [] - this._onConnectionBound = conn => { + this._onConnectionBound = (conn) => { this._onConnection(conn) } @@ -32,42 +32,77 @@ class TCPPool { this._client._onListening() } - this._onError = err => { + this._onTCPError = (err) => { this._client._destroy(err) } - this.server.on('connection', this._onConnectionBound) - this.server.on('listening', this._onListening) - this.server.on('error', this._onError) + this._onUTPError = () => { + this._client.utp = false + } - this.server.listen(client.torrentPort) + // Setup TCP + this.tcpServer = net.createServer() + this.tcpServer.on('connection', this._onConnectionBound) + this.tcpServer.on('error', this._onTCPError) + + // Start TCP + this.tcpServer.listen(client.torrentPort, () => { + debug('creating tcpServer in port %s', this.tcpServer.address().port) + if (this._client.utp) { + // Setup uTP + this.utpServer = utp.createServer() + this.utpServer.on('connection', this._onConnectionBound) + this.utpServer.on('listening', this._onListening) + this.utpServer.on('error', this._onUTPError) + + // Start uTP + debug('creating utpServer in port %s', this.tcpServer.address().port) + this.utpServer.listen(this.tcpServer.address().port) + } else { + this._onListening() + } + }) } /** - * Destroy this TCP pool. + * Destroy this Conn pool. * @param {function} cb */ destroy (cb) { - debug('destroy tcp pool') + debug('destroy conn pool') - this.server.removeListener('connection', this._onConnectionBound) - this.server.removeListener('listening', this._onListening) - this.server.removeListener('error', this._onError) + if (this.utpServer) { + this.utpServer.removeListener('connection', this._onConnectionBound) + this.utpServer.removeListener('listening', this._onListening) + this.utpServer.removeListener('error', this._onUTPError) + } + + this.tcpServer.removeListener('connection', this._onConnectionBound) + this.tcpServer.removeListener('error', this._onTCPError) // Destroy all open connection objects so server can close gracefully without waiting // for connection timeout or remote peer to disconnect. - this._pendingConns.forEach(conn => { + this._pendingConns.forEach((conn) => { conn.on('error', noop) conn.destroy() }) + if (this.utpServer) { + try { + this.utpServer.close(cb) + } catch (err) { + if (cb) process.nextTick(cb) + } + } + try { - this.server.close(cb) + this.tcpServer.close(cb) } catch (err) { if (cb) process.nextTick(cb) } - this.server = null + this.tcpServer = null + this.utpServer = null this._client = null this._pendingConns = null } @@ -92,7 +127,7 @@ class TCPPool { self._pendingConns.push(conn) conn.once('close', cleanupPending) - const peer = Peer.createTCPIncomingPeer(conn) + const peer = this.utpServer ? Peer.createUTPIncomingPeer(conn) : Peer.createTCPIncomingPeer(conn) const wire = peer.wire wire.once('handshake', onHandshake) @@ -125,4 +160,4 @@ class TCPPool { function noop () {} -module.exports = TCPPool +module.exports = ConnPool diff --git a/lib/peer.js b/lib/peer.js index 0ddf535..b0fea6d 100644 --- a/lib/peer.js +++ b/lib/peer.js @@ -5,6 +5,7 @@ const Wire = require('bittorrent-protocol') const WebConn = require('./webconn') const CONNECT_TIMEOUT_TCP = 5000 +const CONNECT_TIMEOUT_UTP = 5000 const CONNECT_TIMEOUT_WEBRTC = 25000 const HANDSHAKE_TIMEOUT = 25000 @@ -46,6 +47,22 @@ exports.createTCPIncomingPeer = conn => { } /** + * Incoming uTP peers start out connected, because the remote peer connected to the + * listening port of the uTP server. Until the remote peer sends a handshake, we don't + * know what swarm the connection is intended for. + */ +exports.createUTPIncomingPeer = conn => { + const addr = `${conn.remoteAddress}:${conn.remotePort}` + const peer = new Peer(addr, 'utpIncoming') + peer.conn = conn + peer.addr = addr + + peer.onConnect() + + return peer +} + +/** * Outgoing TCP peers start out with just an IP address. At some point (when there is an * available connection), the client can attempt to connect to the address. */ @@ -58,6 +75,18 @@ exports.createTCPOutgoingPeer = (addr, swarm) => { } /** + * Outgoing uTP peers start out with just an IP address. At some point (when there is an + * available connection), the client can attempt to connect to the address. + */ +exports.createUTPOutgoingPeer = (addr, swarm) => { + const peer = new Peer(addr, 'utpOutgoing') + peer.addr = addr + peer.swarm = swarm + + return peer +} + +/** * Peer that represents a Web Seed (BEP17 / BEP19). */ exports.createWebSeedPeer = (url, swarm) => { @@ -193,9 +222,16 @@ class Peer { startConnectTimeout () { clearTimeout(this.connectTimeout) + + const connectTimeoutValues = { + webrtc: CONNECT_TIMEOUT_WEBRTC, + tcpOutgoing: CONNECT_TIMEOUT_TCP, + utpOutgoing: CONNECT_TIMEOUT_UTP + } + this.connectTimeout = setTimeout(() => { this.destroy(new Error('connect timeout')) - }, this.type === 'webrtc' ? CONNECT_TIMEOUT_WEBRTC : CONNECT_TIMEOUT_TCP) + }, connectTimeoutValues[this.type]) if (this.connectTimeout.unref) this.connectTimeout.unref() } @@ -212,7 +248,7 @@ class Peer { this.destroyed = true this.connected = false - debug('destroy %s (error: %s)', this.id, err && (err.message || err)) + debug('destroy %s %s (error: %s)', this.type, this.id, err && (err.message || err)) clearTimeout(this.connectTimeout) clearTimeout(this.handshakeTimeout) diff --git a/lib/torrent.js b/lib/torrent.js index caa6687..56326fd 100644 --- a/lib/torrent.js +++ b/lib/torrent.js @@ -24,6 +24,7 @@ const sha1 = require('simple-sha1') const speedometer = require('speedometer') const utMetadata = require('ut_metadata') const utPex = require('ut_pex') // browser exclude +const utp = require('utp-native') // browser exclude const parseRange = require('parse-numeric-range') const File = require('./file') @@ -750,7 +751,7 @@ class Torrent extends EventEmitter { } } - const wasAdded = !!this._addPeer(peer) + const wasAdded = !!this._addPeer(peer, this.client.utp ? 'utp' : 'tcp') if (wasAdded) { this.emit('peer', peer) } else { @@ -759,7 +760,7 @@ class Torrent extends EventEmitter { return wasAdded } - _addPeer (peer) { + _addPeer (peer, type) { if (this.destroyed) { if (typeof peer !== 'string') peer.destroy() return null @@ -787,7 +788,7 @@ class Torrent extends EventEmitter { let newPeer if (typeof peer === 'string') { // `peer` is an addr ("ip:port" string) - newPeer = Peer.createTCPOutgoingPeer(peer, this) + newPeer = type === 'utp' ? Peer.createUTPOutgoingPeer(peer, this) : Peer.createTCPOutgoingPeer(peer, this) } else { // `peer` is a WebRTC connection (simple-peer) newPeer = Peer.createWebRTCPeer(peer, this) @@ -1680,7 +1681,7 @@ class Torrent extends EventEmitter { const peer = this._queue.shift() if (!peer) return // queue could be empty - this._debug('tcp connect attempt to %s', peer.addr) + this._debug('%s connect attempt to %s', peer.type, peer.addr) const parts = addrToIPPort(peer.addr) const opts = { @@ -1688,7 +1689,13 @@ class Torrent extends EventEmitter { port: parts[1] } - const conn = peer.conn = net.connect(opts) + if (peer.type === 'utpOutgoing') { + peer.conn = utp.connect(opts.port, opts.host) + } else { + peer.conn = net.connect(opts) + } + + const conn = peer.conn conn.once('connect', () => { peer.onConnect() }) conn.once('error', err => { peer.destroy(err) }) @@ -1701,11 +1708,16 @@ class Torrent extends EventEmitter { // TODO: If torrent is done, do not try to reconnect after a timeout if (peer.retries >= RECONNECT_WAIT.length) { - this._debug( - 'conn %s closed: will not re-add (max %s attempts)', - peer.addr, RECONNECT_WAIT.length - ) - return + if (this.client.utp) { + const newPeer = this._addPeer(peer.addr, 'tcp') + if (newPeer) newPeer.retries = 0 + } else { + this._debug( + 'conn %s closed: will not re-add (max %s attempts)', + peer.addr, RECONNECT_WAIT.length + ) + return + } } const ms = RECONNECT_WAIT[peer.retries] @@ -1715,7 +1727,7 @@ class Torrent extends EventEmitter { ) const reconnectTimeout = setTimeout(() => { - const newPeer = this._addPeer(peer.addr) + const newPeer = this._addPeer(peer.addr, this.client.utp ? 'utp' : 'tcp') if (newPeer) newPeer.retries = peer.retries + 1 }, ms) if (reconnectTimeout.unref) reconnectTimeout.unref() diff --git a/package.json b/package.json index d73d874..86c2b3b 100644 --- a/package.json +++ b/package.json @@ -73,7 +73,8 @@ "torrent-piece": "^2.0.0", "unordered-array-remove": "^1.0.2", "ut_metadata": "^3.5.0", - "ut_pex": "^2.0.0" + "ut_pex": "^2.0.0", + "utp-native": "^2.2.1" }, "devDependencies": { "airtap": "^3.0.0", diff --git a/test/node/conn-pool.js b/test/node/conn-pool.js new file mode 100644 index 0000000..435d951 --- /dev/null +++ b/test/node/conn-pool.js @@ -0,0 +1,205 @@ +var test = require('tape') +var fixtures = require('webtorrent-fixtures') +var WebTorrent = require('../../') +const MemoryChunkStore = require('memory-chunk-store') +const dgram = require('dgram') + +test('client.conn-pool: use TCP when uTP disabled', function (t) { + t.plan(6) + + var client1 = new WebTorrent({ dht: false, tracker: false, utp: false }) + var client2 = new WebTorrent({ dht: false, tracker: false, utp: false }) + + client1.on('error', function (err) { t.fail(err) }) + client1.on('warning', function (err) { t.fail(err) }) + + client2.on('error', function (err) { t.fail(err) }) + client2.on('warning', function (err) { t.fail(err) }) + + // Start seeding + client2.seed(fixtures.leaves.content, { + name: 'Leaves of Grass by Walt Whitman.epub', + announce: [] + }) + + client2.on('listening', function () { + // Start downloading + var torrent = client1.add(fixtures.leaves.parsedTorrent.infoHash, { store: MemoryChunkStore }) + + // Manually connect peers + torrent.addPeer('127.0.0.1:' + client2.address().port) + + var order = 0 + + torrent.on('infoHash', function () { + t.equal(++order, 1) + }) + + torrent.on('metadata', function () { + t.equal(++order, 2) + }) + + torrent.on('ready', function () { + t.equal(++order, 3) + }) + + torrent.on('done', function () { + t.equal(++order, 4) + + client1.destroy(function (err) { t.error(err, 'client 1 destroyed') }) + client2.destroy(function (err) { t.error(err, 'client 2 destroyed') }) + }) + }) +}) + +test('client.conn-pool: use uTP when uTP enabled', function (t) { + t.plan(6) + + var client1 = new WebTorrent({ dht: false, tracker: false, utp: true }) + var client2 = new WebTorrent({ dht: false, tracker: false, utp: true }) + + client1.on('error', function (err) { t.fail(err) }) + client1.on('warning', function (err) { t.fail(err) }) + + client2.on('error', function (err) { t.fail(err) }) + client2.on('warning', function (err) { t.fail(err) }) + + // Start seeding + client2.seed(fixtures.leaves.content, { + name: 'Leaves of Grass by Walt Whitman.epub', + announce: [] + }) + + client2.on('listening', function () { + // Start downloading + var torrent = client1.add(fixtures.leaves.parsedTorrent.infoHash, { store: MemoryChunkStore }) + + // Manually connect peers + torrent.addPeer('127.0.0.1:' + client2.address().port) + + var order = 0 + + torrent.on('infoHash', function () { + t.equal(++order, 1) + }) + + torrent.on('metadata', function () { + t.equal(++order, 2) + }) + + torrent.on('ready', function () { + t.equal(++order, 3) + }) + + torrent.on('done', function () { + t.equal(++order, 4) + + client1.destroy(function (err) { t.error(err, 'client 1 destroyed') }) + client2.destroy(function (err) { t.error(err, 'client 2 destroyed') }) + }) + }) +}) + +// Warning: slow test as we need to rely on connection timeouts +test('client.conn-pool: fallback to TCP when uTP server failed', function (t) { + t.plan(6) + + // force uTP server failure + const server = dgram.createSocket('udp4') + server.bind(63000) + + var client1 = new WebTorrent({ dht: false, tracker: false, utp: true, torrentPort: 63000 }) + var client2 = new WebTorrent({ dht: false, tracker: false, utp: false }) + + client1.on('error', function (err) { t.fail(err) }) + client1.on('warning', function (err) { t.fail(err) }) + + client2.on('error', function (err) { t.fail(err) }) + client2.on('warning', function (err) { t.fail(err) }) + + // Start seeding + client2.seed(fixtures.leaves.content, { + name: 'Leaves of Grass by Walt Whitman.epub', + announce: [] + }) + + client2.on('listening', function () { + // Start downloading + var torrent = client1.add(fixtures.leaves.parsedTorrent.infoHash, { store: MemoryChunkStore }) + + // Manually connect peers + torrent.addPeer('127.0.0.1:' + client2.address().port) + + var order = 0 + + torrent.on('infoHash', function () { + t.equal(++order, 1) + }) + + torrent.on('metadata', function () { + t.equal(++order, 2) + }) + + torrent.on('ready', function () { + t.equal(++order, 3) + }) + + torrent.on('done', function () { + t.equal(++order, 4) + + client1.destroy(function (err) { t.error(err, 'client 1 destroyed') }) + client2.destroy(function (err) { t.error(err, 'client 2 destroyed') }) + + server.close() + }) + }) +}) + +// Warning: slow test as we need to rely on connection timeouts +test('client.conn-pool: fallback to TCP when remote client has uTP disabled', function (t) { + t.plan(6) + + var client1 = new WebTorrent({ dht: false, tracker: false, utp: true }) + var client2 = new WebTorrent({ dht: false, tracker: false, utp: false }) + + client1.on('error', function (err) { t.fail(err) }) + client1.on('warning', function (err) { t.fail(err) }) + + client2.on('error', function (err) { t.fail(err) }) + client2.on('warning', function (err) { t.fail(err) }) + + // Start seeding + client2.seed(fixtures.leaves.content, { + name: 'Leaves of Grass by Walt Whitman.epub', + announce: [] + }) + + client2.on('listening', function () { + // Start downloading + var torrent = client1.add(fixtures.leaves.parsedTorrent.infoHash, { store: MemoryChunkStore }) + + // Manually connect peers + torrent.addPeer('127.0.0.1:' + client2.address().port) + + var order = 0 + + torrent.on('infoHash', function () { + t.equal(++order, 1) + }) + + torrent.on('metadata', function () { + t.equal(++order, 2) + }) + + torrent.on('ready', function () { + t.equal(++order, 3) + }) + + torrent.on('done', function () { + t.equal(++order, 4) + + client1.destroy(function (err) { t.error(err, 'client 1 destroyed') }) + client2.destroy(function (err) { t.error(err, 'client 2 destroyed') }) + }) + }) +}) |