Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/webtorrent/webtorrent.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex <alxmorais8@msn.com>2021-07-24 01:05:25 +0300
committerGitHub <noreply@github.com>2021-07-24 01:05:25 +0300
commit39bb33c3cf694cdee45378ea4b30c66c93576d2a (patch)
tree654d454bbcc6bba1deab2195ccf6bcbc2bbe4f60
parent524618edde211a2ce2c9d1e40f68a091699442fd (diff)
feat: add speed limit to client (#2062)
* Add speed limit to client * Fix standard * Update docs/api.md * Add changes from PR feedback Co-authored-by: Kadu Diógenes <kadu@fnix.com.br> Co-authored-by: Ivan Gorbanev <ivang@van.work> Co-authored-by: ultimate-tester <jordimueters@hotmail.com> Co-authored-by: Julen Garcia Leunda <hicom150@gmail.com> Co-authored-by: Niklas Johansson <niklas.y.johansson@se.abb.com> Co-authored-by: ThaUnknown <kapi.skowronek@gmail.com> Co-authored-by: Diego Rodríguez Baquero <github@diegorbaquero.com>
-rw-r--r--docs/api.md23
-rw-r--r--index.js40
-rw-r--r--lib/conn-pool.js4
-rw-r--r--lib/peer.js65
-rw-r--r--lib/torrent.js61
-rw-r--r--package.json1
-rw-r--r--test/node/limit-download-upload.js82
-rw-r--r--test/node/limit-methods.js88
8 files changed, 323 insertions, 41 deletions
diff --git a/docs/api.md b/docs/api.md
index bbd3b3e..321d50f 100644
--- a/docs/api.md
+++ b/docs/api.md
@@ -60,8 +60,11 @@ If `opts` is specified, then the default options (shown below) will be overridde
dht: Boolean|Object, // Enable DHT (default=true), or options object for DHT
lsd: Boolean, // Enable BEP14 local service discovery (default=true)
webSeeds: Boolean, // Enable BEP19 web seeds (default=true)
+ utp: Boolean, // Enable BEP29 uTorrent transport protocol (default=true)
blocklist: Array|String, // List of IP's to block
utp: Boolean, // Enable BEP29 uTorrent transport protocol (default=true)
+ downloadLimit: Number, // Max download speed (bytes/sec) over all torrents (default=-1)
+ uploadLimit: Number, // Max upload speed (bytes/sec) over all torrents (default=-1)
}
```
@@ -74,6 +77,11 @@ For possible values of `opts.tracker` see the
For possible values of `opts.blocklist` see the
[`load-ip-set` documentation](https://github.com/webtorrent/load-ip-set#usage).
+For `downloadLimit` and `uploadLimit` the possible values can be:
+ - `> 0`. The client will set the throttle at that speed
+ - `0`. The client will block any data from being downloaded or uploaded
+ - `-1`. The client will is disable the throttling and use the whole bandwidth available
+
## `client.add(torrentId, [opts], [function ontorrent (torrent) {}])`
Start downloading a new torrent.
@@ -216,6 +224,19 @@ Total download progress for all **active** torrents, from 0 to 1.
Aggregate "seed ratio" for all torrents (uploaded / downloaded).
+## `client.throttleDownload(rate)`
+
+Sets the maximum speed at which the client downloads the torrents, in bytes/sec.
+
+`rate` must be bigger or equal than zero, or `-1` to disable the download throttle and
+use the whole bandwidth of the connection.
+
+## `client.throttleUpload(rate)`
+
+Sets the maximum speed at which the client uploads the torrents, in bytes/sec.
+
+`rate` must be bigger or equal than zero, or `-1` to disable the upload throttle and
+use the whole bandwidth of the connection.
# Torrent API
@@ -760,4 +781,4 @@ Peer's remote port. Only exists for tcp/utp peers.
## `wire.destroy()`
-Close the connection with the peer. This however doesn't prevent the peer from simply re-connecting. \ No newline at end of file
+Close the connection with the peer. This however doesn't prevent the peer from simply re-connecting.
diff --git a/index.js b/index.js
index e4a315b..e971298 100644
--- a/index.js
+++ b/index.js
@@ -11,9 +11,10 @@ const parallel = require('run-parallel')
const parseTorrent = require('parse-torrent')
const path = require('path')
const Peer = require('simple-peer')
+const queueMicrotask = require('queue-microtask')
const randombytes = require('randombytes')
const speedometer = require('speedometer')
-const queueMicrotask = require('queue-microtask')
+const { ThrottleGroup } = require('speed-limiter')
const ConnPool = require('./lib/conn-pool') // browser exclude
const Torrent = require('./lib/torrent')
@@ -76,11 +77,19 @@ class WebTorrent extends EventEmitter {
this.maxConns = Number(opts.maxConns) || 55
this.utp = WebTorrent.UTP_SUPPORT && opts.utp !== false
+ this._downloadLimit = Math.max(Number(opts.downloadLimit) || -1, -1)
+ this._uploadLimit = Math.max(Number(opts.uploadLimit) || -1, -1)
+
this._debug(
'new webtorrent (peerId %s, nodeId %s, port %s)',
this.peerId, this.nodeId, this.torrentPort
)
+ this.throttleGroups = {
+ down: new ThrottleGroup({ rate: Math.max(this._downloadLimit, 0), enabled: this._downloadLimit >= 0 }),
+ up: new ThrottleGroup({ rate: Math.max(this._uploadLimit, 0), enabled: this._uploadLimit >= 0 })
+ }
+
if (this.tracker) {
if (typeof this.tracker !== 'object') this.tracker = {}
if (global.WRTC && !this.tracker.wrtc) this.tracker.wrtc = global.WRTC
@@ -353,6 +362,32 @@ class WebTorrent extends EventEmitter {
}
/**
+ * Set global download throttle rate.
+ * @param {Number} rate (must be bigger or equal than zero, or -1 to disable throttling)
+ */
+ throttleDownload (rate) {
+ rate = Number(rate)
+ if (isNaN(rate) || !isFinite(rate) || rate < -1) return false
+ this._downloadLimit = rate
+ if (this._downloadLimit < 0) return this.throttleGroups.down.setEnabled(false)
+ this.throttleGroups.down.setEnabled(true)
+ this.throttleGroups.down.setRate(this._downloadLimit)
+ }
+
+ /**
+ * Set global upload throttle rate
+ * @param {Number} rate (must be bigger or equal than zero, or -1 to disable throttling)
+ */
+ throttleUpload (rate) {
+ rate = Number(rate)
+ if (isNaN(rate) || !isFinite(rate) || rate < -1) return false
+ this._uploadLimit = rate
+ if (this._uploadLimit < 0) return this.throttleGroups.up.setEnabled(false)
+ this.throttleGroups.up.setEnabled(true)
+ this.throttleGroups.up.setRate(this._uploadLimit)
+ }
+
+ /**
* Destroy the client, including all torrents and connections to peers.
* @param {function} cb
*/
@@ -388,6 +423,9 @@ class WebTorrent extends EventEmitter {
this.torrents = []
this._connPool = null
this.dht = null
+
+ this.throttleGroups.down.destroy()
+ this.throttleGroups.up.destroy()
}
_onListening () {
diff --git a/lib/conn-pool.js b/lib/conn-pool.js
index 004a4cd..9ef02a6 100644
--- a/lib/conn-pool.js
+++ b/lib/conn-pool.js
@@ -131,7 +131,9 @@ class ConnPool {
self._pendingConns.add(conn)
conn.once('close', cleanupPending)
- const peer = type === 'utp' ? Peer.createUTPIncomingPeer(conn) : Peer.createTCPIncomingPeer(conn)
+ const peer = type === 'utp'
+ ? Peer.createUTPIncomingPeer(conn, this._client.throttleGroups)
+ : Peer.createTCPIncomingPeer(conn, this._client.throttleGroups)
const wire = peer.wire
wire.once('handshake', onHandshake)
diff --git a/lib/peer.js b/lib/peer.js
index c7da3ad..dd048fc 100644
--- a/lib/peer.js
+++ b/lib/peer.js
@@ -1,3 +1,5 @@
+const { EventEmitter } = require('events')
+const { Transform } = require('stream')
const arrayRemove = require('unordered-array-remove')
const debug = require('debug')('webtorrent:peer')
const Wire = require('bittorrent-protocol')
@@ -12,10 +14,11 @@ const HANDSHAKE_TIMEOUT = 25000
* "introduction" (i.e. WebRTC signaling), and there's no equivalent to an IP address
* that lets you refer to a WebRTC endpoint.
*/
-exports.createWebRTCPeer = (conn, swarm) => {
+exports.createWebRTCPeer = (conn, swarm, throttleGroups) => {
const peer = new Peer(conn.id, 'webrtc')
peer.conn = conn
peer.swarm = swarm
+ peer.throttleGroups = throttleGroups
if (peer.conn.connected) {
peer.onConnect()
@@ -45,42 +48,52 @@ exports.createWebRTCPeer = (conn, swarm) => {
* listening port of the TCP server. Until the remote peer sends a handshake, we don't
* know what swarm the connection is intended for.
*/
-exports.createTCPIncomingPeer = conn => _createIncomingPeer(conn, 'tcpIncoming')
+exports.createTCPIncomingPeer = (conn, throttleGroups) => {
+ return _createIncomingPeer(conn, 'tcpIncoming', throttleGroups)
+}
/**
* Incoming uTP peers start out connected, because the remote peer connected to the
* listening port of the uTP server. Until the remote peer sends a handshake, we don't
* know what swarm the connection is intended for.
*/
-exports.createUTPIncomingPeer = conn => _createIncomingPeer(conn, 'utpIncoming')
+exports.createUTPIncomingPeer = (conn, throttleGroups) => {
+ return _createIncomingPeer(conn, 'utpIncoming', throttleGroups)
+}
/**
* Outgoing TCP peers start out with just an IP address. At some point (when there is an
* available connection), the client can attempt to connect to the address.
*/
-exports.createTCPOutgoingPeer = (addr, swarm) => _createOutgoingPeer(addr, swarm, 'tcpOutgoing')
+exports.createTCPOutgoingPeer = (addr, swarm, throttleGroups) => {
+ return _createOutgoingPeer(addr, swarm, 'tcpOutgoing', throttleGroups)
+}
/**
* Outgoing uTP peers start out with just an IP address. At some point (when there is an
* available connection), the client can attempt to connect to the address.
*/
-exports.createUTPOutgoingPeer = (addr, swarm) => _createOutgoingPeer(addr, swarm, 'utpOutgoing')
+exports.createUTPOutgoingPeer = (addr, swarm, throttleGroups) => {
+ return _createOutgoingPeer(addr, swarm, 'utpOutgoing', throttleGroups)
+}
-const _createIncomingPeer = (conn, type) => {
+const _createIncomingPeer = (conn, type, throttleGroups) => {
const addr = `${conn.remoteAddress}:${conn.remotePort}`
const peer = new Peer(addr, type)
peer.conn = conn
peer.addr = addr
+ peer.throttleGroups = throttleGroups
peer.onConnect()
return peer
}
-const _createOutgoingPeer = (addr, swarm, type) => {
+const _createOutgoingPeer = (addr, swarm, type, throttleGroups) => {
const peer = new Peer(addr, type)
peer.addr = addr
peer.swarm = swarm
+ peer.throttleGroups = throttleGroups
return peer
}
@@ -88,10 +101,11 @@ const _createOutgoingPeer = (addr, swarm, type) => {
/**
* Peer that represents a Web Seed (BEP17 / BEP19).
*/
-exports.createWebSeedPeer = (conn, id, swarm) => {
+exports.createWebSeedPeer = (conn, id, swarm, throttleGroups) => {
const peer = new Peer(id, 'webSeed')
peer.swarm = swarm
peer.conn = conn
+ peer.throttleGroups = throttleGroups
peer.onConnect()
@@ -104,8 +118,10 @@ exports.createWebSeedPeer = (conn, id, swarm) => {
* @param {string} id "ip:port" string, peer id (for WebRTC peers), or url (for Web Seeds)
* @param {string} type the type of the peer
*/
-class Peer {
+class Peer extends EventEmitter {
constructor (id, type) {
+ super()
+
this.id = id
this.type = type
@@ -170,10 +186,39 @@ class Peer {
})
this.startHandshakeTimeout()
- conn.pipe(wire).pipe(conn)
+ this.setThrottlePipes()
+
if (this.swarm && !this.sentHandshake) this.handshake()
}
+ clearPipes () {
+ this.conn.unpipe()
+ this.wire.unpipe()
+ }
+
+ setThrottlePipes () {
+ const self = this
+ this.conn
+ .pipe(this.throttleGroups.down.throttle())
+ .pipe(new Transform({
+ transform (chunk, _, callback) {
+ self.emit('download', chunk.length)
+ if (self.destroyed) return
+ callback(null, chunk)
+ }
+ }))
+ .pipe(this.wire)
+ .pipe(this.throttleGroups.up.throttle())
+ .pipe(new Transform({
+ transform (chunk, _, callback) {
+ self.emit('upload', chunk.length)
+ if (self.destroyed) return
+ callback(null, chunk)
+ }
+ }))
+ .pipe(this.conn)
+ }
+
/**
* Called when handshake is received from remote peer.
* @param {string} infoHash
diff --git a/lib/torrent.js b/lib/torrent.js
index 0a473ba..7e98a8e 100644
--- a/lib/torrent.js
+++ b/lib/torrent.js
@@ -828,14 +828,15 @@ class Torrent extends EventEmitter {
let newPeer
if (typeof peer === 'string') {
// `peer` is an addr ("ip:port" string)
- newPeer = type === 'utp' ? Peer.createUTPOutgoingPeer(peer, this) : Peer.createTCPOutgoingPeer(peer, this)
+ newPeer = type === 'utp'
+ ? Peer.createUTPOutgoingPeer(peer, this, this.client.throttleGroups)
+ : Peer.createTCPOutgoingPeer(peer, this, this.client.throttleGroups)
} else {
// `peer` is a WebRTC connection (simple-peer)
- newPeer = Peer.createWebRTCPeer(peer, this)
+ newPeer = Peer.createWebRTCPeer(peer, this, this.client.throttleGroups)
}
- this._peers[newPeer.id] = newPeer
- this._peersLength += 1
+ this._registerPeer(newPeer)
if (typeof peer === 'string') {
// `peer` is an addr ("ip:port" string)
@@ -883,9 +884,9 @@ class Torrent extends EventEmitter {
this._debug('add web seed %s', id)
- const newPeer = Peer.createWebSeedPeer(conn, id, this)
- this._peers[newPeer.id] = newPeer
- this._peersLength += 1
+ const newPeer = Peer.createWebSeedPeer(conn, id, this, this.client.throttleGroups)
+
+ this._registerPeer(newPeer)
this.emit('peer', id)
}
@@ -900,7 +901,31 @@ class Torrent extends EventEmitter {
this._debug('add incoming peer %s', peer.id)
- this._peers[peer.id] = peer
+ this._registerPeer(peer)
+ }
+
+ _registerPeer (newPeer) {
+ newPeer.on('download', downloaded => {
+ if (this.destroyed) return
+ this.received += downloaded
+ this._downloadSpeed(downloaded)
+ this.client._downloadSpeed(downloaded)
+ this.emit('download', downloaded)
+ if (this.destroyed) return
+ this.client.emit('download', downloaded)
+ })
+
+ newPeer.on('upload', uploaded => {
+ if (this.destroyed) return
+ this.uploaded += uploaded
+ this._uploadSpeed(uploaded)
+ this.client._uploadSpeed(uploaded)
+ this.emit('upload', uploaded)
+ if (this.destroyed) return
+ this.client.emit('upload', uploaded)
+ })
+
+ this._peers[newPeer.id] = newPeer
this._peersLength += 1
}
@@ -976,26 +1001,6 @@ class Torrent extends EventEmitter {
_onWire (wire, addr) {
this._debug('got wire %s (%s)', wire._debugId, addr || 'Unknown')
- wire.on('download', downloaded => {
- if (this.destroyed) return
- this.received += downloaded
- this._downloadSpeed(downloaded)
- this.client._downloadSpeed(downloaded)
- this.emit('download', downloaded)
- if (this.destroyed) return
- this.client.emit('download', downloaded)
- })
-
- wire.on('upload', uploaded => {
- if (this.destroyed) return
- this.uploaded += uploaded
- this._uploadSpeed(uploaded)
- this.client._uploadSpeed(uploaded)
- this.emit('upload', uploaded)
- if (this.destroyed) return
- this.client.emit('upload', uploaded)
- })
-
this.wires.push(wire)
if (addr) {
diff --git a/package.json b/package.json
index c189780..67dc8ff 100644
--- a/package.json
+++ b/package.json
@@ -71,6 +71,7 @@
"simple-get": "^4.0.0",
"simple-peer": "^9.11.0",
"simple-sha1": "^3.1.0",
+ "speed-limiter": "^1.0.0",
"speedometer": "^1.1.0",
"stream-to-blob": "^2.0.1",
"stream-to-blob-url": "^3.0.2",
diff --git a/test/node/limit-download-upload.js b/test/node/limit-download-upload.js
new file mode 100644
index 0000000..0f595ba
--- /dev/null
+++ b/test/node/limit-download-upload.js
@@ -0,0 +1,82 @@
+const fixtures = require('webtorrent-fixtures')
+const test = require('tape')
+const WebTorrent = require('../../')
+const MemoryChunkStore = require('memory-chunk-store')
+
+const DOWNLOAD_SPEED_LIMIT = 200 * 1000 // 200 KB/s
+const UPLOAD_SPEED_LIMIT = 200 * 1000 // 200 KB/s
+
+function testSpeed (t, downloaderOpts, uploaderOpts, cb) {
+ const client1 = new WebTorrent({ dht: false, tracker: false, ...downloaderOpts })
+ const client2 = new WebTorrent({ dht: false, tracker: false, ...uploaderOpts })
+
+ client1.on('error', err => { t.fail(err) })
+ client1.on('warning', err => { t.fail(err) })
+
+ client2.on('error', err => { t.fail(err) })
+ client2.on('warning', err => { t.fail(err) })
+
+ const downloadSpeeds = []
+ const uploadSpeeds = []
+
+ // Start seeding
+ client2.seed(fixtures.leaves.content, {
+ name: 'Leaves of Grass by Walt Whitman.epub',
+ announce: []
+ }, torrent => {
+ torrent.on('upload', () => {
+ uploadSpeeds.push(torrent.uploadSpeed)
+ })
+ })
+
+ client2.on('listening', () => {
+ // Start downloading
+ const torrent = client1.add(fixtures.leaves.parsedTorrent.infoHash, { store: MemoryChunkStore })
+
+ // Manually connect peers
+ torrent.addPeer(`127.0.0.1:${client2.address().port}`)
+
+ torrent.on('download', () => {
+ downloadSpeeds.push(torrent.downloadSpeed)
+ })
+
+ torrent.on('done', () => {
+ cb(downloadSpeeds, uploadSpeeds)
+
+ client1.destroy(err => { t.error(err, 'client 1 destroyed') })
+ client2.destroy(err => { t.error(err, 'client 2 destroyed') })
+ })
+ })
+}
+
+test('Limit download speed by constructor when tcp connection', t => {
+ t.plan(3)
+
+ testSpeed(t, { downloadLimit: DOWNLOAD_SPEED_LIMIT }, {}, downloadSpeeds => {
+ t.ok(downloadSpeeds.every(downloadSpeed => downloadSpeed <= DOWNLOAD_SPEED_LIMIT))
+ })
+})
+
+test('Limit upload speed by constructor when tcp connection', t => {
+ t.plan(3)
+
+ testSpeed(t, {}, { uploadLimit: UPLOAD_SPEED_LIMIT }, (_, uploadSpeeds) => {
+ t.ok(uploadSpeeds.every(uploadSpeed => uploadSpeed <= UPLOAD_SPEED_LIMIT))
+ })
+})
+
+test('Limit download speed by constructor when utp connection', t => {
+ t.plan(3)
+
+ testSpeed(t, { utp: true, downloadLimit: DOWNLOAD_SPEED_LIMIT }, { utp: true }, downloadSpeeds => {
+ t.ok(downloadSpeeds.every(downloadSpeed => downloadSpeed <= DOWNLOAD_SPEED_LIMIT))
+ })
+})
+
+test('Limit upload speed by constructor when utp connection', t => {
+ t.plan(3)
+
+ testSpeed(t, { utp: true }, { utp: true, uploadLimit: UPLOAD_SPEED_LIMIT }, (_, uploadSpeeds) => {
+ t.ok(uploadSpeeds.every(uploadSpeed => uploadSpeed <= UPLOAD_SPEED_LIMIT))
+ })
+})
diff --git a/test/node/limit-methods.js b/test/node/limit-methods.js
new file mode 100644
index 0000000..58ad573
--- /dev/null
+++ b/test/node/limit-methods.js
@@ -0,0 +1,88 @@
+const fixtures = require('webtorrent-fixtures')
+const test = require('tape')
+const WebTorrent = require('../../')
+const MemoryChunkStore = require('memory-chunk-store')
+
+const DOWNLOAD_SPEED_LIMIT = 200 * 1000 // 200 KB/s
+const UPLOAD_SPEED_LIMIT = 200 * 1000 // 200 KB/s
+
+function testSpeed (t, downloaderOpts, uploaderOpts, cb) {
+ const { downloadLimit, ...restDownloaderOpts } = downloaderOpts
+ const { uploadLimit, ...restUploaderOpts } = uploaderOpts
+
+ const client1 = new WebTorrent({ dht: false, tracker: false, ...restDownloaderOpts })
+ const client2 = new WebTorrent({ dht: false, tracker: false, ...restUploaderOpts })
+
+ if (downloadLimit) client1.throttleDownload(downloadLimit)
+ if (uploadLimit) client2.throttleUpload(uploadLimit)
+
+ client1.on('error', err => { t.fail(err) })
+ client1.on('warning', err => { t.fail(err) })
+
+ client2.on('error', err => { t.fail(err) })
+ client2.on('warning', err => { t.fail(err) })
+
+ const downloadSpeeds = []
+ const uploadSpeeds = []
+
+ // Start seeding
+ client2.seed(fixtures.leaves.content, {
+ name: 'Leaves of Grass by Walt Whitman.epub',
+ announce: []
+ }, torrent => {
+ torrent.on('upload', () => {
+ uploadSpeeds.push(torrent.uploadSpeed)
+ })
+ })
+
+ client2.on('listening', () => {
+ // Start downloading
+ const torrent = client1.add(fixtures.leaves.parsedTorrent.infoHash, { store: MemoryChunkStore })
+
+ // Manually connect peers
+ torrent.addPeer(`127.0.0.1:${client2.address().port}`)
+
+ torrent.on('download', () => {
+ downloadSpeeds.push(torrent.downloadSpeed)
+ })
+
+ torrent.on('done', () => {
+ cb(downloadSpeeds, uploadSpeeds)
+
+ client1.destroy(err => { t.error(err, 'client 1 destroyed') })
+ client2.destroy(err => { t.error(err, 'client 2 destroyed') })
+ })
+ })
+}
+
+test('Limit download speed by methods when tcp connection', t => {
+ t.plan(3)
+
+ testSpeed(t, { downloadLimit: DOWNLOAD_SPEED_LIMIT }, {}, downloadSpeeds => {
+ t.ok(downloadSpeeds.every(downloadSpeed => downloadSpeed <= DOWNLOAD_SPEED_LIMIT))
+ })
+})
+
+test('Limit upload speed by methods when tcp connection', t => {
+ t.plan(3)
+
+ testSpeed(t, {}, { uploadLimit: UPLOAD_SPEED_LIMIT }, (_, uploadSpeeds) => {
+ t.ok(uploadSpeeds.every(uploadSpeed => uploadSpeed <= UPLOAD_SPEED_LIMIT))
+ })
+})
+
+test('Limit download speed by methods when utp connection', t => {
+ t.plan(3)
+
+ testSpeed(t, { utp: true, downloadLimit: DOWNLOAD_SPEED_LIMIT }, { utp: true }, downloadSpeeds => {
+ t.ok(downloadSpeeds.every(downloadSpeed => downloadSpeed <= DOWNLOAD_SPEED_LIMIT))
+ })
+})
+
+test('Limit upload speed by methods when utp connection', t => {
+ t.plan(3)
+
+ testSpeed(t, { utp: true }, { utp: true, uploadLimit: UPLOAD_SPEED_LIMIT }, (_, uploadSpeeds) => {
+ t.ok(uploadSpeeds.every(uploadSpeed => uploadSpeed <= UPLOAD_SPEED_LIMIT))
+ })
+})