const net = require('net') // browser exclude
const debugFactory = require('debug')
const queueMicrotask = require('queue-microtask')

const Peer = require('./peer.js')
const utp = require('./utp.js') // browser exclude

const debug = debugFactory('webtorrent:conn-pool')

/**
 * Connection Pool
 *
 * A connection pool allows multiple swarms to listen on the same TCP/UDP port and
 * determines which swarm incoming connections are intended for by inspecting the
 * bittorrent handshake that the remote peer sends.
 */
class ConnPool {
  /**
   * Create the pool and start listening on `client.torrentPort`.
   *
   * The TCP server is started first. Once it is listening, and only if `client.utp`
   * is truthy, a uTP server is started on the port the TCP server actually bound to.
   *
   * @param {Object} client owner of the pool. This class reads `torrentPort`, `utp`
   *   and `throttleGroups` from it, and calls its `_onListening()`, `_destroy(err)`,
   *   `_getByHash(hash)` and `get(infoHash)` methods.
   */
  constructor (client) {
    debug('create pool (port %s)', client.torrentPort)

    this._client = client

    // Temporarily store incoming connections so they can be destroyed if the server is
    // closed before the connection is passed off to a Torrent.
    this._pendingConns = new Set()

    // Handlers are kept on the instance so destroy() can remove exactly these listeners.
    this._onTCPConnectionBound = (conn) => {
      this._onConnection(conn, 'tcp')
    }

    this._onUTPConnectionBound = (conn) => {
      this._onConnection(conn, 'utp')
    }

    this._onListening = () => {
      this._client._onListening()
    }

    // A TCP server error tears down the whole client.
    this._onTCPError = (err) => {
      this._client._destroy(err)
    }

    // A uTP server error only switches uTP off on the client.
    this._onUTPError = () => {
      this._client.utp = false
    }

    // Setup TCP
    this.tcpServer = net.createServer()
    this.tcpServer.on('connection', this._onTCPConnectionBound)
    this.tcpServer.on('error', this._onTCPError)

    // Start TCP
    this.tcpServer.listen(client.torrentPort, () => {
      debug('creating tcpServer in port %s', this.tcpServer.address().port)
      if (this._client.utp) {
        // Setup uTP
        this.utpServer = utp.createServer()
        this.utpServer.on('connection', this._onUTPConnectionBound)
        this.utpServer.on('listening', this._onListening)
        this.utpServer.on('error', this._onUTPError)

        // Start uTP on the port the TCP server is bound to, so both share one port.
        debug('creating utpServer in port %s', this.tcpServer.address().port)
        this.utpServer.listen(this.tcpServer.address().port)
      } else {
        this._onListening()
      }
    })
  }

  /**
   * Destroy this Conn pool.
   *
   * Removes the pool's server listeners, destroys every pending connection and closes
   * the TCP server and, when present, the uTP server.
   *
   * @param {function} [cb] called exactly once, after every server has finished
   *   closing. Receives the first error reported by a server's close callback, if any.
   */
  destroy (cb) {
    debug('destroy conn pool')

    const { tcpServer, utpServer } = this

    if (utpServer) {
      utpServer.removeListener('connection', this._onUTPConnectionBound)
      utpServer.removeListener('listening', this._onListening)
      utpServer.removeListener('error', this._onUTPError)
    }

    tcpServer.removeListener('connection', this._onTCPConnectionBound)
    tcpServer.removeListener('error', this._onTCPError)

    // Destroy all open connection objects so server can close gracefully without waiting
    // for connection timeout or remote peer to disconnect.
    this._pendingConns.forEach((conn) => {
      conn.on('error', noop)
      conn.destroy()
    })

    // Both servers report completion through the same counter so that `cb` runs a
    // single time. Handing `cb` to each close() call directly would invoke it twice
    // whenever a uTP server exists.
    let pendingServers = utpServer ? 2 : 1
    let firstError = null

    const onServerClosed = (err) => {
      if (err && !firstError) firstError = err
      pendingServers -= 1
      if (pendingServers > 0 || !cb) return
      if (firstError) cb(firstError)
      else cb()
    }

    const closeServer = (server) => {
      try {
        server.close(onServerClosed)
      } catch (err) {
        // close() threw synchronously. Still report completion, asynchronously and
        // without forwarding the thrown error, as a best-effort shutdown.
        queueMicrotask(() => onServerClosed())
      }
    }

    if (utpServer) closeServer(utpServer)
    closeServer(tcpServer)

    this.tcpServer = null
    this.utpServer = null
    this._client = null
    this._pendingConns = null
  }

  /**
   * On incoming connections, we expect the remote peer to send a handshake first. Based
   * on the infoHash in that handshake, route the peer to the right swarm.
   *
   * @param {Object} conn incoming connection object emitted by the TCP or uTP server
   * @param {string} type 'tcp' or 'utp'; selects which incoming Peer factory is used
   */
  _onConnection (conn, type) {
    const self = this

    // If the connection has already been closed before the `connect` event is fired,
    // then `remoteAddress` will not be available, and we can't use this connection.
    // - Node.js issue: https://github.com/nodejs/node-v0.x-archive/issues/7566
    // - WebTorrent issue: https://github.com/webtorrent/webtorrent/issues/398
    if (!conn.remoteAddress) {
      conn.on('error', noop)
      conn.destroy()
      return
    }

    // Track the connection until it either closes or completes its handshake.
    self._pendingConns.add(conn)
    conn.once('close', cleanupPending)

    const peer = type === 'utp'
      ? Peer.createUTPIncomingPeer(conn, this._client.throttleGroups)
      : Peer.createTCPIncomingPeer(conn, this._client.throttleGroups)

    const wire = peer.wire
    wire.once('pe3', onPe3)
    wire.once('handshake', onHandshake)

    // 'pe3' event from the wire: look the torrent up via `_getByHash` and attach the
    // peer to it, or destroy the peer when no torrent matches.
    function onPe3 (infoHashHash) {
      const torrent = self._client._getByHash(infoHashHash)
      if (torrent) {
        peer.swarm = torrent
        torrent._addIncomingPeer(peer)
        peer.onPe3(infoHashHash)
      } else {
        peer.destroy(new Error(`Unexpected info hash hash ${infoHashHash} from incoming peer ${peer.id}`))
      }
    }

    // 'handshake' event from the wire: the connection is no longer pending. Hand the
    // peer to the torrent matching `infoHash`, or destroy it when there is none.
    function onHandshake (infoHash, peerId) {
      cleanupPending()

      const torrent = self._client.get(infoHash)
      // only add incoming peer if didn't already do so in protocol encryption handshake
      if (torrent) {
        if (!peer.swarm) {
          peer.swarm = torrent
          torrent._addIncomingPeer(peer)
        }
        peer.onHandshake(infoHash, peerId)
      } else {
        const err = new Error(
          `Unexpected info hash ${infoHash} from incoming peer ${peer.id}`
        )
        peer.destroy(err)
      }
    }

    // Stop tracking the connection and detach the listeners registered above.
    // `_pendingConns` is null once the pool has been destroyed, hence the guard.
    function cleanupPending () {
      conn.removeListener('close', cleanupPending)
      wire.removeListener('handshake', onHandshake)
      if (self._pendingConns) {
        self._pendingConns.delete(conn)
      }
    }
  }
}

// True when the `./utp.js` module exports at least one key.
ConnPool.UTP_SUPPORT = Object.keys(utp).length > 0

function noop () {}

module.exports = ConnPool