diff options
Diffstat (limited to 'lib/conn-pool.js')
-rw-r--r-- | lib/conn-pool.js | 163 |
1 files changed, 163 insertions, 0 deletions
diff --git a/lib/conn-pool.js b/lib/conn-pool.js new file mode 100644 index 0000000..ddc0672 --- /dev/null +++ b/lib/conn-pool.js @@ -0,0 +1,163 @@ +const arrayRemove = require('unordered-array-remove') +const debug = require('debug')('webtorrent:conn-pool') +const net = require('net') // browser exclude +const utp = require('utp-native') // browser exclude + +const Peer = require('./peer') + +/** + * Connection Pool + * + * A connection pool allows multiple swarms to listen on the same TCP/UDP port and determines + * which swarm incoming connections are intended for by inspecting the bittorrent + * handshake that the remote peer sends. + * + * @param {number} port + */ +class ConnPool { + constructor (client) { + debug('create pool (port %s)', client.torrentPort) + + this._client = client + + // Temporarily store incoming connections so they can be destroyed if the server is + // closed before the connection is passed off to a Torrent. + this._pendingConns = [] + + this._onConnectionBound = (conn) => { + this._onConnection(conn) + } + + this._onListening = () => { + this._client._onListening() + } + + this._onTCPError = (err) => { + this._client._destroy(err) + } + + this._onUTPError = () => { + this._client.utp = false + } + + // Setup TCP + this.tcpServer = net.createServer() + this.tcpServer.on('connection', this._onConnectionBound) + this.tcpServer.on('error', this._onTCPError) + + // Start TCP + this.tcpServer.listen(client.torrentPort, () => { + debug('creating tcpServer in port %s', this.tcpServer.address().port) + if (this._client.utp) { + // Setup uTP + this.utpServer = utp.createServer() + this.utpServer.on('connection', this._onConnectionBound) + this.utpServer.on('listening', this._onListening) + this.utpServer.on('error', this._onUTPError) + + // Start uTP + debug('creating utpServer in port %s', this.tcpServer.address().port) + this.utpServer.listen(this.tcpServer.address().port) + } else { + this._onListening() + } + }) + } + + /** + * Destroy this Conn pool. + * @param {function} cb + */ + destroy (cb) { + debug('destroy conn pool') + + if (this.utpServer) { + this.utpServer.removeListener('connection', this._onConnectionBound) + this.utpServer.removeListener('listening', this._onListening) + this.utpServer.removeListener('error', this._onUTPError) + } + + this.tcpServer.removeListener('connection', this._onConnectionBound) + this.tcpServer.removeListener('error', this._onTCPError) + + // Destroy all open connection objects so server can close gracefully without waiting + // for connection timeout or remote peer to disconnect. + this._pendingConns.forEach((conn) => { + conn.on('error', noop) + conn.destroy() + }) + + if (this.utpServer) { + try { + this.utpServer.close(cb) + } catch (err) { + if (cb) process.nextTick(cb) + } + } + + try { + this.tcpServer.close(cb) + } catch (err) { + if (cb) process.nextTick(cb) + } + + this.tcpServer = null + this.utpServer = null + this._client = null + this._pendingConns = null + } + + /** + * On incoming connections, we expect the remote peer to send a handshake first. Based + * on the infoHash in that handshake, route the peer to the right swarm. + */ + _onConnection (conn) { + const self = this + + // If the connection has already been closed before the `connect` event is fired, + // then `remoteAddress` will not be available, and we can't use this connection. + // - Node.js issue: https://github.com/nodejs/node-v0.x-archive/issues/7566 + // - WebTorrent issue: https://github.com/webtorrent/webtorrent/issues/398 + if (!conn.remoteAddress) { + conn.on('error', noop) + conn.destroy() + return + } + + self._pendingConns.push(conn) + conn.once('close', cleanupPending) + + const peer = this.utpServer ? Peer.createUTPIncomingPeer(conn) : Peer.createTCPIncomingPeer(conn) + + const wire = peer.wire + wire.once('handshake', onHandshake) + + function onHandshake (infoHash, peerId) { + cleanupPending() + + const torrent = self._client.get(infoHash) + if (torrent) { + peer.swarm = torrent + torrent._addIncomingPeer(peer) + peer.onHandshake(infoHash, peerId) + } else { + const err = new Error( + `Unexpected info hash ${infoHash} from incoming peer ${peer.id}` + ) + peer.destroy(err) + } + } + + function cleanupPending () { + conn.removeListener('close', cleanupPending) + wire.removeListener('handshake', onHandshake) + if (self._pendingConns) { + arrayRemove(self._pendingConns, self._pendingConns.indexOf(conn)) + } + } + } +} + +function noop () {} + +module.exports = ConnPool |