Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/webtorrent/webtorrent.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex <alxmorais8@msn.com>2021-07-24 01:05:25 +0300
committerGitHub <noreply@github.com>2021-07-24 01:05:25 +0300
commit39bb33c3cf694cdee45378ea4b30c66c93576d2a (patch)
tree654d454bbcc6bba1deab2195ccf6bcbc2bbe4f60 /lib/peer.js
parent524618edde211a2ce2c9d1e40f68a091699442fd (diff)
feat: add speed limit to client (#2062)
* Add speed limit to client * Fix standard * Update docs/api.md * Add changes from PR feedback Co-authored-by: Kadu Diógenes <kadu@fnix.com.br> Co-authored-by: Ivan Gorbanev <ivang@van.work> Co-authored-by: ultimate-tester <jordimueters@hotmail.com> Co-authored-by: Julen Garcia Leunda <hicom150@gmail.com> Co-authored-by: Niklas Johansson <niklas.y.johansson@se.abb.com> Co-authored-by: ThaUnknown <kapi.skowronek@gmail.com> Co-authored-by: Diego Rodríguez Baquero <github@diegorbaquero.com>
Diffstat (limited to 'lib/peer.js')
-rw-r--r--lib/peer.js65
1 files changed, 55 insertions, 10 deletions
diff --git a/lib/peer.js b/lib/peer.js
index c7da3ad..dd048fc 100644
--- a/lib/peer.js
+++ b/lib/peer.js
@@ -1,3 +1,5 @@
+const { EventEmitter } = require('events')
+const { Transform } = require('stream')
const arrayRemove = require('unordered-array-remove')
const debug = require('debug')('webtorrent:peer')
const Wire = require('bittorrent-protocol')
@@ -12,10 +14,11 @@ const HANDSHAKE_TIMEOUT = 25000
* "introduction" (i.e. WebRTC signaling), and there's no equivalent to an IP address
* that lets you refer to a WebRTC endpoint.
*/
-exports.createWebRTCPeer = (conn, swarm) => {
+exports.createWebRTCPeer = (conn, swarm, throttleGroups) => {
const peer = new Peer(conn.id, 'webrtc')
peer.conn = conn
peer.swarm = swarm
+ peer.throttleGroups = throttleGroups
if (peer.conn.connected) {
peer.onConnect()
@@ -45,42 +48,52 @@ exports.createWebRTCPeer = (conn, swarm) => {
* listening port of the TCP server. Until the remote peer sends a handshake, we don't
* know what swarm the connection is intended for.
*/
-exports.createTCPIncomingPeer = conn => _createIncomingPeer(conn, 'tcpIncoming')
+exports.createTCPIncomingPeer = (conn, throttleGroups) => {
+ return _createIncomingPeer(conn, 'tcpIncoming', throttleGroups)
+}
/**
* Incoming uTP peers start out connected, because the remote peer connected to the
* listening port of the uTP server. Until the remote peer sends a handshake, we don't
* know what swarm the connection is intended for.
*/
-exports.createUTPIncomingPeer = conn => _createIncomingPeer(conn, 'utpIncoming')
+exports.createUTPIncomingPeer = (conn, throttleGroups) => {
+ return _createIncomingPeer(conn, 'utpIncoming', throttleGroups)
+}
/**
* Outgoing TCP peers start out with just an IP address. At some point (when there is an
* available connection), the client can attempt to connect to the address.
*/
-exports.createTCPOutgoingPeer = (addr, swarm) => _createOutgoingPeer(addr, swarm, 'tcpOutgoing')
+exports.createTCPOutgoingPeer = (addr, swarm, throttleGroups) => {
+ return _createOutgoingPeer(addr, swarm, 'tcpOutgoing', throttleGroups)
+}
/**
* Outgoing uTP peers start out with just an IP address. At some point (when there is an
* available connection), the client can attempt to connect to the address.
*/
-exports.createUTPOutgoingPeer = (addr, swarm) => _createOutgoingPeer(addr, swarm, 'utpOutgoing')
+exports.createUTPOutgoingPeer = (addr, swarm, throttleGroups) => {
+ return _createOutgoingPeer(addr, swarm, 'utpOutgoing', throttleGroups)
+}
-const _createIncomingPeer = (conn, type) => {
+const _createIncomingPeer = (conn, type, throttleGroups) => {
const addr = `${conn.remoteAddress}:${conn.remotePort}`
const peer = new Peer(addr, type)
peer.conn = conn
peer.addr = addr
+ peer.throttleGroups = throttleGroups
peer.onConnect()
return peer
}
-const _createOutgoingPeer = (addr, swarm, type) => {
+const _createOutgoingPeer = (addr, swarm, type, throttleGroups) => {
const peer = new Peer(addr, type)
peer.addr = addr
peer.swarm = swarm
+ peer.throttleGroups = throttleGroups
return peer
}
@@ -88,10 +101,11 @@ const _createOutgoingPeer = (addr, swarm, type) => {
/**
* Peer that represents a Web Seed (BEP17 / BEP19).
*/
-exports.createWebSeedPeer = (conn, id, swarm) => {
+exports.createWebSeedPeer = (conn, id, swarm, throttleGroups) => {
const peer = new Peer(id, 'webSeed')
peer.swarm = swarm
peer.conn = conn
+ peer.throttleGroups = throttleGroups
peer.onConnect()
@@ -104,8 +118,10 @@ exports.createWebSeedPeer = (conn, id, swarm) => {
* @param {string} id "ip:port" string, peer id (for WebRTC peers), or url (for Web Seeds)
* @param {string} type the type of the peer
*/
-class Peer {
+class Peer extends EventEmitter {
constructor (id, type) {
+ super()
+
this.id = id
this.type = type
@@ -170,10 +186,39 @@ class Peer {
})
this.startHandshakeTimeout()
- conn.pipe(wire).pipe(conn)
+ this.setThrottlePipes()
+
if (this.swarm && !this.sentHandshake) this.handshake()
}
+ clearPipes () {
+ this.conn.unpipe()
+ this.wire.unpipe()
+ }
+
+ setThrottlePipes () {
+ const self = this
+ this.conn
+ .pipe(this.throttleGroups.down.throttle())
+ .pipe(new Transform({
+ transform (chunk, _, callback) {
+ self.emit('download', chunk.length)
+ if (self.destroyed) return
+ callback(null, chunk)
+ }
+ }))
+ .pipe(this.wire)
+ .pipe(this.throttleGroups.up.throttle())
+ .pipe(new Transform({
+ transform (chunk, _, callback) {
+ self.emit('upload', chunk.length)
+ if (self.destroyed) return
+ callback(null, chunk)
+ }
+ }))
+ .pipe(this.conn)
+ }
+
/**
* Called when handshake is received from remote peer.
* @param {string} infoHash