diff options
author | Alex <alxmorais8@msn.com> | 2021-07-24 01:05:25 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-07-24 01:05:25 +0300 |
commit | 39bb33c3cf694cdee45378ea4b30c66c93576d2a (patch) | |
tree | 654d454bbcc6bba1deab2195ccf6bcbc2bbe4f60 /lib/peer.js | |
parent | 524618edde211a2ce2c9d1e40f68a091699442fd (diff) |
feat: add speed limit to client (#2062)
* Add speed limit to client
* Fix standard
* Update docs/api.md
* Add changes from PR feedback
Co-authored-by: Kadu Diógenes <kadu@fnix.com.br>
Co-authored-by: Ivan Gorbanev <ivang@van.work>
Co-authored-by: ultimate-tester <jordimueters@hotmail.com>
Co-authored-by: Julen Garcia Leunda <hicom150@gmail.com>
Co-authored-by: Niklas Johansson <niklas.y.johansson@se.abb.com>
Co-authored-by: ThaUnknown <kapi.skowronek@gmail.com>
Co-authored-by: Diego RodrÃguez Baquero <github@diegorbaquero.com>
Diffstat (limited to 'lib/peer.js')
-rw-r--r-- | lib/peer.js | 65 |
1 files changed, 55 insertions, 10 deletions
diff --git a/lib/peer.js b/lib/peer.js index c7da3ad..dd048fc 100644 --- a/lib/peer.js +++ b/lib/peer.js @@ -1,3 +1,5 @@ +const { EventEmitter } = require('events') +const { Transform } = require('stream') const arrayRemove = require('unordered-array-remove') const debug = require('debug')('webtorrent:peer') const Wire = require('bittorrent-protocol') @@ -12,10 +14,11 @@ const HANDSHAKE_TIMEOUT = 25000 * "introduction" (i.e. WebRTC signaling), and there's no equivalent to an IP address * that lets you refer to a WebRTC endpoint. */ -exports.createWebRTCPeer = (conn, swarm) => { +exports.createWebRTCPeer = (conn, swarm, throttleGroups) => { const peer = new Peer(conn.id, 'webrtc') peer.conn = conn peer.swarm = swarm + peer.throttleGroups = throttleGroups if (peer.conn.connected) { peer.onConnect() @@ -45,42 +48,52 @@ exports.createWebRTCPeer = (conn, swarm) => { * listening port of the TCP server. Until the remote peer sends a handshake, we don't * know what swarm the connection is intended for. */ -exports.createTCPIncomingPeer = conn => _createIncomingPeer(conn, 'tcpIncoming') +exports.createTCPIncomingPeer = (conn, throttleGroups) => { + return _createIncomingPeer(conn, 'tcpIncoming', throttleGroups) +} /** * Incoming uTP peers start out connected, because the remote peer connected to the * listening port of the uTP server. Until the remote peer sends a handshake, we don't * know what swarm the connection is intended for. */ -exports.createUTPIncomingPeer = conn => _createIncomingPeer(conn, 'utpIncoming') +exports.createUTPIncomingPeer = (conn, throttleGroups) => { + return _createIncomingPeer(conn, 'utpIncoming', throttleGroups) +} /** * Outgoing TCP peers start out with just an IP address. At some point (when there is an * available connection), the client can attempt to connect to the address. */ -exports.createTCPOutgoingPeer = (addr, swarm) => _createOutgoingPeer(addr, swarm, 'tcpOutgoing') +exports.createTCPOutgoingPeer = (addr, swarm, throttleGroups) => { + return _createOutgoingPeer(addr, swarm, 'tcpOutgoing', throttleGroups) +} /** * Outgoing uTP peers start out with just an IP address. At some point (when there is an * available connection), the client can attempt to connect to the address. */ -exports.createUTPOutgoingPeer = (addr, swarm) => _createOutgoingPeer(addr, swarm, 'utpOutgoing') +exports.createUTPOutgoingPeer = (addr, swarm, throttleGroups) => { + return _createOutgoingPeer(addr, swarm, 'utpOutgoing', throttleGroups) +} -const _createIncomingPeer = (conn, type) => { +const _createIncomingPeer = (conn, type, throttleGroups) => { const addr = `${conn.remoteAddress}:${conn.remotePort}` const peer = new Peer(addr, type) peer.conn = conn peer.addr = addr + peer.throttleGroups = throttleGroups peer.onConnect() return peer } -const _createOutgoingPeer = (addr, swarm, type) => { +const _createOutgoingPeer = (addr, swarm, type, throttleGroups) => { const peer = new Peer(addr, type) peer.addr = addr peer.swarm = swarm + peer.throttleGroups = throttleGroups return peer } @@ -88,10 +101,11 @@ const _createOutgoingPeer = (addr, swarm, type) => { /** * Peer that represents a Web Seed (BEP17 / BEP19). */ -exports.createWebSeedPeer = (conn, id, swarm) => { +exports.createWebSeedPeer = (conn, id, swarm, throttleGroups) => { const peer = new Peer(id, 'webSeed') peer.swarm = swarm peer.conn = conn + peer.throttleGroups = throttleGroups peer.onConnect() @@ -104,8 +118,10 @@ exports.createWebSeedPeer = (conn, id, swarm) => { * @param {string} id "ip:port" string, peer id (for WebRTC peers), or url (for Web Seeds) * @param {string} type the type of the peer */ -class Peer { +class Peer extends EventEmitter { constructor (id, type) { + super() + this.id = id this.type = type @@ -170,10 +186,39 @@ class Peer { }) this.startHandshakeTimeout() - conn.pipe(wire).pipe(conn) + this.setThrottlePipes() + if (this.swarm && !this.sentHandshake) this.handshake() } + clearPipes () { + this.conn.unpipe() + this.wire.unpipe() + } + + setThrottlePipes () { + const self = this + this.conn + .pipe(this.throttleGroups.down.throttle()) + .pipe(new Transform({ + transform (chunk, _, callback) { + self.emit('download', chunk.length) + if (self.destroyed) return + callback(null, chunk) + } + })) + .pipe(this.wire) + .pipe(this.throttleGroups.up.throttle()) + .pipe(new Transform({ + transform (chunk, _, callback) { + self.emit('upload', chunk.length) + if (self.destroyed) return + callback(null, chunk) + } + })) + .pipe(this.conn) + } + /** * Called when handshake is received from remote peer. * @param {string} infoHash |