diff options
author | Alex <alxmorais8@msn.com> | 2021-07-24 01:05:25 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-07-24 01:05:25 +0300 |
commit | 39bb33c3cf694cdee45378ea4b30c66c93576d2a (patch) | |
tree | 654d454bbcc6bba1deab2195ccf6bcbc2bbe4f60 | |
parent | 524618edde211a2ce2c9d1e40f68a091699442fd (diff) |
feat: add speed limit to client (#2062)
* Add speed limit to client
* Fix standard
* Update docs/api.md
* Add changes from PR feedback
Co-authored-by: Kadu Diógenes <kadu@fnix.com.br>
Co-authored-by: Ivan Gorbanev <ivang@van.work>
Co-authored-by: ultimate-tester <jordimueters@hotmail.com>
Co-authored-by: Julen Garcia Leunda <hicom150@gmail.com>
Co-authored-by: Niklas Johansson <niklas.y.johansson@se.abb.com>
Co-authored-by: ThaUnknown <kapi.skowronek@gmail.com>
Co-authored-by: Diego RodrÃguez Baquero <github@diegorbaquero.com>
-rw-r--r-- | docs/api.md | 23 | ||||
-rw-r--r-- | index.js | 40 | ||||
-rw-r--r-- | lib/conn-pool.js | 4 | ||||
-rw-r--r-- | lib/peer.js | 65 | ||||
-rw-r--r-- | lib/torrent.js | 61 | ||||
-rw-r--r-- | package.json | 1 | ||||
-rw-r--r-- | test/node/limit-download-upload.js | 82 | ||||
-rw-r--r-- | test/node/limit-methods.js | 88 |
8 files changed, 323 insertions, 41 deletions
diff --git a/docs/api.md b/docs/api.md index bbd3b3e..321d50f 100644 --- a/docs/api.md +++ b/docs/api.md @@ -60,8 +60,11 @@ If `opts` is specified, then the default options (shown below) will be overridde dht: Boolean|Object, // Enable DHT (default=true), or options object for DHT lsd: Boolean, // Enable BEP14 local service discovery (default=true) webSeeds: Boolean, // Enable BEP19 web seeds (default=true) + utp: Boolean, // Enable BEP29 uTorrent transport protocol (default=true) blocklist: Array|String, // List of IP's to block utp: Boolean, // Enable BEP29 uTorrent transport protocol (default=true) + downloadLimit: Number, // Max download speed (bytes/sec) over all torrents (default=-1) + uploadLimit: Number, // Max upload speed (bytes/sec) over all torrents (default=-1) } ``` @@ -74,6 +77,11 @@ For possible values of `opts.tracker` see the For possible values of `opts.blocklist` see the [`load-ip-set` documentation](https://github.com/webtorrent/load-ip-set#usage). +For `downloadLimit` and `uploadLimit` the possible values can be: + - `> 0`. The client will set the throttle at that speed + - `0`. The client will block any data from being downloaded or uploaded + - `-1`. The client will is disable the throttling and use the whole bandwidth available + ## `client.add(torrentId, [opts], [function ontorrent (torrent) {}])` Start downloading a new torrent. @@ -216,6 +224,19 @@ Total download progress for all **active** torrents, from 0 to 1. Aggregate "seed ratio" for all torrents (uploaded / downloaded). +## `client.throttleDownload(rate)` + +Sets the maximum speed at which the client downloads the torrents, in bytes/sec. + +`rate` must be bigger or equal than zero, or `-1` to disable the download throttle and +use the whole bandwidth of the connection. + +## `client.throttleUpload(rate)` + +Sets the maximum speed at which the client uploads the torrents, in bytes/sec. + +`rate` must be bigger or equal than zero, or `-1` to disable the upload throttle and +use the whole bandwidth of the connection. # Torrent API @@ -760,4 +781,4 @@ Peer's remote port. Only exists for tcp/utp peers. ## `wire.destroy()` -Close the connection with the peer. This however doesn't prevent the peer from simply re-connecting.
\ No newline at end of file +Close the connection with the peer. This however doesn't prevent the peer from simply re-connecting. @@ -11,9 +11,10 @@ const parallel = require('run-parallel') const parseTorrent = require('parse-torrent') const path = require('path') const Peer = require('simple-peer') +const queueMicrotask = require('queue-microtask') const randombytes = require('randombytes') const speedometer = require('speedometer') -const queueMicrotask = require('queue-microtask') +const { ThrottleGroup } = require('speed-limiter') const ConnPool = require('./lib/conn-pool') // browser exclude const Torrent = require('./lib/torrent') @@ -76,11 +77,19 @@ class WebTorrent extends EventEmitter { this.maxConns = Number(opts.maxConns) || 55 this.utp = WebTorrent.UTP_SUPPORT && opts.utp !== false + this._downloadLimit = Math.max(Number(opts.downloadLimit) || -1, -1) + this._uploadLimit = Math.max(Number(opts.uploadLimit) || -1, -1) + this._debug( 'new webtorrent (peerId %s, nodeId %s, port %s)', this.peerId, this.nodeId, this.torrentPort ) + this.throttleGroups = { + down: new ThrottleGroup({ rate: Math.max(this._downloadLimit, 0), enabled: this._downloadLimit >= 0 }), + up: new ThrottleGroup({ rate: Math.max(this._uploadLimit, 0), enabled: this._uploadLimit >= 0 }) + } + if (this.tracker) { if (typeof this.tracker !== 'object') this.tracker = {} if (global.WRTC && !this.tracker.wrtc) this.tracker.wrtc = global.WRTC @@ -353,6 +362,32 @@ class WebTorrent extends EventEmitter { } /** + * Set global download throttle rate. + * @param {Number} rate (must be bigger or equal than zero, or -1 to disable throttling) + */ + throttleDownload (rate) { + rate = Number(rate) + if (isNaN(rate) || !isFinite(rate) || rate < -1) return false + this._downloadLimit = rate + if (this._downloadLimit < 0) return this.throttleGroups.down.setEnabled(false) + this.throttleGroups.down.setEnabled(true) + this.throttleGroups.down.setRate(this._downloadLimit) + } + + /** + * Set global upload throttle rate + * @param {Number} rate (must be bigger or equal than zero, or -1 to disable throttling) + */ + throttleUpload (rate) { + rate = Number(rate) + if (isNaN(rate) || !isFinite(rate) || rate < -1) return false + this._uploadLimit = rate + if (this._uploadLimit < 0) return this.throttleGroups.up.setEnabled(false) + this.throttleGroups.up.setEnabled(true) + this.throttleGroups.up.setRate(this._uploadLimit) + } + + /** * Destroy the client, including all torrents and connections to peers. * @param {function} cb */ @@ -388,6 +423,9 @@ class WebTorrent extends EventEmitter { this.torrents = [] this._connPool = null this.dht = null + + this.throttleGroups.down.destroy() + this.throttleGroups.up.destroy() } _onListening () { diff --git a/lib/conn-pool.js b/lib/conn-pool.js index 004a4cd..9ef02a6 100644 --- a/lib/conn-pool.js +++ b/lib/conn-pool.js @@ -131,7 +131,9 @@ class ConnPool { self._pendingConns.add(conn) conn.once('close', cleanupPending) - const peer = type === 'utp' ? Peer.createUTPIncomingPeer(conn) : Peer.createTCPIncomingPeer(conn) + const peer = type === 'utp' + ? Peer.createUTPIncomingPeer(conn, this._client.throttleGroups) + : Peer.createTCPIncomingPeer(conn, this._client.throttleGroups) const wire = peer.wire wire.once('handshake', onHandshake) diff --git a/lib/peer.js b/lib/peer.js index c7da3ad..dd048fc 100644 --- a/lib/peer.js +++ b/lib/peer.js @@ -1,3 +1,5 @@ +const { EventEmitter } = require('events') +const { Transform } = require('stream') const arrayRemove = require('unordered-array-remove') const debug = require('debug')('webtorrent:peer') const Wire = require('bittorrent-protocol') @@ -12,10 +14,11 @@ const HANDSHAKE_TIMEOUT = 25000 * "introduction" (i.e. WebRTC signaling), and there's no equivalent to an IP address * that lets you refer to a WebRTC endpoint. */ -exports.createWebRTCPeer = (conn, swarm) => { +exports.createWebRTCPeer = (conn, swarm, throttleGroups) => { const peer = new Peer(conn.id, 'webrtc') peer.conn = conn peer.swarm = swarm + peer.throttleGroups = throttleGroups if (peer.conn.connected) { peer.onConnect() @@ -45,42 +48,52 @@ exports.createWebRTCPeer = (conn, swarm) => { * listening port of the TCP server. Until the remote peer sends a handshake, we don't * know what swarm the connection is intended for. */ -exports.createTCPIncomingPeer = conn => _createIncomingPeer(conn, 'tcpIncoming') +exports.createTCPIncomingPeer = (conn, throttleGroups) => { + return _createIncomingPeer(conn, 'tcpIncoming', throttleGroups) +} /** * Incoming uTP peers start out connected, because the remote peer connected to the * listening port of the uTP server. Until the remote peer sends a handshake, we don't * know what swarm the connection is intended for. */ -exports.createUTPIncomingPeer = conn => _createIncomingPeer(conn, 'utpIncoming') +exports.createUTPIncomingPeer = (conn, throttleGroups) => { + return _createIncomingPeer(conn, 'utpIncoming', throttleGroups) +} /** * Outgoing TCP peers start out with just an IP address. At some point (when there is an * available connection), the client can attempt to connect to the address. */ -exports.createTCPOutgoingPeer = (addr, swarm) => _createOutgoingPeer(addr, swarm, 'tcpOutgoing') +exports.createTCPOutgoingPeer = (addr, swarm, throttleGroups) => { + return _createOutgoingPeer(addr, swarm, 'tcpOutgoing', throttleGroups) +} /** * Outgoing uTP peers start out with just an IP address. At some point (when there is an * available connection), the client can attempt to connect to the address. */ -exports.createUTPOutgoingPeer = (addr, swarm) => _createOutgoingPeer(addr, swarm, 'utpOutgoing') +exports.createUTPOutgoingPeer = (addr, swarm, throttleGroups) => { + return _createOutgoingPeer(addr, swarm, 'utpOutgoing', throttleGroups) +} -const _createIncomingPeer = (conn, type) => { +const _createIncomingPeer = (conn, type, throttleGroups) => { const addr = `${conn.remoteAddress}:${conn.remotePort}` const peer = new Peer(addr, type) peer.conn = conn peer.addr = addr + peer.throttleGroups = throttleGroups peer.onConnect() return peer } -const _createOutgoingPeer = (addr, swarm, type) => { +const _createOutgoingPeer = (addr, swarm, type, throttleGroups) => { const peer = new Peer(addr, type) peer.addr = addr peer.swarm = swarm + peer.throttleGroups = throttleGroups return peer } @@ -88,10 +101,11 @@ const _createOutgoingPeer = (addr, swarm, type) => { /** * Peer that represents a Web Seed (BEP17 / BEP19). */ -exports.createWebSeedPeer = (conn, id, swarm) => { +exports.createWebSeedPeer = (conn, id, swarm, throttleGroups) => { const peer = new Peer(id, 'webSeed') peer.swarm = swarm peer.conn = conn + peer.throttleGroups = throttleGroups peer.onConnect() @@ -104,8 +118,10 @@ exports.createWebSeedPeer = (conn, id, swarm) => { * @param {string} id "ip:port" string, peer id (for WebRTC peers), or url (for Web Seeds) * @param {string} type the type of the peer */ -class Peer { +class Peer extends EventEmitter { constructor (id, type) { + super() + this.id = id this.type = type @@ -170,10 +186,39 @@ class Peer { }) this.startHandshakeTimeout() - conn.pipe(wire).pipe(conn) + this.setThrottlePipes() + if (this.swarm && !this.sentHandshake) this.handshake() } + clearPipes () { + this.conn.unpipe() + this.wire.unpipe() + } + + setThrottlePipes () { + const self = this + this.conn + .pipe(this.throttleGroups.down.throttle()) + .pipe(new Transform({ + transform (chunk, _, callback) { + self.emit('download', chunk.length) + if (self.destroyed) return + callback(null, chunk) + } + })) + .pipe(this.wire) + .pipe(this.throttleGroups.up.throttle()) + .pipe(new Transform({ + transform (chunk, _, callback) { + self.emit('upload', chunk.length) + if (self.destroyed) return + callback(null, chunk) + } + })) + .pipe(this.conn) + } + /** * Called when handshake is received from remote peer. * @param {string} infoHash diff --git a/lib/torrent.js b/lib/torrent.js index 0a473ba..7e98a8e 100644 --- a/lib/torrent.js +++ b/lib/torrent.js @@ -828,14 +828,15 @@ class Torrent extends EventEmitter { let newPeer if (typeof peer === 'string') { // `peer` is an addr ("ip:port" string) - newPeer = type === 'utp' ? Peer.createUTPOutgoingPeer(peer, this) : Peer.createTCPOutgoingPeer(peer, this) + newPeer = type === 'utp' + ? Peer.createUTPOutgoingPeer(peer, this, this.client.throttleGroups) + : Peer.createTCPOutgoingPeer(peer, this, this.client.throttleGroups) } else { // `peer` is a WebRTC connection (simple-peer) - newPeer = Peer.createWebRTCPeer(peer, this) + newPeer = Peer.createWebRTCPeer(peer, this, this.client.throttleGroups) } - this._peers[newPeer.id] = newPeer - this._peersLength += 1 + this._registerPeer(newPeer) if (typeof peer === 'string') { // `peer` is an addr ("ip:port" string) @@ -883,9 +884,9 @@ class Torrent extends EventEmitter { this._debug('add web seed %s', id) - const newPeer = Peer.createWebSeedPeer(conn, id, this) - this._peers[newPeer.id] = newPeer - this._peersLength += 1 + const newPeer = Peer.createWebSeedPeer(conn, id, this, this.client.throttleGroups) + + this._registerPeer(newPeer) this.emit('peer', id) } @@ -900,7 +901,31 @@ class Torrent extends EventEmitter { this._debug('add incoming peer %s', peer.id) - this._peers[peer.id] = peer + this._registerPeer(peer) + } + + _registerPeer (newPeer) { + newPeer.on('download', downloaded => { + if (this.destroyed) return + this.received += downloaded + this._downloadSpeed(downloaded) + this.client._downloadSpeed(downloaded) + this.emit('download', downloaded) + if (this.destroyed) return + this.client.emit('download', downloaded) + }) + + newPeer.on('upload', uploaded => { + if (this.destroyed) return + this.uploaded += uploaded + this._uploadSpeed(uploaded) + this.client._uploadSpeed(uploaded) + this.emit('upload', uploaded) + if (this.destroyed) return + this.client.emit('upload', uploaded) + }) + + this._peers[newPeer.id] = newPeer this._peersLength += 1 } @@ -976,26 +1001,6 @@ class Torrent extends EventEmitter { _onWire (wire, addr) { this._debug('got wire %s (%s)', wire._debugId, addr || 'Unknown') - wire.on('download', downloaded => { - if (this.destroyed) return - this.received += downloaded - this._downloadSpeed(downloaded) - this.client._downloadSpeed(downloaded) - this.emit('download', downloaded) - if (this.destroyed) return - this.client.emit('download', downloaded) - }) - - wire.on('upload', uploaded => { - if (this.destroyed) return - this.uploaded += uploaded - this._uploadSpeed(uploaded) - this.client._uploadSpeed(uploaded) - this.emit('upload', uploaded) - if (this.destroyed) return - this.client.emit('upload', uploaded) - }) - this.wires.push(wire) if (addr) { diff --git a/package.json b/package.json index c189780..67dc8ff 100644 --- a/package.json +++ b/package.json @@ -71,6 +71,7 @@ "simple-get": "^4.0.0", "simple-peer": "^9.11.0", "simple-sha1": "^3.1.0", + "speed-limiter": "^1.0.0", "speedometer": "^1.1.0", "stream-to-blob": "^2.0.1", "stream-to-blob-url": "^3.0.2", diff --git a/test/node/limit-download-upload.js b/test/node/limit-download-upload.js new file mode 100644 index 0000000..0f595ba --- /dev/null +++ b/test/node/limit-download-upload.js @@ -0,0 +1,82 @@ +const fixtures = require('webtorrent-fixtures') +const test = require('tape') +const WebTorrent = require('../../') +const MemoryChunkStore = require('memory-chunk-store') + +const DOWNLOAD_SPEED_LIMIT = 200 * 1000 // 200 KB/s +const UPLOAD_SPEED_LIMIT = 200 * 1000 // 200 KB/s + +function testSpeed (t, downloaderOpts, uploaderOpts, cb) { + const client1 = new WebTorrent({ dht: false, tracker: false, ...downloaderOpts }) + const client2 = new WebTorrent({ dht: false, tracker: false, ...uploaderOpts }) + + client1.on('error', err => { t.fail(err) }) + client1.on('warning', err => { t.fail(err) }) + + client2.on('error', err => { t.fail(err) }) + client2.on('warning', err => { t.fail(err) }) + + const downloadSpeeds = [] + const uploadSpeeds = [] + + // Start seeding + client2.seed(fixtures.leaves.content, { + name: 'Leaves of Grass by Walt Whitman.epub', + announce: [] + }, torrent => { + torrent.on('upload', () => { + uploadSpeeds.push(torrent.uploadSpeed) + }) + }) + + client2.on('listening', () => { + // Start downloading + const torrent = client1.add(fixtures.leaves.parsedTorrent.infoHash, { store: MemoryChunkStore }) + + // Manually connect peers + torrent.addPeer(`127.0.0.1:${client2.address().port}`) + + torrent.on('download', () => { + downloadSpeeds.push(torrent.downloadSpeed) + }) + + torrent.on('done', () => { + cb(downloadSpeeds, uploadSpeeds) + + client1.destroy(err => { t.error(err, 'client 1 destroyed') }) + client2.destroy(err => { t.error(err, 'client 2 destroyed') }) + }) + }) +} + +test('Limit download speed by constructor when tcp connection', t => { + t.plan(3) + + testSpeed(t, { downloadLimit: DOWNLOAD_SPEED_LIMIT }, {}, downloadSpeeds => { + t.ok(downloadSpeeds.every(downloadSpeed => downloadSpeed <= DOWNLOAD_SPEED_LIMIT)) + }) +}) + +test('Limit upload speed by constructor when tcp connection', t => { + t.plan(3) + + testSpeed(t, {}, { uploadLimit: UPLOAD_SPEED_LIMIT }, (_, uploadSpeeds) => { + t.ok(uploadSpeeds.every(uploadSpeed => uploadSpeed <= UPLOAD_SPEED_LIMIT)) + }) +}) + +test('Limit download speed by constructor when utp connection', t => { + t.plan(3) + + testSpeed(t, { utp: true, downloadLimit: DOWNLOAD_SPEED_LIMIT }, { utp: true }, downloadSpeeds => { + t.ok(downloadSpeeds.every(downloadSpeed => downloadSpeed <= DOWNLOAD_SPEED_LIMIT)) + }) +}) + +test('Limit upload speed by constructor when utp connection', t => { + t.plan(3) + + testSpeed(t, { utp: true }, { utp: true, uploadLimit: UPLOAD_SPEED_LIMIT }, (_, uploadSpeeds) => { + t.ok(uploadSpeeds.every(uploadSpeed => uploadSpeed <= UPLOAD_SPEED_LIMIT)) + }) +}) diff --git a/test/node/limit-methods.js b/test/node/limit-methods.js new file mode 100644 index 0000000..58ad573 --- /dev/null +++ b/test/node/limit-methods.js @@ -0,0 +1,88 @@ +const fixtures = require('webtorrent-fixtures') +const test = require('tape') +const WebTorrent = require('../../') +const MemoryChunkStore = require('memory-chunk-store') + +const DOWNLOAD_SPEED_LIMIT = 200 * 1000 // 200 KB/s +const UPLOAD_SPEED_LIMIT = 200 * 1000 // 200 KB/s + +function testSpeed (t, downloaderOpts, uploaderOpts, cb) { + const { downloadLimit, ...restDownloaderOpts } = downloaderOpts + const { uploadLimit, ...restUploaderOpts } = uploaderOpts + + const client1 = new WebTorrent({ dht: false, tracker: false, ...restDownloaderOpts }) + const client2 = new WebTorrent({ dht: false, tracker: false, ...restUploaderOpts }) + + if (downloadLimit) client1.throttleDownload(downloadLimit) + if (uploadLimit) client2.throttleUpload(uploadLimit) + + client1.on('error', err => { t.fail(err) }) + client1.on('warning', err => { t.fail(err) }) + + client2.on('error', err => { t.fail(err) }) + client2.on('warning', err => { t.fail(err) }) + + const downloadSpeeds = [] + const uploadSpeeds = [] + + // Start seeding + client2.seed(fixtures.leaves.content, { + name: 'Leaves of Grass by Walt Whitman.epub', + announce: [] + }, torrent => { + torrent.on('upload', () => { + uploadSpeeds.push(torrent.uploadSpeed) + }) + }) + + client2.on('listening', () => { + // Start downloading + const torrent = client1.add(fixtures.leaves.parsedTorrent.infoHash, { store: MemoryChunkStore }) + + // Manually connect peers + torrent.addPeer(`127.0.0.1:${client2.address().port}`) + + torrent.on('download', () => { + downloadSpeeds.push(torrent.downloadSpeed) + }) + + torrent.on('done', () => { + cb(downloadSpeeds, uploadSpeeds) + + client1.destroy(err => { t.error(err, 'client 1 destroyed') }) + client2.destroy(err => { t.error(err, 'client 2 destroyed') }) + }) + }) +} + +test('Limit download speed by methods when tcp connection', t => { + t.plan(3) + + testSpeed(t, { downloadLimit: DOWNLOAD_SPEED_LIMIT }, {}, downloadSpeeds => { + t.ok(downloadSpeeds.every(downloadSpeed => downloadSpeed <= DOWNLOAD_SPEED_LIMIT)) + }) +}) + +test('Limit upload speed by methods when tcp connection', t => { + t.plan(3) + + testSpeed(t, {}, { uploadLimit: UPLOAD_SPEED_LIMIT }, (_, uploadSpeeds) => { + t.ok(uploadSpeeds.every(uploadSpeed => uploadSpeed <= UPLOAD_SPEED_LIMIT)) + }) +}) + +test('Limit download speed by methods when utp connection', t => { + t.plan(3) + + testSpeed(t, { utp: true, downloadLimit: DOWNLOAD_SPEED_LIMIT }, { utp: true }, downloadSpeeds => { + t.ok(downloadSpeeds.every(downloadSpeed => downloadSpeed <= DOWNLOAD_SPEED_LIMIT)) + }) +}) + +test('Limit upload speed by methods when utp connection', t => { + t.plan(3) + + testSpeed(t, { utp: true }, { utp: true, uploadLimit: UPLOAD_SPEED_LIMIT }, (_, uploadSpeeds) => { + t.ok(uploadSpeeds.every(uploadSpeed => uploadSpeed <= UPLOAD_SPEED_LIMIT)) + }) +}) |