From 39bb33c3cf694cdee45378ea4b30c66c93576d2a Mon Sep 17 00:00:00 2001 From: Alex Date: Sat, 24 Jul 2021 00:05:25 +0200 Subject: feat: add speed limit to client (#2062) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add speed limit to client * Fix standard * Update docs/api.md * Add changes from PR feedback Co-authored-by: Kadu Diógenes Co-authored-by: Ivan Gorbanev Co-authored-by: ultimate-tester Co-authored-by: Julen Garcia Leunda Co-authored-by: Niklas Johansson Co-authored-by: ThaUnknown Co-authored-by: Diego Rodríguez Baquero --- lib/conn-pool.js | 4 +++- lib/peer.js | 65 +++++++++++++++++++++++++++++++++++++++++++++++--------- lib/torrent.js | 61 ++++++++++++++++++++++++++++------------------------ 3 files changed, 91 insertions(+), 39 deletions(-) (limited to 'lib') diff --git a/lib/conn-pool.js b/lib/conn-pool.js index 004a4cd..9ef02a6 100644 --- a/lib/conn-pool.js +++ b/lib/conn-pool.js @@ -131,7 +131,9 @@ class ConnPool { self._pendingConns.add(conn) conn.once('close', cleanupPending) - const peer = type === 'utp' ? Peer.createUTPIncomingPeer(conn) : Peer.createTCPIncomingPeer(conn) + const peer = type === 'utp' + ? Peer.createUTPIncomingPeer(conn, this._client.throttleGroups) + : Peer.createTCPIncomingPeer(conn, this._client.throttleGroups) const wire = peer.wire wire.once('handshake', onHandshake) diff --git a/lib/peer.js b/lib/peer.js index c7da3ad..dd048fc 100644 --- a/lib/peer.js +++ b/lib/peer.js @@ -1,3 +1,5 @@ +const { EventEmitter } = require('events') +const { Transform } = require('stream') const arrayRemove = require('unordered-array-remove') const debug = require('debug')('webtorrent:peer') const Wire = require('bittorrent-protocol') @@ -12,10 +14,11 @@ const HANDSHAKE_TIMEOUT = 25000 * "introduction" (i.e. WebRTC signaling), and there's no equivalent to an IP address * that lets you refer to a WebRTC endpoint. */ -exports.createWebRTCPeer = (conn, swarm) => { +exports.createWebRTCPeer = (conn, swarm, throttleGroups) => { const peer = new Peer(conn.id, 'webrtc') peer.conn = conn peer.swarm = swarm + peer.throttleGroups = throttleGroups if (peer.conn.connected) { peer.onConnect() @@ -45,42 +48,52 @@ exports.createWebRTCPeer = (conn, swarm) => { * listening port of the TCP server. Until the remote peer sends a handshake, we don't * know what swarm the connection is intended for. */ -exports.createTCPIncomingPeer = conn => _createIncomingPeer(conn, 'tcpIncoming') +exports.createTCPIncomingPeer = (conn, throttleGroups) => { + return _createIncomingPeer(conn, 'tcpIncoming', throttleGroups) +} /** * Incoming uTP peers start out connected, because the remote peer connected to the * listening port of the uTP server. Until the remote peer sends a handshake, we don't * know what swarm the connection is intended for. */ -exports.createUTPIncomingPeer = conn => _createIncomingPeer(conn, 'utpIncoming') +exports.createUTPIncomingPeer = (conn, throttleGroups) => { + return _createIncomingPeer(conn, 'utpIncoming', throttleGroups) +} /** * Outgoing TCP peers start out with just an IP address. At some point (when there is an * available connection), the client can attempt to connect to the address. */ -exports.createTCPOutgoingPeer = (addr, swarm) => _createOutgoingPeer(addr, swarm, 'tcpOutgoing') +exports.createTCPOutgoingPeer = (addr, swarm, throttleGroups) => { + return _createOutgoingPeer(addr, swarm, 'tcpOutgoing', throttleGroups) +} /** * Outgoing uTP peers start out with just an IP address. At some point (when there is an * available connection), the client can attempt to connect to the address. */ -exports.createUTPOutgoingPeer = (addr, swarm) => _createOutgoingPeer(addr, swarm, 'utpOutgoing') +exports.createUTPOutgoingPeer = (addr, swarm, throttleGroups) => { + return _createOutgoingPeer(addr, swarm, 'utpOutgoing', throttleGroups) +} -const _createIncomingPeer = (conn, type) => { +const _createIncomingPeer = (conn, type, throttleGroups) => { const addr = `${conn.remoteAddress}:${conn.remotePort}` const peer = new Peer(addr, type) peer.conn = conn peer.addr = addr + peer.throttleGroups = throttleGroups peer.onConnect() return peer } -const _createOutgoingPeer = (addr, swarm, type) => { +const _createOutgoingPeer = (addr, swarm, type, throttleGroups) => { const peer = new Peer(addr, type) peer.addr = addr peer.swarm = swarm + peer.throttleGroups = throttleGroups return peer } @@ -88,10 +101,11 @@ const _createOutgoingPeer = (addr, swarm, type) => { /** * Peer that represents a Web Seed (BEP17 / BEP19). */ -exports.createWebSeedPeer = (conn, id, swarm) => { +exports.createWebSeedPeer = (conn, id, swarm, throttleGroups) => { const peer = new Peer(id, 'webSeed') peer.swarm = swarm peer.conn = conn + peer.throttleGroups = throttleGroups peer.onConnect() @@ -104,8 +118,10 @@ exports.createWebSeedPeer = (conn, id, swarm) => { * @param {string} id "ip:port" string, peer id (for WebRTC peers), or url (for Web Seeds) * @param {string} type the type of the peer */ -class Peer { +class Peer extends EventEmitter { constructor (id, type) { + super() + this.id = id this.type = type @@ -170,10 +186,39 @@ class Peer { }) this.startHandshakeTimeout() - conn.pipe(wire).pipe(conn) + this.setThrottlePipes() + if (this.swarm && !this.sentHandshake) this.handshake() } + clearPipes () { + this.conn.unpipe() + this.wire.unpipe() + } + + setThrottlePipes () { + const self = this + this.conn + .pipe(this.throttleGroups.down.throttle()) + .pipe(new Transform({ + transform (chunk, _, callback) { + self.emit('download', chunk.length) + if (self.destroyed) return + callback(null, chunk) + } + })) + .pipe(this.wire) + .pipe(this.throttleGroups.up.throttle()) + .pipe(new Transform({ + transform (chunk, _, callback) { + self.emit('upload', chunk.length) + if (self.destroyed) return + callback(null, chunk) + } + })) + .pipe(this.conn) + } + /** * Called when handshake is received from remote peer. * @param {string} infoHash diff --git a/lib/torrent.js b/lib/torrent.js index 0a473ba..7e98a8e 100644 --- a/lib/torrent.js +++ b/lib/torrent.js @@ -828,14 +828,15 @@ class Torrent extends EventEmitter { let newPeer if (typeof peer === 'string') { // `peer` is an addr ("ip:port" string) - newPeer = type === 'utp' ? Peer.createUTPOutgoingPeer(peer, this) : Peer.createTCPOutgoingPeer(peer, this) + newPeer = type === 'utp' + ? Peer.createUTPOutgoingPeer(peer, this, this.client.throttleGroups) + : Peer.createTCPOutgoingPeer(peer, this, this.client.throttleGroups) } else { // `peer` is a WebRTC connection (simple-peer) - newPeer = Peer.createWebRTCPeer(peer, this) + newPeer = Peer.createWebRTCPeer(peer, this, this.client.throttleGroups) } - this._peers[newPeer.id] = newPeer - this._peersLength += 1 + this._registerPeer(newPeer) if (typeof peer === 'string') { // `peer` is an addr ("ip:port" string) @@ -883,9 +884,9 @@ class Torrent extends EventEmitter { this._debug('add web seed %s', id) - const newPeer = Peer.createWebSeedPeer(conn, id, this) - this._peers[newPeer.id] = newPeer - this._peersLength += 1 + const newPeer = Peer.createWebSeedPeer(conn, id, this, this.client.throttleGroups) + + this._registerPeer(newPeer) this.emit('peer', id) } @@ -900,7 +901,31 @@ class Torrent extends EventEmitter { this._debug('add incoming peer %s', peer.id) - this._peers[peer.id] = peer + this._registerPeer(peer) + } + + _registerPeer (newPeer) { + newPeer.on('download', downloaded => { + if (this.destroyed) return + this.received += downloaded + this._downloadSpeed(downloaded) + this.client._downloadSpeed(downloaded) + this.emit('download', downloaded) + if (this.destroyed) return + this.client.emit('download', downloaded) + }) + + newPeer.on('upload', uploaded => { + if (this.destroyed) return + this.uploaded += uploaded + this._uploadSpeed(uploaded) + this.client._uploadSpeed(uploaded) + this.emit('upload', uploaded) + if (this.destroyed) return + this.client.emit('upload', uploaded) + }) + + this._peers[newPeer.id] = newPeer this._peersLength += 1 } @@ -976,26 +1001,6 @@ class Torrent extends EventEmitter { _onWire (wire, addr) { this._debug('got wire %s (%s)', wire._debugId, addr || 'Unknown') - wire.on('download', downloaded => { - if (this.destroyed) return - this.received += downloaded - this._downloadSpeed(downloaded) - this.client._downloadSpeed(downloaded) - this.emit('download', downloaded) - if (this.destroyed) return - this.client.emit('download', downloaded) - }) - - wire.on('upload', uploaded => { - if (this.destroyed) return - this.uploaded += uploaded - this._uploadSpeed(uploaded) - this.client._uploadSpeed(uploaded) - this.emit('upload', uploaded) - if (this.destroyed) return - this.client.emit('upload', uploaded) - }) - this.wires.push(wire) if (addr) { -- cgit v1.2.3