Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/webtorrent/webtorrent.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorAlex <alxmorais8@msn.com>2021-07-24 01:05:25 +0300
committerGitHub <noreply@github.com>2021-07-24 01:05:25 +0300
commit39bb33c3cf694cdee45378ea4b30c66c93576d2a (patch)
tree654d454bbcc6bba1deab2195ccf6bcbc2bbe4f60 /lib
parent524618edde211a2ce2c9d1e40f68a091699442fd (diff)
feat: add speed limit to client (#2062)
* Add speed limit to client * Fix standard * Update docs/api.md * Add changes from PR feedback Co-authored-by: Kadu Diógenes <kadu@fnix.com.br> Co-authored-by: Ivan Gorbanev <ivang@van.work> Co-authored-by: ultimate-tester <jordimueters@hotmail.com> Co-authored-by: Julen Garcia Leunda <hicom150@gmail.com> Co-authored-by: Niklas Johansson <niklas.y.johansson@se.abb.com> Co-authored-by: ThaUnknown <kapi.skowronek@gmail.com> Co-authored-by: Diego Rodríguez Baquero <github@diegorbaquero.com>
Diffstat (limited to 'lib')
-rw-r--r--lib/conn-pool.js4
-rw-r--r--lib/peer.js65
-rw-r--r--lib/torrent.js61
3 files changed, 91 insertions, 39 deletions
diff --git a/lib/conn-pool.js b/lib/conn-pool.js
index 004a4cd..9ef02a6 100644
--- a/lib/conn-pool.js
+++ b/lib/conn-pool.js
@@ -131,7 +131,9 @@ class ConnPool {
self._pendingConns.add(conn)
conn.once('close', cleanupPending)
- const peer = type === 'utp' ? Peer.createUTPIncomingPeer(conn) : Peer.createTCPIncomingPeer(conn)
+ const peer = type === 'utp'
+ ? Peer.createUTPIncomingPeer(conn, this._client.throttleGroups)
+ : Peer.createTCPIncomingPeer(conn, this._client.throttleGroups)
const wire = peer.wire
wire.once('handshake', onHandshake)
diff --git a/lib/peer.js b/lib/peer.js
index c7da3ad..dd048fc 100644
--- a/lib/peer.js
+++ b/lib/peer.js
@@ -1,3 +1,5 @@
+const { EventEmitter } = require('events')
+const { Transform } = require('stream')
const arrayRemove = require('unordered-array-remove')
const debug = require('debug')('webtorrent:peer')
const Wire = require('bittorrent-protocol')
@@ -12,10 +14,11 @@ const HANDSHAKE_TIMEOUT = 25000
* "introduction" (i.e. WebRTC signaling), and there's no equivalent to an IP address
* that lets you refer to a WebRTC endpoint.
*/
-exports.createWebRTCPeer = (conn, swarm) => {
+exports.createWebRTCPeer = (conn, swarm, throttleGroups) => {
const peer = new Peer(conn.id, 'webrtc')
peer.conn = conn
peer.swarm = swarm
+ peer.throttleGroups = throttleGroups
if (peer.conn.connected) {
peer.onConnect()
@@ -45,42 +48,52 @@ exports.createWebRTCPeer = (conn, swarm) => {
* listening port of the TCP server. Until the remote peer sends a handshake, we don't
* know what swarm the connection is intended for.
*/
-exports.createTCPIncomingPeer = conn => _createIncomingPeer(conn, 'tcpIncoming')
+exports.createTCPIncomingPeer = (conn, throttleGroups) => {
+ return _createIncomingPeer(conn, 'tcpIncoming', throttleGroups)
+}
/**
* Incoming uTP peers start out connected, because the remote peer connected to the
* listening port of the uTP server. Until the remote peer sends a handshake, we don't
* know what swarm the connection is intended for.
*/
-exports.createUTPIncomingPeer = conn => _createIncomingPeer(conn, 'utpIncoming')
+exports.createUTPIncomingPeer = (conn, throttleGroups) => {
+ return _createIncomingPeer(conn, 'utpIncoming', throttleGroups)
+}
/**
* Outgoing TCP peers start out with just an IP address. At some point (when there is an
* available connection), the client can attempt to connect to the address.
*/
-exports.createTCPOutgoingPeer = (addr, swarm) => _createOutgoingPeer(addr, swarm, 'tcpOutgoing')
+exports.createTCPOutgoingPeer = (addr, swarm, throttleGroups) => {
+ return _createOutgoingPeer(addr, swarm, 'tcpOutgoing', throttleGroups)
+}
/**
* Outgoing uTP peers start out with just an IP address. At some point (when there is an
* available connection), the client can attempt to connect to the address.
*/
-exports.createUTPOutgoingPeer = (addr, swarm) => _createOutgoingPeer(addr, swarm, 'utpOutgoing')
+exports.createUTPOutgoingPeer = (addr, swarm, throttleGroups) => {
+ return _createOutgoingPeer(addr, swarm, 'utpOutgoing', throttleGroups)
+}
-const _createIncomingPeer = (conn, type) => {
+const _createIncomingPeer = (conn, type, throttleGroups) => {
const addr = `${conn.remoteAddress}:${conn.remotePort}`
const peer = new Peer(addr, type)
peer.conn = conn
peer.addr = addr
+ peer.throttleGroups = throttleGroups
peer.onConnect()
return peer
}
-const _createOutgoingPeer = (addr, swarm, type) => {
+const _createOutgoingPeer = (addr, swarm, type, throttleGroups) => {
const peer = new Peer(addr, type)
peer.addr = addr
peer.swarm = swarm
+ peer.throttleGroups = throttleGroups
return peer
}
@@ -88,10 +101,11 @@ const _createOutgoingPeer = (addr, swarm, type) => {
/**
* Peer that represents a Web Seed (BEP17 / BEP19).
*/
-exports.createWebSeedPeer = (conn, id, swarm) => {
+exports.createWebSeedPeer = (conn, id, swarm, throttleGroups) => {
const peer = new Peer(id, 'webSeed')
peer.swarm = swarm
peer.conn = conn
+ peer.throttleGroups = throttleGroups
peer.onConnect()
@@ -104,8 +118,10 @@ exports.createWebSeedPeer = (conn, id, swarm) => {
* @param {string} id "ip:port" string, peer id (for WebRTC peers), or url (for Web Seeds)
* @param {string} type the type of the peer
*/
-class Peer {
+class Peer extends EventEmitter {
constructor (id, type) {
+ super()
+
this.id = id
this.type = type
@@ -170,10 +186,39 @@ class Peer {
})
this.startHandshakeTimeout()
- conn.pipe(wire).pipe(conn)
+ this.setThrottlePipes()
+
if (this.swarm && !this.sentHandshake) this.handshake()
}
+ clearPipes () {
+ this.conn.unpipe()
+ this.wire.unpipe()
+ }
+
+ setThrottlePipes () {
+ const self = this
+ this.conn
+ .pipe(this.throttleGroups.down.throttle())
+ .pipe(new Transform({
+ transform (chunk, _, callback) {
+ self.emit('download', chunk.length)
+ if (self.destroyed) return
+ callback(null, chunk)
+ }
+ }))
+ .pipe(this.wire)
+ .pipe(this.throttleGroups.up.throttle())
+ .pipe(new Transform({
+ transform (chunk, _, callback) {
+ self.emit('upload', chunk.length)
+ if (self.destroyed) return
+ callback(null, chunk)
+ }
+ }))
+ .pipe(this.conn)
+ }
+
/**
* Called when handshake is received from remote peer.
* @param {string} infoHash
diff --git a/lib/torrent.js b/lib/torrent.js
index 0a473ba..7e98a8e 100644
--- a/lib/torrent.js
+++ b/lib/torrent.js
@@ -828,14 +828,15 @@ class Torrent extends EventEmitter {
let newPeer
if (typeof peer === 'string') {
// `peer` is an addr ("ip:port" string)
- newPeer = type === 'utp' ? Peer.createUTPOutgoingPeer(peer, this) : Peer.createTCPOutgoingPeer(peer, this)
+ newPeer = type === 'utp'
+ ? Peer.createUTPOutgoingPeer(peer, this, this.client.throttleGroups)
+ : Peer.createTCPOutgoingPeer(peer, this, this.client.throttleGroups)
} else {
// `peer` is a WebRTC connection (simple-peer)
- newPeer = Peer.createWebRTCPeer(peer, this)
+ newPeer = Peer.createWebRTCPeer(peer, this, this.client.throttleGroups)
}
- this._peers[newPeer.id] = newPeer
- this._peersLength += 1
+ this._registerPeer(newPeer)
if (typeof peer === 'string') {
// `peer` is an addr ("ip:port" string)
@@ -883,9 +884,9 @@ class Torrent extends EventEmitter {
this._debug('add web seed %s', id)
- const newPeer = Peer.createWebSeedPeer(conn, id, this)
- this._peers[newPeer.id] = newPeer
- this._peersLength += 1
+ const newPeer = Peer.createWebSeedPeer(conn, id, this, this.client.throttleGroups)
+
+ this._registerPeer(newPeer)
this.emit('peer', id)
}
@@ -900,7 +901,31 @@ class Torrent extends EventEmitter {
this._debug('add incoming peer %s', peer.id)
- this._peers[peer.id] = peer
+ this._registerPeer(peer)
+ }
+
+ _registerPeer (newPeer) {
+ newPeer.on('download', downloaded => {
+ if (this.destroyed) return
+ this.received += downloaded
+ this._downloadSpeed(downloaded)
+ this.client._downloadSpeed(downloaded)
+ this.emit('download', downloaded)
+ if (this.destroyed) return
+ this.client.emit('download', downloaded)
+ })
+
+ newPeer.on('upload', uploaded => {
+ if (this.destroyed) return
+ this.uploaded += uploaded
+ this._uploadSpeed(uploaded)
+ this.client._uploadSpeed(uploaded)
+ this.emit('upload', uploaded)
+ if (this.destroyed) return
+ this.client.emit('upload', uploaded)
+ })
+
+ this._peers[newPeer.id] = newPeer
this._peersLength += 1
}
@@ -976,26 +1001,6 @@ class Torrent extends EventEmitter {
_onWire (wire, addr) {
this._debug('got wire %s (%s)', wire._debugId, addr || 'Unknown')
- wire.on('download', downloaded => {
- if (this.destroyed) return
- this.received += downloaded
- this._downloadSpeed(downloaded)
- this.client._downloadSpeed(downloaded)
- this.emit('download', downloaded)
- if (this.destroyed) return
- this.client.emit('download', downloaded)
- })
-
- wire.on('upload', uploaded => {
- if (this.destroyed) return
- this.uploaded += uploaded
- this._uploadSpeed(uploaded)
- this.client._uploadSpeed(uploaded)
- this.emit('upload', uploaded)
- if (this.destroyed) return
- this.client.emit('upload', uploaded)
- })
-
this.wires.push(wire)
if (addr) {