diff options
Diffstat (limited to 'lib/peer.js')
-rw-r--r-- | lib/peer.js | 253 |
1 files changed, 253 insertions, 0 deletions
diff --git a/lib/peer.js b/lib/peer.js new file mode 100644 index 0000000..f4667b5 --- /dev/null +++ b/lib/peer.js @@ -0,0 +1,253 @@ +var debug = require('debug')('webtorrent:peer') +var Wire = require('bittorrent-protocol') + +var WebConn = require('./webconn') + +var CONNECT_TIMEOUT = 25000 +var HANDSHAKE_TIMEOUT = 25000 + +/** + * WebRTC peer connections start out connected, because WebRTC peers require an + * "introduction" (i.e. WebRTC signaling), and there's no equivalent to an IP address + * that lets you refer to a WebRTC endpoint. + */ +exports.createWebRTCPeer = function (conn, swarm) { + var peer = new Peer(conn.id, 'webrtc') + peer.conn = conn + peer.swarm = swarm + + if (peer.conn.connected) { + peer.onConnect() + } else { + peer.conn.once('connect', function () { peer.onConnect() }) + peer.conn.once('error', function (err) { peer.destroy(err) }) + peer.setConnectTimeout() + } + + return peer +} + +/** + * Incoming TCP peers start out connected, because the remote peer connected to the + * listening port of the TCP server. Until the remote peer sends a handshake, we don't + * know what swarm the connection is intended for. + */ +exports.createTCPIncomingPeer = function (conn) { + var addr = conn.remoteAddress + ':' + conn.remotePort + var peer = new Peer(addr, 'tcpIncoming') + peer.conn = conn + peer.addr = addr + + peer.onConnect() + + return peer +} + +/** + * Outgoing TCP peers start out with just an IP address. At some point (when there is an + * available connection), the client can attempt to connect to the address. + */ +exports.createTCPOutgoingPeer = function (addr, swarm) { + var peer = new Peer(addr, 'tcpOutgoing') + peer.addr = addr + peer.swarm = swarm + + return peer +} + +/** + * Peer that represents a Web Seed (BEP17 / BEP19). + */ +exports.createWebSeedPeer = function (url, parsedTorrent, swarm) { + var peer = new Peer(url, 'webSeed') + peer.swarm = swarm + peer.conn = new WebConn(url, parsedTorrent) + + peer.onConnect() + + return peer +} + +/** + * Peer. Represents a peer in the Swarm. + * + * @param {string} id "ip:port" string, peer id (for WebRTC peers), or url (for Web Seeds) + * @param {string} type the type of the peer + */ +function Peer (id, type) { + var self = this + self.id = id + self.type = type + + debug('new Peer %s', id) + + self.addr = null + self.conn = null + self.swarm = null + self.wire = null + + self.connected = false + self.destroyed = false + self.timeout = null // handshake timeout + self.retries = 0 // outgoing TCP connection retry count + + self.sentHandshake = false +} + +/** + * Called once the peer is connected (i.e. fired 'connect' event) + * @param {Socket} conn + */ +Peer.prototype.onConnect = function () { + var self = this + if (self.destroyed) return + self.connected = true + + debug('Peer %s connected', self.id) + + clearTimeout(self.connectTimeout) + + var conn = self.conn + conn.once('end', function () { + self.destroy() + }) + conn.once('close', function () { + self.destroy() + }) + conn.once('finish', function () { + self.destroy() + }) + conn.once('error', function (err) { + self.destroy(err) + }) + + var wire = self.wire = new Wire() + wire.type = self.type + wire.once('end', function () { + self.destroy() + }) + wire.once('close', function () { + self.destroy() + }) + wire.once('finish', function () { + self.destroy() + }) + wire.once('error', function (err) { + self.destroy(err) + }) + + wire.once('handshake', function (infoHash, peerId) { + self.onHandshake(infoHash, peerId) + }) + self.setHandshakeTimeout() + + conn.pipe(wire).pipe(conn) + if (self.swarm && !self.sentHandshake) self.handshake() +} + +/** + * Called when handshake is received from remote peer. + * @param {string} infoHash + * @param {string} peerId + */ +Peer.prototype.onHandshake = function (infoHash, peerId) { + var self = this + if (!self.swarm) return // `self.swarm` not set yet, so do nothing + + if (self.swarm.destroyed) return self.destroy(new Error('swarm already destroyed')) + if (infoHash !== self.swarm.infoHash) { + return self.destroy(new Error('unexpected handshake info hash for this swarm')) + } + if (peerId === self.swarm.peerId) { + return self.destroy(new Error('refusing to handshake with self')) + } + + debug('Peer %s got handshake %s', self.id, infoHash) + + clearTimeout(self.handshakeTimeout) + + self.retries = 0 + + self.wire.on('download', function (downloaded) { + if (self.destroyed) return + self.swarm.downloaded += downloaded + self.swarm.downloadSpeed(downloaded) + self.swarm.emit('download', downloaded) + }) + + self.wire.on('upload', function (uploaded) { + if (self.destroyed) return + self.swarm.uploaded += uploaded + self.swarm.uploadSpeed(uploaded) + self.swarm.emit('upload', uploaded) + }) + + self.swarm.wires.push(self.wire) + + var addr = self.addr + if (!addr && self.conn.remoteAddress) { + addr = self.conn.remoteAddress + ':' + self.conn.remotePort + } + self.swarm.emit('wire', self.wire, addr) + // swarm could be destroyed in user's 'wire' event handler + if (!self.swarm || self.swarm.destroyed) return + + if (!self.sentHandshake) self.handshake() +} + +Peer.prototype.handshake = function () { + var self = this + self.wire.handshake(self.swarm.infoHash, self.swarm.peerId, self.swarm.handshakeOpts) + self.sentHandshake = true +} + +Peer.prototype.setConnectTimeout = function () { + var self = this + clearTimeout(self.connectTimeout) + self.connectTimeout = setTimeout(function () { + self.destroy(new Error('connect timeout')) + }, CONNECT_TIMEOUT) + if (self.connectTimeout.unref) self.connectTimeout.unref() +} + +Peer.prototype.setHandshakeTimeout = function () { + var self = this + clearTimeout(self.handshakeTimeout) + self.handshakeTimeout = setTimeout(function () { + self.destroy(new Error('handshake timeout')) + }, HANDSHAKE_TIMEOUT) + if (self.handshakeTimeout.unref) self.handshakeTimeout.unref() +} + +Peer.prototype.destroy = function (err) { + var self = this + if (self.destroyed) return + self.destroyed = true + self.connected = false + + debug('destroy %s (error: %s)', self.id, err && (err.message || err)) + + clearTimeout(self.connectTimeout) + clearTimeout(self.handshakeTimeout) + + var swarm = self.swarm + var conn = self.conn + var wire = self.wire + + self.conn = null + self.swarm = null + self.wire = null + + if (swarm && wire) { + var index = swarm.wires.indexOf(wire) + if (index >= 0) swarm.wires.splice(index, 1) + } + if (conn) { + conn.on('error', noop) + conn.destroy() + } + if (wire) wire.destroy() + if (swarm) swarm.removePeer(self.id) +} + +function noop () {} |