Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/webtorrent/webtorrent.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/api.md1
-rw-r--r--index.js21
-rw-r--r--lib/conn-pool.js (renamed from lib/tcp-pool.js)79
-rw-r--r--lib/peer.js40
-rw-r--r--lib/torrent.js34
-rw-r--r--package.json3
-rw-r--r--test/node/conn-pool.js205
7 files changed, 337 insertions, 46 deletions
diff --git a/docs/api.md b/docs/api.md
index 1532a89..87bd5d3 100644
--- a/docs/api.md
+++ b/docs/api.md
@@ -59,6 +59,7 @@ If `opts` is specified, then the default options (shown below) will be overridde
tracker: Boolean|Object, // Enable trackers (default=true), or options object for Tracker
dht: Boolean|Object, // Enable DHT (default=true), or options object for DHT
webSeeds: Boolean // Enable BEP19 web seeds (default=true)
+ utp: Boolean // Enable BEP29 uTorrent transport protocol (default=false)
}
```
diff --git a/index.js b/index.js
index f024ba8..312ebe7 100644
--- a/index.js
+++ b/index.js
@@ -14,7 +14,7 @@ const Peer = require('simple-peer')
const randombytes = require('randombytes')
const speedometer = require('speedometer')
-const TCPPool = require('./lib/tcp-pool') // browser exclude
+const ConnPool = require('./lib/conn-pool') // browser exclude
const Torrent = require('./lib/torrent')
const VERSION = require('./package.json').version
@@ -72,6 +72,7 @@ class WebTorrent extends EventEmitter {
this.tracker = opts.tracker !== undefined ? opts.tracker : {}
this.torrents = []
this.maxConns = Number(opts.maxConns) || 55
+ this.utp = opts.utp === true
this._debug(
'new webtorrent (peerId %s, nodeId %s, port %s)',
@@ -95,8 +96,8 @@ class WebTorrent extends EventEmitter {
}
}
- if (typeof TCPPool === 'function') {
- this._tcpPool = new TCPPool(this)
+ if (typeof ConnPool === 'function') {
+ this._connPool = new ConnPool(this)
} else {
process.nextTick(() => {
this._onListening()
@@ -355,8 +356,8 @@ class WebTorrent extends EventEmitter {
address () {
if (!this.listening) return null
- return this._tcpPool
- ? this._tcpPool.server.address()
+ return this._connPool
+ ? this._connPool.tcpServer.address()
: { address: '0.0.0.0', family: 'IPv4', port: 0 }
}
@@ -377,9 +378,9 @@ class WebTorrent extends EventEmitter {
torrent.destroy(cb)
})
- if (this._tcpPool) {
+ if (this._connPool) {
tasks.push(cb => {
- this._tcpPool.destroy(cb)
+ this._connPool.destroy(cb)
})
}
@@ -394,7 +395,7 @@ class WebTorrent extends EventEmitter {
if (err) this.emit('error', err)
this.torrents = []
- this._tcpPool = null
+ this._connPool = null
this.dht = null
}
@@ -402,9 +403,9 @@ class WebTorrent extends EventEmitter {
this._debug('listening')
this.listening = true
- if (this._tcpPool) {
+ if (this._connPool) {
// Sometimes server.address() returns `null` in Docker.
- const address = this._tcpPool.server.address()
+ const address = this._connPool.tcpServer.address()
if (address) this.torrentPort = address.port
}
diff --git a/lib/tcp-pool.js b/lib/conn-pool.js
index 1094c4f..ddc0672 100644
--- a/lib/tcp-pool.js
+++ b/lib/conn-pool.js
@@ -1,30 +1,30 @@
const arrayRemove = require('unordered-array-remove')
-const debug = require('debug')('webtorrent:tcp-pool')
+const debug = require('debug')('webtorrent:conn-pool')
const net = require('net') // browser exclude
+const utp = require('utp-native') // browser exclude
const Peer = require('./peer')
/**
- * TCPPool
+ * Connection Pool
*
- * A "TCP pool" allows multiple swarms to listen on the same TCP port and determines
+ * A connection pool allows multiple swarms to listen on the same TCP/UDP port and determines
* which swarm incoming connections are intended for by inspecting the bittorrent
* handshake that the remote peer sends.
*
* @param {number} port
*/
-class TCPPool {
+class ConnPool {
constructor (client) {
- debug('create tcp pool (port %s)', client.torrentPort)
+ debug('create pool (port %s)', client.torrentPort)
- this.server = net.createServer()
this._client = client
// Temporarily store incoming connections so they can be destroyed if the server is
// closed before the connection is passed off to a Torrent.
this._pendingConns = []
- this._onConnectionBound = conn => {
+ this._onConnectionBound = (conn) => {
this._onConnection(conn)
}
@@ -32,42 +32,77 @@ class TCPPool {
this._client._onListening()
}
- this._onError = err => {
+ this._onTCPError = (err) => {
this._client._destroy(err)
}
- this.server.on('connection', this._onConnectionBound)
- this.server.on('listening', this._onListening)
- this.server.on('error', this._onError)
+ this._onUTPError = () => {
+ this._client.utp = false
+ }
- this.server.listen(client.torrentPort)
+ // Setup TCP
+ this.tcpServer = net.createServer()
+ this.tcpServer.on('connection', this._onConnectionBound)
+ this.tcpServer.on('error', this._onTCPError)
+
+ // Start TCP
+ this.tcpServer.listen(client.torrentPort, () => {
+ debug('creating tcpServer in port %s', this.tcpServer.address().port)
+ if (this._client.utp) {
+ // Setup uTP
+ this.utpServer = utp.createServer()
+ this.utpServer.on('connection', this._onConnectionBound)
+ this.utpServer.on('listening', this._onListening)
+ this.utpServer.on('error', this._onUTPError)
+
+ // Start uTP
+ debug('creating utpServer in port %s', this.tcpServer.address().port)
+ this.utpServer.listen(this.tcpServer.address().port)
+ } else {
+ this._onListening()
+ }
+ })
}
/**
- * Destroy this TCP pool.
+ * Destroy this Conn pool.
* @param {function} cb
*/
destroy (cb) {
- debug('destroy tcp pool')
+ debug('destroy conn pool')
- this.server.removeListener('connection', this._onConnectionBound)
- this.server.removeListener('listening', this._onListening)
- this.server.removeListener('error', this._onError)
+ if (this.utpServer) {
+ this.utpServer.removeListener('connection', this._onConnectionBound)
+ this.utpServer.removeListener('listening', this._onListening)
+ this.utpServer.removeListener('error', this._onUTPError)
+ }
+
+ this.tcpServer.removeListener('connection', this._onConnectionBound)
+ this.tcpServer.removeListener('error', this._onTCPError)
// Destroy all open connection objects so server can close gracefully without waiting
// for connection timeout or remote peer to disconnect.
- this._pendingConns.forEach(conn => {
+ this._pendingConns.forEach((conn) => {
conn.on('error', noop)
conn.destroy()
})
+ if (this.utpServer) {
+ try {
+ this.utpServer.close(cb)
+ } catch (err) {
+ if (cb) process.nextTick(cb)
+ }
+ }
+
try {
- this.server.close(cb)
+ this.tcpServer.close(cb)
} catch (err) {
if (cb) process.nextTick(cb)
}
- this.server = null
+ this.tcpServer = null
+ this.utpServer = null
this._client = null
this._pendingConns = null
}
@@ -92,7 +127,7 @@ class TCPPool {
self._pendingConns.push(conn)
conn.once('close', cleanupPending)
- const peer = Peer.createTCPIncomingPeer(conn)
+ const peer = this.utpServer ? Peer.createUTPIncomingPeer(conn) : Peer.createTCPIncomingPeer(conn)
const wire = peer.wire
wire.once('handshake', onHandshake)
@@ -125,4 +160,4 @@ class TCPPool {
function noop () {}
-module.exports = TCPPool
+module.exports = ConnPool
diff --git a/lib/peer.js b/lib/peer.js
index 0ddf535..b0fea6d 100644
--- a/lib/peer.js
+++ b/lib/peer.js
@@ -5,6 +5,7 @@ const Wire = require('bittorrent-protocol')
const WebConn = require('./webconn')
const CONNECT_TIMEOUT_TCP = 5000
+const CONNECT_TIMEOUT_UTP = 5000
const CONNECT_TIMEOUT_WEBRTC = 25000
const HANDSHAKE_TIMEOUT = 25000
@@ -46,6 +47,22 @@ exports.createTCPIncomingPeer = conn => {
}
/**
+ * Incoming uTP peers start out connected, because the remote peer connected to the
+ * listening port of the uTP server. Until the remote peer sends a handshake, we don't
+ * know what swarm the connection is intended for.
+ */
+exports.createUTPIncomingPeer = conn => {
+ const addr = `${conn.remoteAddress}:${conn.remotePort}`
+ const peer = new Peer(addr, 'utpIncoming')
+ peer.conn = conn
+ peer.addr = addr
+
+ peer.onConnect()
+
+ return peer
+}
+
+/**
* Outgoing TCP peers start out with just an IP address. At some point (when there is an
* available connection), the client can attempt to connect to the address.
*/
@@ -58,6 +75,18 @@ exports.createTCPOutgoingPeer = (addr, swarm) => {
}
/**
+ * Outgoing uTP peers start out with just an IP address. At some point (when there is an
+ * available connection), the client can attempt to connect to the address.
+ */
+exports.createUTPOutgoingPeer = (addr, swarm) => {
+ const peer = new Peer(addr, 'utpOutgoing')
+ peer.addr = addr
+ peer.swarm = swarm
+
+ return peer
+}
+
+/**
* Peer that represents a Web Seed (BEP17 / BEP19).
*/
exports.createWebSeedPeer = (url, swarm) => {
@@ -193,9 +222,16 @@ class Peer {
startConnectTimeout () {
clearTimeout(this.connectTimeout)
+
+ const connectTimeoutValues = {
+ webrtc: CONNECT_TIMEOUT_WEBRTC,
+ tcpOutgoing: CONNECT_TIMEOUT_TCP,
+ utpOutgoing: CONNECT_TIMEOUT_UTP
+ }
+
this.connectTimeout = setTimeout(() => {
this.destroy(new Error('connect timeout'))
- }, this.type === 'webrtc' ? CONNECT_TIMEOUT_WEBRTC : CONNECT_TIMEOUT_TCP)
+ }, connectTimeoutValues[this.type])
if (this.connectTimeout.unref) this.connectTimeout.unref()
}
@@ -212,7 +248,7 @@ class Peer {
this.destroyed = true
this.connected = false
- debug('destroy %s (error: %s)', this.id, err && (err.message || err))
+ debug('destroy %s %s (error: %s)', this.type, this.id, err && (err.message || err))
clearTimeout(this.connectTimeout)
clearTimeout(this.handshakeTimeout)
diff --git a/lib/torrent.js b/lib/torrent.js
index caa6687..56326fd 100644
--- a/lib/torrent.js
+++ b/lib/torrent.js
@@ -24,6 +24,7 @@ const sha1 = require('simple-sha1')
const speedometer = require('speedometer')
const utMetadata = require('ut_metadata')
const utPex = require('ut_pex') // browser exclude
+const utp = require('utp-native') // browser exclude
const parseRange = require('parse-numeric-range')
const File = require('./file')
@@ -750,7 +751,7 @@ class Torrent extends EventEmitter {
}
}
- const wasAdded = !!this._addPeer(peer)
+ const wasAdded = !!this._addPeer(peer, this.client.utp ? 'utp' : 'tcp')
if (wasAdded) {
this.emit('peer', peer)
} else {
@@ -759,7 +760,7 @@ class Torrent extends EventEmitter {
return wasAdded
}
- _addPeer (peer) {
+ _addPeer (peer, type) {
if (this.destroyed) {
if (typeof peer !== 'string') peer.destroy()
return null
@@ -787,7 +788,7 @@ class Torrent extends EventEmitter {
let newPeer
if (typeof peer === 'string') {
// `peer` is an addr ("ip:port" string)
- newPeer = Peer.createTCPOutgoingPeer(peer, this)
+ newPeer = type === 'utp' ? Peer.createUTPOutgoingPeer(peer, this) : Peer.createTCPOutgoingPeer(peer, this)
} else {
// `peer` is a WebRTC connection (simple-peer)
newPeer = Peer.createWebRTCPeer(peer, this)
@@ -1680,7 +1681,7 @@ class Torrent extends EventEmitter {
const peer = this._queue.shift()
if (!peer) return // queue could be empty
- this._debug('tcp connect attempt to %s', peer.addr)
+ this._debug('%s connect attempt to %s', peer.type, peer.addr)
const parts = addrToIPPort(peer.addr)
const opts = {
@@ -1688,7 +1689,13 @@ class Torrent extends EventEmitter {
port: parts[1]
}
- const conn = peer.conn = net.connect(opts)
+ if (peer.type === 'utpOutgoing') {
+ peer.conn = utp.connect(opts.port, opts.host)
+ } else {
+ peer.conn = net.connect(opts)
+ }
+
+ const conn = peer.conn
conn.once('connect', () => { peer.onConnect() })
conn.once('error', err => { peer.destroy(err) })
@@ -1701,11 +1708,16 @@ class Torrent extends EventEmitter {
// TODO: If torrent is done, do not try to reconnect after a timeout
if (peer.retries >= RECONNECT_WAIT.length) {
- this._debug(
- 'conn %s closed: will not re-add (max %s attempts)',
- peer.addr, RECONNECT_WAIT.length
- )
- return
+ if (this.client.utp) {
+ const newPeer = this._addPeer(peer.addr, 'tcp')
+ if (newPeer) newPeer.retries = 0
+ } else {
+ this._debug(
+ 'conn %s closed: will not re-add (max %s attempts)',
+ peer.addr, RECONNECT_WAIT.length
+ )
+ return
+ }
}
const ms = RECONNECT_WAIT[peer.retries]
@@ -1715,7 +1727,7 @@ class Torrent extends EventEmitter {
)
const reconnectTimeout = setTimeout(() => {
- const newPeer = this._addPeer(peer.addr)
+ const newPeer = this._addPeer(peer.addr, this.client.utp ? 'utp' : 'tcp')
if (newPeer) newPeer.retries = peer.retries + 1
}, ms)
if (reconnectTimeout.unref) reconnectTimeout.unref()
diff --git a/package.json b/package.json
index d73d874..86c2b3b 100644
--- a/package.json
+++ b/package.json
@@ -73,7 +73,8 @@
"torrent-piece": "^2.0.0",
"unordered-array-remove": "^1.0.2",
"ut_metadata": "^3.5.0",
- "ut_pex": "^2.0.0"
+ "ut_pex": "^2.0.0",
+ "utp-native": "^2.2.1"
},
"devDependencies": {
"airtap": "^3.0.0",
diff --git a/test/node/conn-pool.js b/test/node/conn-pool.js
new file mode 100644
index 0000000..435d951
--- /dev/null
+++ b/test/node/conn-pool.js
@@ -0,0 +1,205 @@
+var test = require('tape')
+var fixtures = require('webtorrent-fixtures')
+var WebTorrent = require('../../')
+const MemoryChunkStore = require('memory-chunk-store')
+const dgram = require('dgram')
+
+test('client.conn-pool: use TCP when uTP disabled', function (t) {
+ t.plan(6)
+
+ var client1 = new WebTorrent({ dht: false, tracker: false, utp: false })
+ var client2 = new WebTorrent({ dht: false, tracker: false, utp: false })
+
+ client1.on('error', function (err) { t.fail(err) })
+ client1.on('warning', function (err) { t.fail(err) })
+
+ client2.on('error', function (err) { t.fail(err) })
+ client2.on('warning', function (err) { t.fail(err) })
+
+ // Start seeding
+ client2.seed(fixtures.leaves.content, {
+ name: 'Leaves of Grass by Walt Whitman.epub',
+ announce: []
+ })
+
+ client2.on('listening', function () {
+ // Start downloading
+ var torrent = client1.add(fixtures.leaves.parsedTorrent.infoHash, { store: MemoryChunkStore })
+
+ // Manually connect peers
+ torrent.addPeer('127.0.0.1:' + client2.address().port)
+
+ var order = 0
+
+ torrent.on('infoHash', function () {
+ t.equal(++order, 1)
+ })
+
+ torrent.on('metadata', function () {
+ t.equal(++order, 2)
+ })
+
+ torrent.on('ready', function () {
+ t.equal(++order, 3)
+ })
+
+ torrent.on('done', function () {
+ t.equal(++order, 4)
+
+ client1.destroy(function (err) { t.error(err, 'client 1 destroyed') })
+ client2.destroy(function (err) { t.error(err, 'client 2 destroyed') })
+ })
+ })
+})
+
+test('client.conn-pool: use uTP when uTP enabled', function (t) {
+ t.plan(6)
+
+ var client1 = new WebTorrent({ dht: false, tracker: false, utp: true })
+ var client2 = new WebTorrent({ dht: false, tracker: false, utp: true })
+
+ client1.on('error', function (err) { t.fail(err) })
+ client1.on('warning', function (err) { t.fail(err) })
+
+ client2.on('error', function (err) { t.fail(err) })
+ client2.on('warning', function (err) { t.fail(err) })
+
+ // Start seeding
+ client2.seed(fixtures.leaves.content, {
+ name: 'Leaves of Grass by Walt Whitman.epub',
+ announce: []
+ })
+
+ client2.on('listening', function () {
+ // Start downloading
+ var torrent = client1.add(fixtures.leaves.parsedTorrent.infoHash, { store: MemoryChunkStore })
+
+ // Manually connect peers
+ torrent.addPeer('127.0.0.1:' + client2.address().port)
+
+ var order = 0
+
+ torrent.on('infoHash', function () {
+ t.equal(++order, 1)
+ })
+
+ torrent.on('metadata', function () {
+ t.equal(++order, 2)
+ })
+
+ torrent.on('ready', function () {
+ t.equal(++order, 3)
+ })
+
+ torrent.on('done', function () {
+ t.equal(++order, 4)
+
+ client1.destroy(function (err) { t.error(err, 'client 1 destroyed') })
+ client2.destroy(function (err) { t.error(err, 'client 2 destroyed') })
+ })
+ })
+})
+
+// Warning: slow test as we need to rely on connection timeouts
+test('client.conn-pool: fallback to TCP when uTP server failed', function (t) {
+ t.plan(6)
+
+ // force uTP server failure
+ const server = dgram.createSocket('udp4')
+ server.bind(63000)
+
+ var client1 = new WebTorrent({ dht: false, tracker: false, utp: true, torrentPort: 63000 })
+ var client2 = new WebTorrent({ dht: false, tracker: false, utp: false })
+
+ client1.on('error', function (err) { t.fail(err) })
+ client1.on('warning', function (err) { t.fail(err) })
+
+ client2.on('error', function (err) { t.fail(err) })
+ client2.on('warning', function (err) { t.fail(err) })
+
+ // Start seeding
+ client2.seed(fixtures.leaves.content, {
+ name: 'Leaves of Grass by Walt Whitman.epub',
+ announce: []
+ })
+
+ client2.on('listening', function () {
+ // Start downloading
+ var torrent = client1.add(fixtures.leaves.parsedTorrent.infoHash, { store: MemoryChunkStore })
+
+ // Manually connect peers
+ torrent.addPeer('127.0.0.1:' + client2.address().port)
+
+ var order = 0
+
+ torrent.on('infoHash', function () {
+ t.equal(++order, 1)
+ })
+
+ torrent.on('metadata', function () {
+ t.equal(++order, 2)
+ })
+
+ torrent.on('ready', function () {
+ t.equal(++order, 3)
+ })
+
+ torrent.on('done', function () {
+ t.equal(++order, 4)
+
+ client1.destroy(function (err) { t.error(err, 'client 1 destroyed') })
+ client2.destroy(function (err) { t.error(err, 'client 2 destroyed') })
+
+ server.close()
+ })
+ })
+})
+
+// Warning: slow test as we need to rely on connection timeouts
+test('client.conn-pool: fallback to TCP when remote client has uTP disabled', function (t) {
+ t.plan(6)
+
+ var client1 = new WebTorrent({ dht: false, tracker: false, utp: true })
+ var client2 = new WebTorrent({ dht: false, tracker: false, utp: false })
+
+ client1.on('error', function (err) { t.fail(err) })
+ client1.on('warning', function (err) { t.fail(err) })
+
+ client2.on('error', function (err) { t.fail(err) })
+ client2.on('warning', function (err) { t.fail(err) })
+
+ // Start seeding
+ client2.seed(fixtures.leaves.content, {
+ name: 'Leaves of Grass by Walt Whitman.epub',
+ announce: []
+ })
+
+ client2.on('listening', function () {
+ // Start downloading
+ var torrent = client1.add(fixtures.leaves.parsedTorrent.infoHash, { store: MemoryChunkStore })
+
+ // Manually connect peers
+ torrent.addPeer('127.0.0.1:' + client2.address().port)
+
+ var order = 0
+
+ torrent.on('infoHash', function () {
+ t.equal(++order, 1)
+ })
+
+ torrent.on('metadata', function () {
+ t.equal(++order, 2)
+ })
+
+ torrent.on('ready', function () {
+ t.equal(++order, 3)
+ })
+
+ torrent.on('done', function () {
+ t.equal(++order, 4)
+
+ client1.destroy(function (err) { t.error(err, 'client 1 destroyed') })
+ client2.destroy(function (err) { t.error(err, 'client 2 destroyed') })
+ })
+ })
+})