Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/webtorrent/webtorrent.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
Diffstat (limited to 'lib/conn-pool.js')
-rw-r--r--lib/conn-pool.js163
1 files changed, 163 insertions, 0 deletions
diff --git a/lib/conn-pool.js b/lib/conn-pool.js
new file mode 100644
index 0000000..ddc0672
--- /dev/null
+++ b/lib/conn-pool.js
@@ -0,0 +1,163 @@
+const arrayRemove = require('unordered-array-remove')
+const debug = require('debug')('webtorrent:conn-pool')
+const net = require('net') // browser exclude
+const utp = require('utp-native') // browser exclude
+
+const Peer = require('./peer')
+
+/**
+ * Connection Pool
+ *
+ * A connection pool allows multiple swarms to listen on the same TCP/UDP port and determines
+ * which swarm incoming connections are intended for by inspecting the bittorrent
+ * handshake that the remote peer sends.
+ *
+ * @param {number} port
+ */
+class ConnPool {
+ constructor (client) {
+ debug('create pool (port %s)', client.torrentPort)
+
+ this._client = client
+
+ // Temporarily store incoming connections so they can be destroyed if the server is
+ // closed before the connection is passed off to a Torrent.
+ this._pendingConns = []
+
+ this._onConnectionBound = (conn) => {
+ this._onConnection(conn)
+ }
+
+ this._onListening = () => {
+ this._client._onListening()
+ }
+
+ this._onTCPError = (err) => {
+ this._client._destroy(err)
+ }
+
+ this._onUTPError = () => {
+ this._client.utp = false
+ }
+
+ // Setup TCP
+ this.tcpServer = net.createServer()
+ this.tcpServer.on('connection', this._onConnectionBound)
+ this.tcpServer.on('error', this._onTCPError)
+
+ // Start TCP
+ this.tcpServer.listen(client.torrentPort, () => {
+ debug('creating tcpServer in port %s', this.tcpServer.address().port)
+ if (this._client.utp) {
+ // Setup uTP
+ this.utpServer = utp.createServer()
+ this.utpServer.on('connection', this._onConnectionBound)
+ this.utpServer.on('listening', this._onListening)
+ this.utpServer.on('error', this._onUTPError)
+
+ // Start uTP
+ debug('creating utpServer in port %s', this.tcpServer.address().port)
+ this.utpServer.listen(this.tcpServer.address().port)
+ } else {
+ this._onListening()
+ }
+ })
+ }
+
+ /**
+ * Destroy this Conn pool.
+ * @param {function} cb
+ */
+ destroy (cb) {
+ debug('destroy conn pool')
+
+ if (this.utpServer) {
+ this.utpServer.removeListener('connection', this._onConnectionBound)
+ this.utpServer.removeListener('listening', this._onListening)
+ this.utpServer.removeListener('error', this._onUTPError)
+ }
+
+ this.tcpServer.removeListener('connection', this._onConnectionBound)
+ this.tcpServer.removeListener('error', this._onTCPError)
+
+ // Destroy all open connection objects so server can close gracefully without waiting
+ // for connection timeout or remote peer to disconnect.
+ this._pendingConns.forEach((conn) => {
+ conn.on('error', noop)
+ conn.destroy()
+ })
+
+ if (this.utpServer) {
+ try {
+ this.utpServer.close(cb)
+ } catch (err) {
+ if (cb) process.nextTick(cb)
+ }
+ }
+
+ try {
+ this.tcpServer.close(cb)
+ } catch (err) {
+ if (cb) process.nextTick(cb)
+ }
+
+ this.tcpServer = null
+ this.utpServer = null
+ this._client = null
+ this._pendingConns = null
+ }
+
+ /**
+ * On incoming connections, we expect the remote peer to send a handshake first. Based
+ * on the infoHash in that handshake, route the peer to the right swarm.
+ */
+ _onConnection (conn) {
+ const self = this
+
+ // If the connection has already been closed before the `connect` event is fired,
+ // then `remoteAddress` will not be available, and we can't use this connection.
+ // - Node.js issue: https://github.com/nodejs/node-v0.x-archive/issues/7566
+ // - WebTorrent issue: https://github.com/webtorrent/webtorrent/issues/398
+ if (!conn.remoteAddress) {
+ conn.on('error', noop)
+ conn.destroy()
+ return
+ }
+
+ self._pendingConns.push(conn)
+ conn.once('close', cleanupPending)
+
+ const peer = this.utpServer ? Peer.createUTPIncomingPeer(conn) : Peer.createTCPIncomingPeer(conn)
+
+ const wire = peer.wire
+ wire.once('handshake', onHandshake)
+
+ function onHandshake (infoHash, peerId) {
+ cleanupPending()
+
+ const torrent = self._client.get(infoHash)
+ if (torrent) {
+ peer.swarm = torrent
+ torrent._addIncomingPeer(peer)
+ peer.onHandshake(infoHash, peerId)
+ } else {
+ const err = new Error(
+ `Unexpected info hash ${infoHash} from incoming peer ${peer.id}`
+ )
+ peer.destroy(err)
+ }
+ }
+
+ function cleanupPending () {
+ conn.removeListener('close', cleanupPending)
+ wire.removeListener('handshake', onHandshake)
+ if (self._pendingConns) {
+ arrayRemove(self._pendingConns, self._pendingConns.indexOf(conn))
+ }
+ }
+ }
+}
+
+function noop () {}
+
+module.exports = ConnPool