github.com/webtorrent/webtorrent.git

author    Diego Rodríguez Baquero <diegorbaquero@gmail.com>  2018-08-24 03:19:21 +0300
committer GitHub <noreply@github.com>                        2018-08-24 03:19:21 +0300
commit    92ec1a249d5374178557d80c1fc013ff2c76440d (patch)
tree      6ded94ab17434a770d353ba424fab90cecf29719 /lib
parent    f0e004b902c8c60a66b3ec45c3caddf81e18550a (diff)
parent    afbef26d5fafa745dc0793ed0d6a5761f5f0eb89 (diff)
Merge pull request #1480 from webtorrent/modernize
Modernize some lib files
Diffstat (limited to 'lib')
-rw-r--r--  lib/server.js     75
-rw-r--r--  lib/tcp-pool.js  188
-rw-r--r--  lib/torrent.js  2697
3 files changed, 1454 insertions, 1506 deletions
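
All three files get the same ES2015+ treatment: var becomes const or let, function expressions become arrow functions, string concatenation becomes template literals, prototype methods and Object.defineProperty getters become class syntax, options guards become default parameters, and module.exports moves from the top of each file to the bottom. The sketch below is a hypothetical Greeter module, not code from this repository, showing the before-and-after shape of those changes:

// --- before: pre-ES2015 style, one version of the module ---
module.exports = Greeter                       // old files put the export at the very top

var EventEmitter = require('events').EventEmitter
var inherits = require('inherits')

function Greeter (name, opts) {
  EventEmitter.call(this)
  if (!opts) opts = {}                         // manual options guard
  this.name = name
  this.punctuation = opts.punctuation || '!'
}

inherits(Greeter, EventEmitter)

Object.defineProperty(Greeter.prototype, 'loud', {
  get: function () { return this.name.toUpperCase() }
})

Greeter.prototype.greet = function () {
  return 'hello ' + this.name + this.punctuation   // string concatenation
}

// --- after: the style this merge moves lib/ toward ---
const EventEmitter = require('events').EventEmitter

class Greeter extends EventEmitter {
  constructor (name, opts = {}) {                   // default parameter replaces the guard
    super()
    this.name = name
    this.punctuation = opts.punctuation || '!'
  }

  get loud () { return this.name.toUpperCase() }    // getter replaces Object.defineProperty

  greet () {
    return `hello ${this.name}${this.punctuation}`  // template literal
  }
}

module.exports = Greeter                            // export moves to the bottom of the file
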
diff --git a/lib/server.js b/lib/server.js
index 8ea9a48..60595a8 100644
--- a/lib/server.js
+++ b/lib/server.js
@@ -1,44 +1,41 @@
-module.exports = Server
-
-var arrayRemove = require('unordered-array-remove')
-var http = require('http')
-var mime = require('mime')
-var pump = require('pump')
-var rangeParser = require('range-parser')
-var url = require('url')
-
-function Server (torrent, opts) {
- var server = http.createServer()
- if (!opts) opts = {}
+const arrayRemove = require('unordered-array-remove')
+const http = require('http')
+const mime = require('mime')
+const pump = require('pump')
+const rangeParser = require('range-parser')
+const url = require('url')
+
+function Server (torrent, opts = {}) {
+ const server = http.createServer()
if (!opts.origin) opts.origin = '*' // allow all origins by default
- var sockets = []
- var pendingReady = []
- var closed = false
+ const sockets = []
+ const pendingReady = []
+ let closed = false
server.on('connection', onConnection)
server.on('request', onRequest)
- var _close = server.close
- server.close = function (cb) {
+ const _close = server.close
+ server.close = cb => {
closed = true
server.removeListener('connection', onConnection)
server.removeListener('request', onRequest)
while (pendingReady.length) {
- var onReady = pendingReady.pop()
+ const onReady = pendingReady.pop()
torrent.removeListener('ready', onReady)
}
torrent = null
_close.call(server, cb)
}
- server.destroy = function (cb) {
- sockets.forEach(function (socket) {
+ server.destroy = cb => {
+ sockets.forEach(socket => {
socket.destroy()
})
// Only call `server.close` if user has not called it already
- if (!cb) cb = function () {}
+ if (!cb) cb = () => {}
if (closed) process.nextTick(cb)
else server.close(cb)
}
@@ -68,13 +65,13 @@ function Server (torrent, opts) {
function onConnection (socket) {
socket.setTimeout(36000000)
sockets.push(socket)
- socket.once('close', function () {
+ socket.once('close', () => {
arrayRemove(sockets, sockets.indexOf(socket))
})
}
function onRequest (req, res) {
- var pathname = url.parse(req.url).pathname
+ const pathname = url.parse(req.url).pathname
if (pathname === '/favicon.ico') {
return serve404Page()
@@ -132,12 +129,12 @@ function Server (torrent, opts) {
return serveIndexPage()
}
- var index = Number(pathname.split('/')[1])
+ const index = Number(pathname.split('/')[1])
if (Number.isNaN(index) || index >= torrent.files.length) {
return serve404Page()
}
- var file = torrent.files[index]
+ const file = torrent.files[index]
serveFile(file)
}
@@ -145,14 +142,11 @@ function Server (torrent, opts) {
res.statusCode = 200
res.setHeader('Content-Type', 'text/html')
- var listHtml = torrent.files.map(function (file, i) {
- return '<li><a download="' + file.name + '" href="/' + i + '/' + file.name + '">' + file.path + '</a> ' +
- '(' + file.length + ' bytes)</li>'
- }).join('<br>')
+ const listHtml = torrent.files.map((file, i) => `<li><a download="${file.name}" href="/${i}/${file.name}">${file.path}</a> (${file.length} bytes)</li>`).join('<br>')
- var html = getPageHTML(
- torrent.name + ' - WebTorrent',
- '<h1>' + torrent.name + '</h1><ol>' + listHtml + '</ol>'
+ const html = getPageHTML(
+ `${torrent.name} - WebTorrent`,
+ `<h1>${torrent.name}</h1><ol>${listHtml}</ol>`
)
res.end(html)
}
@@ -161,7 +155,7 @@ function Server (torrent, opts) {
res.statusCode = 404
res.setHeader('Content-Type', 'text/html')
- var html = getPageHTML('404 - Not Found', '<h1>404 - Not Found</h1>')
+ const html = getPageHTML('404 - Not Found', '<h1>404 - Not Found</h1>')
res.end(html)
}
@@ -175,7 +169,7 @@ function Server (torrent, opts) {
// Set name of file (for "Save Page As..." dialog)
res.setHeader(
'Content-Disposition',
- 'inline; filename*=UTF-8\'\'' + encodeRFC5987(file.name)
+ `inline; filename*=UTF-8''${encodeRFC5987(file.name)}`
)
// Support DLNA streaming
@@ -187,7 +181,7 @@ function Server (torrent, opts) {
// `rangeParser` returns an array of ranges, or an error code (number) if
// there was an error parsing the range.
- var range = rangeParser(file.length, req.headers.range || '')
+ let range = rangeParser(file.length, req.headers.range || '')
if (Array.isArray(range)) {
res.statusCode = 206 // indicates that range-request was understood
@@ -197,7 +191,7 @@ function Server (torrent, opts) {
res.setHeader(
'Content-Range',
- 'bytes ' + range.start + '-' + range.end + '/' + file.length
+ `bytes ${range.start}-${range.end}/${file.length}`
)
res.setHeader('Content-Length', range.end - range.start + 1)
} else {
@@ -215,7 +209,7 @@ function Server (torrent, opts) {
function serveMethodNotAllowed () {
res.statusCode = 405
res.setHeader('Content-Type', 'text/html')
- var html = getPageHTML('405 - Method Not Allowed', '<h1>405 - Method Not Allowed</h1>')
+ const html = getPageHTML('405 - Method Not Allowed', '<h1>405 - Method Not Allowed</h1>')
res.end(html)
}
}
@@ -224,10 +218,7 @@ function Server (torrent, opts) {
}
function getPageHTML (title, pageHtml) {
- return '<!DOCTYPE html><html lang="en"><head>' +
- '<meta charset="utf-8">' +
- '<title>' + title + '</title>' +
- '</head><body>' + pageHtml + '</body></html>'
+ return `<!DOCTYPE html><html lang="en"><head><meta charset="utf-8"><title>${title}</title></head><body>${pageHtml}</body></html>`
}
// From https://developer.mozilla.org/en/docs/Web/JavaScript/Reference/Global_Objects/encodeURIComponent
@@ -241,3 +232,5 @@ function encodeRFC5987 (str) {
// so we can allow for a little better readability over the wire: |`^
.replace(/%(?:7C|60|5E)/g, unescape)
}
+
+module.exports = Server
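
The server above leans on the range-parser package for HTTP Range support: it returns an array of ranges when the header parses (the 206 / Content-Range branch), and a negative error code otherwise (fall back to a plain 200). A minimal standalone sketch of that pattern using plain Node http rather than the WebTorrent server itself; the 1000-byte resource and port 8000 are made up for illustration:

const http = require('http')
const rangeParser = require('range-parser')

const LENGTH = 1000   // hypothetical fixed-size resource standing in for a torrent file

http.createServer((req, res) => {
  // Same call shape as lib/server.js: an array of ranges on success,
  // or a number (-1 unsatisfiable, -2 malformed) on error
  let range = rangeParser(LENGTH, req.headers.range || '')

  if (Array.isArray(range)) {
    res.statusCode = 206                       // range request was understood
    range = range[0]                           // serve the first requested range
    res.setHeader('Content-Range', `bytes ${range.start}-${range.end}/${LENGTH}`)
    res.setHeader('Content-Length', range.end - range.start + 1)
  } else {
    res.statusCode = 200                       // no (or invalid) Range header
    range = null
    res.setHeader('Content-Length', LENGTH)
  }

  res.end(Buffer.alloc(range ? range.end - range.start + 1 : LENGTH))
}).listen(8000)
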
diff --git a/lib/tcp-pool.js b/lib/tcp-pool.js
index 7625d8d..1094c4f 100644
--- a/lib/tcp-pool.js
+++ b/lib/tcp-pool.js
@@ -1,10 +1,8 @@
-module.exports = TCPPool
-
-var arrayRemove = require('unordered-array-remove')
-var debug = require('debug')('webtorrent:tcp-pool')
-var net = require('net') // browser exclude
+const arrayRemove = require('unordered-array-remove')
+const debug = require('debug')('webtorrent:tcp-pool')
+const net = require('net') // browser exclude
-var Peer = require('./peer')
+const Peer = require('./peer')
/**
* TCPPool
@@ -15,114 +13,116 @@ var Peer = require('./peer')
*
* @param {number} port
*/
-function TCPPool (client) {
- var self = this
- debug('create tcp pool (port %s)', client.torrentPort)
-
- self.server = net.createServer()
- self._client = client
+class TCPPool {
+ constructor (client) {
+ debug('create tcp pool (port %s)', client.torrentPort)
- // Temporarily store incoming connections so they can be destroyed if the server is
- // closed before the connection is passed off to a Torrent.
- self._pendingConns = []
+ this.server = net.createServer()
+ this._client = client
- self._onConnectionBound = function (conn) {
- self._onConnection(conn)
- }
+ // Temporarily store incoming connections so they can be destroyed if the server is
+ // closed before the connection is passed off to a Torrent.
+ this._pendingConns = []
- self._onListening = function () {
- self._client._onListening()
- }
+ this._onConnectionBound = conn => {
+ this._onConnection(conn)
+ }
- self._onError = function (err) {
- self._client._destroy(err)
- }
+ this._onListening = () => {
+ this._client._onListening()
+ }
- self.server.on('connection', self._onConnectionBound)
- self.server.on('listening', self._onListening)
- self.server.on('error', self._onError)
+ this._onError = err => {
+ this._client._destroy(err)
+ }
- self.server.listen(client.torrentPort)
-}
+ this.server.on('connection', this._onConnectionBound)
+ this.server.on('listening', this._onListening)
+ this.server.on('error', this._onError)
-/**
- * Destroy this TCP pool.
- * @param {function} cb
- */
-TCPPool.prototype.destroy = function (cb) {
- var self = this
- debug('destroy tcp pool')
-
- self.server.removeListener('connection', self._onConnectionBound)
- self.server.removeListener('listening', self._onListening)
- self.server.removeListener('error', self._onError)
-
- // Destroy all open connection objects so server can close gracefully without waiting
- // for connection timeout or remote peer to disconnect.
- self._pendingConns.forEach(function (conn) {
- conn.on('error', noop)
- conn.destroy()
- })
-
- try {
- self.server.close(cb)
- } catch (err) {
- if (cb) process.nextTick(cb)
+ this.server.listen(client.torrentPort)
}
- self.server = null
- self._client = null
- self._pendingConns = null
-}
+ /**
+ * Destroy this TCP pool.
+ * @param {function} cb
+ */
+ destroy (cb) {
+ debug('destroy tcp pool')
+
+ this.server.removeListener('connection', this._onConnectionBound)
+ this.server.removeListener('listening', this._onListening)
+ this.server.removeListener('error', this._onError)
+
+ // Destroy all open connection objects so server can close gracefully without waiting
+ // for connection timeout or remote peer to disconnect.
+ this._pendingConns.forEach(conn => {
+ conn.on('error', noop)
+ conn.destroy()
+ })
+
+ try {
+ this.server.close(cb)
+ } catch (err) {
+ if (cb) process.nextTick(cb)
+ }
-/**
- * On incoming connections, we expect the remote peer to send a handshake first. Based
- * on the infoHash in that handshake, route the peer to the right swarm.
- */
-TCPPool.prototype._onConnection = function (conn) {
- var self = this
-
- // If the connection has already been closed before the `connect` event is fired,
- // then `remoteAddress` will not be available, and we can't use this connection.
- // - Node.js issue: https://github.com/nodejs/node-v0.x-archive/issues/7566
- // - WebTorrent issue: https://github.com/webtorrent/webtorrent/issues/398
- if (!conn.remoteAddress) {
- conn.on('error', noop)
- conn.destroy()
- return
+ this.server = null
+ this._client = null
+ this._pendingConns = null
}
- self._pendingConns.push(conn)
- conn.once('close', cleanupPending)
+ /**
+ * On incoming connections, we expect the remote peer to send a handshake first. Based
+ * on the infoHash in that handshake, route the peer to the right swarm.
+ */
+ _onConnection (conn) {
+ const self = this
+
+ // If the connection has already been closed before the `connect` event is fired,
+ // then `remoteAddress` will not be available, and we can't use this connection.
+ // - Node.js issue: https://github.com/nodejs/node-v0.x-archive/issues/7566
+ // - WebTorrent issue: https://github.com/webtorrent/webtorrent/issues/398
+ if (!conn.remoteAddress) {
+ conn.on('error', noop)
+ conn.destroy()
+ return
+ }
+
+ self._pendingConns.push(conn)
+ conn.once('close', cleanupPending)
- var peer = Peer.createTCPIncomingPeer(conn)
+ const peer = Peer.createTCPIncomingPeer(conn)
- var wire = peer.wire
- wire.once('handshake', onHandshake)
+ const wire = peer.wire
+ wire.once('handshake', onHandshake)
- function onHandshake (infoHash, peerId) {
- cleanupPending()
+ function onHandshake (infoHash, peerId) {
+ cleanupPending()
- var torrent = self._client.get(infoHash)
- if (torrent) {
- peer.swarm = torrent
- torrent._addIncomingPeer(peer)
- peer.onHandshake(infoHash, peerId)
- } else {
- var err = new Error(
- 'Unexpected info hash ' + infoHash + ' from incoming peer ' + peer.id
- )
- peer.destroy(err)
+ const torrent = self._client.get(infoHash)
+ if (torrent) {
+ peer.swarm = torrent
+ torrent._addIncomingPeer(peer)
+ peer.onHandshake(infoHash, peerId)
+ } else {
+ const err = new Error(
+ `Unexpected info hash ${infoHash} from incoming peer ${peer.id}`
+ )
+ peer.destroy(err)
+ }
}
- }
- function cleanupPending () {
- conn.removeListener('close', cleanupPending)
- wire.removeListener('handshake', onHandshake)
- if (self._pendingConns) {
- arrayRemove(self._pendingConns, self._pendingConns.indexOf(conn))
+ function cleanupPending () {
+ conn.removeListener('close', cleanupPending)
+ wire.removeListener('handshake', onHandshake)
+ if (self._pendingConns) {
+ arrayRemove(self._pendingConns, self._pendingConns.indexOf(conn))
+ }
}
}
}
function noop () {}
+
+module.exports = TCPPool
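
The class version of TCPPool keeps the original technique of storing its event handlers on the instance so that destroy() can later detach exactly the listeners it attached; arrow functions replace the old var self = this closures while preserving this. A small standalone sketch of that pattern, with a hypothetical Pool class and an EventEmitter standing in for net.Server:

const EventEmitter = require('events')

class Pool {
  constructor (server) {
    this.server = server

    // Keep references to the bound handlers, as tcp-pool.js does, so that
    // destroy() can remove exactly these listeners and nothing else.
    this._onConnection = conn => this._handle(conn)
    this._onError = err => console.error('pool error:', err.message)

    this.server.on('connection', this._onConnection)
    this.server.on('error', this._onError)
  }

  _handle (conn) {
    console.log('got connection from', conn.remoteAddress)
  }

  destroy () {
    this.server.removeListener('connection', this._onConnection)
    this.server.removeListener('error', this._onError)
    this.server = null
  }
}

// Usage: any EventEmitter can stand in for a net.Server here
const fakeServer = new EventEmitter()
const pool = new Pool(fakeServer)
fakeServer.emit('connection', { remoteAddress: '127.0.0.1' })  // handled
pool.destroy()
fakeServer.emit('connection', { remoteAddress: '10.0.0.1' })   // ignored after destroy()
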
diff --git a/lib/torrent.js b/lib/torrent.js
index ea422e0..a32326a 100644
--- a/lib/torrent.js
+++ b/lib/torrent.js
@@ -1,613 +1,572 @@
/* global URL, Blob */
-module.exports = Torrent
-
-var addrToIPPort = require('addr-to-ip-port')
-var BitField = require('bitfield')
-var ChunkStoreWriteStream = require('chunk-store-stream/write')
-var debug = require('debug')('webtorrent:torrent')
-var Discovery = require('torrent-discovery')
-var EventEmitter = require('events').EventEmitter
-var extend = require('xtend')
-var extendMutable = require('xtend/mutable')
-var fs = require('fs')
-var FSChunkStore = require('fs-chunk-store') // browser: `memory-chunk-store`
-var get = require('simple-get')
-var ImmediateChunkStore = require('immediate-chunk-store')
-var inherits = require('inherits')
-var MultiStream = require('multistream')
-var net = require('net') // browser exclude
-var os = require('os') // browser exclude
-var parallel = require('run-parallel')
-var parallelLimit = require('run-parallel-limit')
-var parseTorrent = require('parse-torrent')
-var path = require('path')
-var Piece = require('torrent-piece')
-var pump = require('pump')
-var randomIterate = require('random-iterate')
-var sha1 = require('simple-sha1')
-var speedometer = require('speedometer')
-var uniq = require('uniq')
-var utMetadata = require('ut_metadata')
-var utPex = require('ut_pex') // browser exclude
-var parseRange = require('parse-numeric-range')
-
-var File = require('./file')
-var Peer = require('./peer')
-var RarityMap = require('./rarity-map')
-var Server = require('./server') // browser exclude
-
-var MAX_BLOCK_LENGTH = 128 * 1024
-var PIECE_TIMEOUT = 30000
-var CHOKE_TIMEOUT = 5000
-var SPEED_THRESHOLD = 3 * Piece.BLOCK_LENGTH
-
-var PIPELINE_MIN_DURATION = 0.5
-var PIPELINE_MAX_DURATION = 1
-
-var RECHOKE_INTERVAL = 10000 // 10 seconds
-var RECHOKE_OPTIMISTIC_DURATION = 2 // 30 seconds
-
-var FILESYSTEM_CONCURRENCY = 2
-
-var RECONNECT_WAIT = [ 1000, 5000, 15000 ]
-
-var VERSION = require('../package.json').version
-var USER_AGENT = 'WebTorrent/' + VERSION + ' (https://webtorrent.io)'
-
-var TMP
+const addrToIPPort = require('addr-to-ip-port')
+const BitField = require('bitfield')
+const ChunkStoreWriteStream = require('chunk-store-stream/write')
+const debug = require('debug')('webtorrent:torrent')
+const Discovery = require('torrent-discovery')
+const EventEmitter = require('events').EventEmitter
+const extend = require('xtend')
+const extendMutable = require('xtend/mutable')
+const fs = require('fs')
+const FSChunkStore = require('fs-chunk-store') // browser: `memory-chunk-store`
+const get = require('simple-get')
+const ImmediateChunkStore = require('immediate-chunk-store')
+const MultiStream = require('multistream')
+const net = require('net') // browser exclude
+const os = require('os') // browser exclude
+const parallel = require('run-parallel')
+const parallelLimit = require('run-parallel-limit')
+const parseTorrent = require('parse-torrent')
+const path = require('path')
+const Piece = require('torrent-piece')
+const pump = require('pump')
+const randomIterate = require('random-iterate')
+const sha1 = require('simple-sha1')
+const speedometer = require('speedometer')
+const uniq = require('uniq')
+const utMetadata = require('ut_metadata')
+const utPex = require('ut_pex') // browser exclude
+const parseRange = require('parse-numeric-range')
+
+const File = require('./file')
+const Peer = require('./peer')
+const RarityMap = require('./rarity-map')
+const Server = require('./server') // browser exclude
+
+const MAX_BLOCK_LENGTH = 128 * 1024
+const PIECE_TIMEOUT = 30000
+const CHOKE_TIMEOUT = 5000
+const SPEED_THRESHOLD = 3 * Piece.BLOCK_LENGTH
+
+const PIPELINE_MIN_DURATION = 0.5
+const PIPELINE_MAX_DURATION = 1
+
+const RECHOKE_INTERVAL = 10000 // 10 seconds
+const RECHOKE_OPTIMISTIC_DURATION = 2 // 30 seconds
+
+const FILESYSTEM_CONCURRENCY = 2
+
+const RECONNECT_WAIT = [ 1000, 5000, 15000 ]
+
+const VERSION = require('../package.json').version
+const USER_AGENT = `WebTorrent/${VERSION} (https://webtorrent.io)`
+
+let TMP
try {
TMP = path.join(fs.statSync('/tmp') && '/tmp', 'webtorrent')
} catch (err) {
TMP = path.join(typeof os.tmpdir === 'function' ? os.tmpdir() : '/', 'webtorrent')
}
-inherits(Torrent, EventEmitter)
+class Torrent extends EventEmitter {
+ constructor (torrentId, client, opts) {
+ super()
-function Torrent (torrentId, client, opts) {
- EventEmitter.call(this)
+ this._debugId = 'unknown infohash'
+ this.client = client
- this._debugId = 'unknown infohash'
- this.client = client
+ this.announce = opts.announce
+ this.urlList = opts.urlList
- this.announce = opts.announce
- this.urlList = opts.urlList
+ this.path = opts.path
+ this.skipVerify = !!opts.skipVerify
+ this._store = opts.store || FSChunkStore
+ this._getAnnounceOpts = opts.getAnnounceOpts
- this.path = opts.path
- this.skipVerify = !!opts.skipVerify
- this._store = opts.store || FSChunkStore
- this._getAnnounceOpts = opts.getAnnounceOpts
+ this.strategy = opts.strategy || 'sequential'
- this.strategy = opts.strategy || 'sequential'
+ this.maxWebConns = opts.maxWebConns || 4
- this.maxWebConns = opts.maxWebConns || 4
+ this._rechokeNumSlots = (opts.uploads === false || opts.uploads === 0)
+ ? 0
+ : (+opts.uploads || 10)
+ this._rechokeOptimisticWire = null
+ this._rechokeOptimisticTime = 0
+ this._rechokeIntervalId = null
- this._rechokeNumSlots = (opts.uploads === false || opts.uploads === 0)
- ? 0
- : (+opts.uploads || 10)
- this._rechokeOptimisticWire = null
- this._rechokeOptimisticTime = 0
- this._rechokeIntervalId = null
+ this.ready = false
+ this.destroyed = false
+ this.paused = false
+ this.done = false
- this.ready = false
- this.destroyed = false
- this.paused = false
- this.done = false
+ this.metadata = null
+ this.store = null
+ this.files = []
+ this.pieces = []
- this.metadata = null
- this.store = null
- this.files = []
- this.pieces = []
+ this._amInterested = false
+ this._selections = []
+ this._critical = []
- this._amInterested = false
- this._selections = []
- this._critical = []
+ this.wires = [] // open wires (added *after* handshake)
- this.wires = [] // open wires (added *after* handshake)
+ this._queue = [] // queue of outgoing tcp peers to connect to
+ this._peers = {} // connected peers (addr/peerId -> Peer)
+ this._peersLength = 0 // number of elements in `this._peers` (cache, for perf)
- this._queue = [] // queue of outgoing tcp peers to connect to
- this._peers = {} // connected peers (addr/peerId -> Peer)
- this._peersLength = 0 // number of elements in `this._peers` (cache, for perf)
+ // stats
+ this.received = 0
+ this.uploaded = 0
+ this._downloadSpeed = speedometer()
+ this._uploadSpeed = speedometer()
- // stats
- this.received = 0
- this.uploaded = 0
- this._downloadSpeed = speedometer()
- this._uploadSpeed = speedometer()
+ // for cleanup
+ this._servers = []
+ this._xsRequests = []
- // for cleanup
- this._servers = []
- this._xsRequests = []
+ // TODO: remove this and expose a hook instead
+ // optimization: don't recheck every file if it hasn't changed
+ this._fileModtimes = opts.fileModtimes
- // TODO: remove this and expose a hook instead
- // optimization: don't recheck every file if it hasn't changed
- this._fileModtimes = opts.fileModtimes
+ if (torrentId !== null) this._onTorrentId(torrentId)
- if (torrentId !== null) this._onTorrentId(torrentId)
-
- this._debug('new torrent')
-}
+ this._debug('new torrent')
+ }
-Object.defineProperty(Torrent.prototype, 'timeRemaining', {
- get: function () {
+ get timeRemaining () {
if (this.done) return 0
if (this.downloadSpeed === 0) return Infinity
return ((this.length - this.downloaded) / this.downloadSpeed) * 1000
}
-})
-Object.defineProperty(Torrent.prototype, 'downloaded', {
- get: function () {
+ get downloaded () {
if (!this.bitfield) return 0
- var downloaded = 0
- for (var index = 0, len = this.pieces.length; index < len; ++index) {
+ let downloaded = 0
+ for (let index = 0, len = this.pieces.length; index < len; ++index) {
if (this.bitfield.get(index)) { // verified data
downloaded += (index === len - 1) ? this.lastPieceLength : this.pieceLength
} else { // "in progress" data
- var piece = this.pieces[index]
+ const piece = this.pieces[index]
downloaded += (piece.length - piece.missing)
}
}
return downloaded
}
-})
-
-// TODO: re-enable this. The number of missing pieces. Used to implement 'end game' mode.
-// Object.defineProperty(Storage.prototype, 'numMissing', {
-// get: function () {
-// var self = this
-// var numMissing = self.pieces.length
-// for (var index = 0, len = self.pieces.length; index < len; index++) {
-// numMissing -= self.bitfield.get(index)
-// }
-// return numMissing
-// }
-// })
-
-Object.defineProperty(Torrent.prototype, 'downloadSpeed', {
- get: function () { return this._downloadSpeed() }
-})
-
-Object.defineProperty(Torrent.prototype, 'uploadSpeed', {
- get: function () { return this._uploadSpeed() }
-})
-
-Object.defineProperty(Torrent.prototype, 'progress', {
- get: function () { return this.length ? this.downloaded / this.length : 0 }
-})
-
-Object.defineProperty(Torrent.prototype, 'ratio', {
- get: function () { return this.uploaded / (this.received || 1) }
-})
-
-Object.defineProperty(Torrent.prototype, 'numPeers', {
- get: function () { return this.wires.length }
-})
-
-Object.defineProperty(Torrent.prototype, 'torrentFileBlobURL', {
- get: function () {
+
+ // TODO: re-enable this. The number of missing pieces. Used to implement 'end game' mode.
+ // Object.defineProperty(Storage.prototype, 'numMissing', {
+ // get: function () {
+ // var self = this
+ // var numMissing = self.pieces.length
+ // for (var index = 0, len = self.pieces.length; index < len; index++) {
+ // numMissing -= self.bitfield.get(index)
+ // }
+ // return numMissing
+ // }
+ // })
+
+ get downloadSpeed () { return this._downloadSpeed() }
+
+ get uploadSpeed () { return this._uploadSpeed() }
+
+ get progress () { return this.length ? this.downloaded / this.length : 0 }
+
+ get ratio () { return this.uploaded / (this.received || 1) }
+
+ get numPeers () { return this.wires.length }
+
+ get torrentFileBlobURL () {
if (typeof window === 'undefined') throw new Error('browser-only property')
if (!this.torrentFile) return null
return URL.createObjectURL(
new Blob([ this.torrentFile ], { type: 'application/x-bittorrent' })
)
}
-})
-Object.defineProperty(Torrent.prototype, '_numQueued', {
- get: function () {
+ get _numQueued () {
return this._queue.length + (this._peersLength - this._numConns)
}
-})
-Object.defineProperty(Torrent.prototype, '_numConns', {
- get: function () {
- var self = this
- var numConns = 0
- for (var id in self._peers) {
+ get _numConns () {
+ const self = this
+ let numConns = 0
+ for (const id in self._peers) {
if (self._peers[id].connected) numConns += 1
}
return numConns
}
-})
-// TODO: remove in v1
-Object.defineProperty(Torrent.prototype, 'swarm', {
- get: function () {
+ // TODO: remove in v1
+ get swarm () {
console.warn('WebTorrent: `torrent.swarm` is deprecated. Use `torrent` directly instead.')
return this
}
-})
-
-Torrent.prototype._onTorrentId = function (torrentId) {
- var self = this
- if (self.destroyed) return
-
- var parsedTorrent
- try { parsedTorrent = parseTorrent(torrentId) } catch (err) {}
- if (parsedTorrent) {
- // Attempt to set infoHash property synchronously
- self.infoHash = parsedTorrent.infoHash
- self._debugId = parsedTorrent.infoHash.toString('hex').substring(0, 7)
- process.nextTick(function () {
- if (self.destroyed) return
- self._onParsedTorrent(parsedTorrent)
- })
- } else {
- // If torrentId failed to parse, it could be in a form that requires an async
- // operation, i.e. http/https link, filesystem path, or Blob.
- parseTorrent.remote(torrentId, function (err, parsedTorrent) {
- if (self.destroyed) return
- if (err) return self._destroy(err)
- self._onParsedTorrent(parsedTorrent)
- })
- }
-}
-
-Torrent.prototype._onParsedTorrent = function (parsedTorrent) {
- var self = this
- if (self.destroyed) return
- self._processParsedTorrent(parsedTorrent)
+ _onTorrentId (torrentId) {
+ const self = this
+ if (self.destroyed) return
- if (!self.infoHash) {
- return self._destroy(new Error('Malformed torrent data: No info hash'))
+ let parsedTorrent
+ try { parsedTorrent = parseTorrent(torrentId) } catch (err) {}
+ if (parsedTorrent) {
+ // Attempt to set infoHash property synchronously
+ self.infoHash = parsedTorrent.infoHash
+ self._debugId = parsedTorrent.infoHash.toString('hex').substring(0, 7)
+ process.nextTick(() => {
+ if (self.destroyed) return
+ self._onParsedTorrent(parsedTorrent)
+ })
+ } else {
+ // If torrentId failed to parse, it could be in a form that requires an async
+ // operation, i.e. http/https link, filesystem path, or Blob.
+ parseTorrent.remote(torrentId, (err, parsedTorrent) => {
+ if (self.destroyed) return
+ if (err) return self._destroy(err)
+ self._onParsedTorrent(parsedTorrent)
+ })
+ }
}
- if (!self.path) self.path = path.join(TMP, self.infoHash)
+ _onParsedTorrent (parsedTorrent) {
+ const self = this
+ if (self.destroyed) return
- self._rechokeIntervalId = setInterval(function () {
- self._rechoke()
- }, RECHOKE_INTERVAL)
- if (self._rechokeIntervalId.unref) self._rechokeIntervalId.unref()
+ self._processParsedTorrent(parsedTorrent)
- // Private 'infoHash' event allows client.add to check for duplicate torrents and
- // destroy them before the normal 'infoHash' event is emitted. Prevents user
- // applications from needing to deal with duplicate 'infoHash' events.
- self.emit('_infoHash', self.infoHash)
- if (self.destroyed) return
+ if (!self.infoHash) {
+ return self._destroy(new Error('Malformed torrent data: No info hash'))
+ }
- self.emit('infoHash', self.infoHash)
- if (self.destroyed) return // user might destroy torrent in event handler
+ if (!self.path) self.path = path.join(TMP, self.infoHash)
- if (self.client.listening) {
- self._onListening()
- } else {
- self.client.once('listening', function () {
- self._onListening()
- })
- }
-}
+ self._rechokeIntervalId = setInterval(() => {
+ self._rechoke()
+ }, RECHOKE_INTERVAL)
+ if (self._rechokeIntervalId.unref) self._rechokeIntervalId.unref()
-Torrent.prototype._processParsedTorrent = function (parsedTorrent) {
- this._debugId = parsedTorrent.infoHash.toString('hex').substring(0, 7)
+ // Private 'infoHash' event allows client.add to check for duplicate torrents and
+ // destroy them before the normal 'infoHash' event is emitted. Prevents user
+ // applications from needing to deal with duplicate 'infoHash' events.
+ self.emit('_infoHash', self.infoHash)
+ if (self.destroyed) return
- if (this.announce) {
- // Allow specifying trackers via `opts` parameter
- parsedTorrent.announce = parsedTorrent.announce.concat(this.announce)
- }
+ self.emit('infoHash', self.infoHash)
+ if (self.destroyed) return // user might destroy torrent in event handler
- if (this.client.tracker && global.WEBTORRENT_ANNOUNCE && !this.private) {
- // So `webtorrent-hybrid` can force specific trackers to be used
- parsedTorrent.announce = parsedTorrent.announce.concat(global.WEBTORRENT_ANNOUNCE)
+ if (self.client.listening) {
+ self._onListening()
+ } else {
+ self.client.once('listening', () => {
+ self._onListening()
+ })
+ }
}
- if (this.urlList) {
- // Allow specifying web seeds via `opts` parameter
- parsedTorrent.urlList = parsedTorrent.urlList.concat(this.urlList)
- }
+ _processParsedTorrent (parsedTorrent) {
+ this._debugId = parsedTorrent.infoHash.toString('hex').substring(0, 7)
- uniq(parsedTorrent.announce)
- uniq(parsedTorrent.urlList)
+ if (this.announce) {
+ // Allow specifying trackers via `opts` parameter
+ parsedTorrent.announce = parsedTorrent.announce.concat(this.announce)
+ }
- extendMutable(this, parsedTorrent)
+ if (this.client.tracker && global.WEBTORRENT_ANNOUNCE && !this.private) {
+ // So `webtorrent-hybrid` can force specific trackers to be used
+ parsedTorrent.announce = parsedTorrent.announce.concat(global.WEBTORRENT_ANNOUNCE)
+ }
- this.magnetURI = parseTorrent.toMagnetURI(parsedTorrent)
- this.torrentFile = parseTorrent.toTorrentFile(parsedTorrent)
-}
+ if (this.urlList) {
+ // Allow specifying web seeds via `opts` parameter
+ parsedTorrent.urlList = parsedTorrent.urlList.concat(this.urlList)
+ }
-Torrent.prototype._onListening = function () {
- var self = this
- if (self.discovery || self.destroyed) return
-
- var trackerOpts = self.client.tracker
- if (trackerOpts) {
- trackerOpts = extend(self.client.tracker, {
- getAnnounceOpts: function () {
- var opts = {
- uploaded: self.uploaded,
- downloaded: self.downloaded,
- left: Math.max(self.length - self.downloaded, 0)
- }
- if (self.client.tracker.getAnnounceOpts) {
- extendMutable(opts, self.client.tracker.getAnnounceOpts())
- }
- if (self._getAnnounceOpts) {
- // TODO: consider deprecating this, as it's redundant with the former case
- extendMutable(opts, self._getAnnounceOpts())
- }
- return opts
- }
- })
- }
+ uniq(parsedTorrent.announce)
+ uniq(parsedTorrent.urlList)
- // begin discovering peers via DHT and trackers
- self.discovery = new Discovery({
- infoHash: self.infoHash,
- announce: self.announce,
- peerId: self.client.peerId,
- dht: !self.private && self.client.dht,
- tracker: trackerOpts,
- port: self.client.torrentPort,
- userAgent: USER_AGENT
- })
-
- self.discovery.on('error', onError)
- self.discovery.on('peer', onPeer)
- self.discovery.on('trackerAnnounce', onTrackerAnnounce)
- self.discovery.on('dhtAnnounce', onDHTAnnounce)
- self.discovery.on('warning', onWarning)
-
- function onError (err) {
- self._destroy(err)
- }
+ extendMutable(this, parsedTorrent)
- function onPeer (peer) {
- // Don't create new outgoing TCP connections when torrent is done
- if (typeof peer === 'string' && self.done) return
- self.addPeer(peer)
+ this.magnetURI = parseTorrent.toMagnetURI(parsedTorrent)
+ this.torrentFile = parseTorrent.toTorrentFile(parsedTorrent)
}
- function onTrackerAnnounce () {
- self.emit('trackerAnnounce')
- if (self.numPeers === 0) self.emit('noPeers', 'tracker')
- }
+ _onListening () {
+ const self = this
+ if (self.discovery || self.destroyed) return
- function onDHTAnnounce () {
- self.emit('dhtAnnounce')
- if (self.numPeers === 0) self.emit('noPeers', 'dht')
- }
+ let trackerOpts = self.client.tracker
+ if (trackerOpts) {
+ trackerOpts = extend(self.client.tracker, {
+ getAnnounceOpts () {
+ const opts = {
+ uploaded: self.uploaded,
+ downloaded: self.downloaded,
+ left: Math.max(self.length - self.downloaded, 0)
+ }
+ if (self.client.tracker.getAnnounceOpts) {
+ extendMutable(opts, self.client.tracker.getAnnounceOpts())
+ }
+ if (self._getAnnounceOpts) {
+ // TODO: consider deprecating this, as it's redundant with the former case
+ extendMutable(opts, self._getAnnounceOpts())
+ }
+ return opts
+ }
+ })
+ }
- function onWarning (err) {
- self.emit('warning', err)
- }
+ // begin discovering peers via DHT and trackers
+ self.discovery = new Discovery({
+ infoHash: self.infoHash,
+ announce: self.announce,
+ peerId: self.client.peerId,
+ dht: !self.private && self.client.dht,
+ tracker: trackerOpts,
+ port: self.client.torrentPort,
+ userAgent: USER_AGENT
+ })
- if (self.info) {
- // if full metadata was included in initial torrent id, use it immediately. Otherwise,
- // wait for torrent-discovery to find peers and ut_metadata to get the metadata.
- self._onMetadata(self)
- } else if (self.xs) {
- self._getMetadataFromServer()
- }
-}
+ self.discovery.on('error', onError)
+ self.discovery.on('peer', onPeer)
+ self.discovery.on('trackerAnnounce', onTrackerAnnounce)
+ self.discovery.on('dhtAnnounce', onDHTAnnounce)
+ self.discovery.on('warning', onWarning)
-Torrent.prototype._getMetadataFromServer = function () {
- var self = this
- var urls = Array.isArray(self.xs) ? self.xs : [ self.xs ]
+ function onError (err) {
+ self._destroy(err)
+ }
- var tasks = urls.map(function (url) {
- return function (cb) {
- getMetadataFromURL(url, cb)
+ function onPeer (peer) {
+ // Don't create new outgoing TCP connections when torrent is done
+ if (typeof peer === 'string' && self.done) return
+ self.addPeer(peer)
}
- })
- parallel(tasks)
- function getMetadataFromURL (url, cb) {
- if (url.indexOf('http://') !== 0 && url.indexOf('https://') !== 0) {
- self.emit('warning', new Error('skipping non-http xs param: ' + url))
- return cb(null)
+ function onTrackerAnnounce () {
+ self.emit('trackerAnnounce')
+ if (self.numPeers === 0) self.emit('noPeers', 'tracker')
}
- var opts = {
- url: url,
- method: 'GET',
- headers: {
- 'user-agent': USER_AGENT
- }
+ function onDHTAnnounce () {
+ self.emit('dhtAnnounce')
+ if (self.numPeers === 0) self.emit('noPeers', 'dht')
}
- var req
- try {
- req = get.concat(opts, onResponse)
- } catch (err) {
- self.emit('warning', new Error('skipping invalid url xs param: ' + url))
- return cb(null)
+
+ function onWarning (err) {
+ self.emit('warning', err)
}
- self._xsRequests.push(req)
+ if (self.info) {
+ // if full metadata was included in initial torrent id, use it immediately. Otherwise,
+ // wait for torrent-discovery to find peers and ut_metadata to get the metadata.
+ self._onMetadata(self)
+ } else if (self.xs) {
+ self._getMetadataFromServer()
+ }
+ }
- function onResponse (err, res, torrent) {
- if (self.destroyed) return cb(null)
- if (self.metadata) return cb(null)
+ _getMetadataFromServer () {
+ const self = this
+ const urls = Array.isArray(self.xs) ? self.xs : [ self.xs ]
- if (err) {
- self.emit('warning', new Error('http error from xs param: ' + url))
- return cb(null)
- }
- if (res.statusCode !== 200) {
- self.emit('warning', new Error('non-200 status code ' + res.statusCode + ' from xs param: ' + url))
+ const tasks = urls.map(url => cb => {
+ getMetadataFromURL(url, cb)
+ })
+ parallel(tasks)
+
+ function getMetadataFromURL (url, cb) {
+ if (url.indexOf('http://') !== 0 && url.indexOf('https://') !== 0) {
+ self.emit('warning', new Error(`skipping non-http xs param: ${url}`))
return cb(null)
}
- var parsedTorrent
+ const opts = {
+ url,
+ method: 'GET',
+ headers: {
+ 'user-agent': USER_AGENT
+ }
+ }
+ let req
try {
- parsedTorrent = parseTorrent(torrent)
- } catch (err) {}
-
- if (!parsedTorrent) {
- self.emit('warning', new Error('got invalid torrent file from xs param: ' + url))
+ req = get.concat(opts, onResponse)
+ } catch (err) {
+ self.emit('warning', new Error(`skipping invalid url xs param: ${url}`))
return cb(null)
}
- if (parsedTorrent.infoHash !== self.infoHash) {
- self.emit('warning', new Error('got torrent file with incorrect info hash from xs param: ' + url))
- return cb(null)
- }
+ self._xsRequests.push(req)
- self._onMetadata(parsedTorrent)
- cb(null)
- }
- }
-}
+ function onResponse (err, res, torrent) {
+ if (self.destroyed) return cb(null)
+ if (self.metadata) return cb(null)
-/**
- * Called when the full torrent metadata is received.
- */
-Torrent.prototype._onMetadata = function (metadata) {
- var self = this
- if (self.metadata || self.destroyed) return
- self._debug('got metadata')
-
- self._xsRequests.forEach(function (req) {
- req.abort()
- })
- self._xsRequests = []
-
- var parsedTorrent
- if (metadata && metadata.infoHash) {
- // `metadata` is a parsed torrent (from parse-torrent module)
- parsedTorrent = metadata
- } else {
- try {
- parsedTorrent = parseTorrent(metadata)
- } catch (err) {
- return self._destroy(err)
+ if (err) {
+ self.emit('warning', new Error(`http error from xs param: ${url}`))
+ return cb(null)
+ }
+ if (res.statusCode !== 200) {
+ self.emit('warning', new Error(`non-200 status code ${res.statusCode} from xs param: ${url}`))
+ return cb(null)
+ }
+
+ let parsedTorrent
+ try {
+ parsedTorrent = parseTorrent(torrent)
+ } catch (err) {}
+
+ if (!parsedTorrent) {
+ self.emit('warning', new Error(`got invalid torrent file from xs param: ${url}`))
+ return cb(null)
+ }
+
+ if (parsedTorrent.infoHash !== self.infoHash) {
+ self.emit('warning', new Error(`got torrent file with incorrect info hash from xs param: ${url}`))
+ return cb(null)
+ }
+
+ self._onMetadata(parsedTorrent)
+ cb(null)
+ }
}
}
- self._processParsedTorrent(parsedTorrent)
- self.metadata = self.torrentFile
+ /**
+ * Called when the full torrent metadata is received.
+ */
+ _onMetadata (metadata) {
+ const self = this
+ if (self.metadata || self.destroyed) return
+ self._debug('got metadata')
- // add web seed urls (BEP19)
- if (self.client.enableWebSeeds) {
- self.urlList.forEach(function (url) {
- self.addWebSeed(url)
+ self._xsRequests.forEach(req => {
+ req.abort()
})
- }
+ self._xsRequests = []
+
+ let parsedTorrent
+ if (metadata && metadata.infoHash) {
+ // `metadata` is a parsed torrent (from parse-torrent module)
+ parsedTorrent = metadata
+ } else {
+ try {
+ parsedTorrent = parseTorrent(metadata)
+ } catch (err) {
+ return self._destroy(err)
+ }
+ }
+
+ self._processParsedTorrent(parsedTorrent)
+ self.metadata = self.torrentFile
+
+ // add web seed urls (BEP19)
+ if (self.client.enableWebSeeds) {
+ self.urlList.forEach(url => {
+ self.addWebSeed(url)
+ })
+ }
- self._rarityMap = new RarityMap(self)
+ self._rarityMap = new RarityMap(self)
- self.store = new ImmediateChunkStore(
- new self._store(self.pieceLength, {
- torrent: {
- infoHash: self.infoHash
- },
- files: self.files.map(function (file) {
- return {
+ self.store = new ImmediateChunkStore(
+ new self._store(self.pieceLength, {
+ torrent: {
+ infoHash: self.infoHash
+ },
+ files: self.files.map(file => ({
path: path.join(self.path, file.path),
length: file.length,
offset: file.offset
- }
- }),
- length: self.length
- })
- )
+ })),
+ length: self.length
+ })
+ )
- self.files = self.files.map(function (file) {
- return new File(self, file)
- })
+ self.files = self.files.map(file => new File(self, file))
- // Select only specified files (BEP53) http://www.bittorrent.org/beps/bep_0053.html
- if (self.so) {
- var selectOnlyFiles = parseRange.parse(self.so)
+ // Select only specified files (BEP53) http://www.bittorrent.org/beps/bep_0053.html
+ if (self.so) {
+ const selectOnlyFiles = parseRange.parse(self.so)
- self.files.forEach(function (v, i) {
- if (selectOnlyFiles.includes(i)) self.files[i].select(true)
- })
- } else {
- // start off selecting the entire torrent with low priority
- if (self.pieces.length !== 0) {
- self.select(0, self.pieces.length - 1, false)
+ self.files.forEach((v, i) => {
+ if (selectOnlyFiles.includes(i)) self.files[i].select(true)
+ })
+ } else {
+ // start off selecting the entire torrent with low priority
+ if (self.pieces.length !== 0) {
+ self.select(0, self.pieces.length - 1, false)
+ }
}
- }
- self._hashes = self.pieces
-
- self.pieces = self.pieces.map(function (hash, i) {
- var pieceLength = (i === self.pieces.length - 1)
- ? self.lastPieceLength
- : self.pieceLength
- return new Piece(pieceLength)
- })
-
- self._reservations = self.pieces.map(function () {
- return []
- })
-
- self.bitfield = new BitField(self.pieces.length)
-
- self.wires.forEach(function (wire) {
- // If we didn't have the metadata at the time ut_metadata was initialized for this
- // wire, we still want to make it available to the peer in case they request it.
- if (wire.ut_metadata) wire.ut_metadata.setMetadata(self.metadata)
-
- self._onWireWithMetadata(wire)
- })
-
- if (self.skipVerify) {
- // Skip verifying exisitng data and just assume it's correct
- self._markAllVerified()
- self._onStore()
- } else {
- self._debug('verifying existing torrent data')
- if (self._fileModtimes && self._store === FSChunkStore) {
- // don't verify if the files haven't been modified since we last checked
- self.getFileModtimes(function (err, fileModtimes) {
- if (err) return self._destroy(err)
+ self._hashes = self.pieces
- var unchanged = self.files.map(function (_, index) {
- return fileModtimes[index] === self._fileModtimes[index]
- }).every(function (x) {
- return x
- })
+ self.pieces = self.pieces.map((hash, i) => {
+ const pieceLength = (i === self.pieces.length - 1)
+ ? self.lastPieceLength
+ : self.pieceLength
+ return new Piece(pieceLength)
+ })
- if (unchanged) {
- self._markAllVerified()
- self._onStore()
- } else {
- self._verifyPieces()
- }
- })
+ self._reservations = self.pieces.map(() => [])
+
+ self.bitfield = new BitField(self.pieces.length)
+
+ self.wires.forEach(wire => {
+ // If we didn't have the metadata at the time ut_metadata was initialized for this
+ // wire, we still want to make it available to the peer in case they request it.
+ if (wire.ut_metadata) wire.ut_metadata.setMetadata(self.metadata)
+
+ self._onWireWithMetadata(wire)
+ })
+
+ if (self.skipVerify) {
+ // Skip verifying exisitng data and just assume it's correct
+ self._markAllVerified()
+ self._onStore()
} else {
- self._verifyPieces()
+ self._debug('verifying existing torrent data')
+ if (self._fileModtimes && self._store === FSChunkStore) {
+ // don't verify if the files haven't been modified since we last checked
+ self.getFileModtimes((err, fileModtimes) => {
+ if (err) return self._destroy(err)
+
+ const unchanged = self.files.map((_, index) => fileModtimes[index] === self._fileModtimes[index]).every(x => x)
+
+ if (unchanged) {
+ self._markAllVerified()
+ self._onStore()
+ } else {
+ self._verifyPieces()
+ }
+ })
+ } else {
+ self._verifyPieces()
+ }
}
- }
- self.emit('metadata')
-}
+ self.emit('metadata')
+ }
-/*
- * TODO: remove this
- * Gets the last modified time of every file on disk for this torrent.
- * Only valid in Node, not in the browser.
- */
-Torrent.prototype.getFileModtimes = function (cb) {
- var self = this
- var ret = []
- parallelLimit(self.files.map(function (file, index) {
- return function (cb) {
- fs.stat(path.join(self.path, file.path), function (err, stat) {
+ /*
+ * TODO: remove this
+ * Gets the last modified time of every file on disk for this torrent.
+ * Only valid in Node, not in the browser.
+ */
+ getFileModtimes (cb) {
+ const self = this
+ const ret = []
+ parallelLimit(self.files.map((file, index) => cb => {
+ fs.stat(path.join(self.path, file.path), (err, stat) => {
if (err && err.code !== 'ENOENT') return cb(err)
ret[index] = stat && stat.mtime.getTime()
cb(null)
})
- }
- }), FILESYSTEM_CONCURRENCY, function (err) {
- self._debug('done getting file modtimes')
- cb(err, ret)
- })
-}
+ }), FILESYSTEM_CONCURRENCY, err => {
+ self._debug('done getting file modtimes')
+ cb(err, ret)
+ })
+ }
-Torrent.prototype._verifyPieces = function () {
- var self = this
- parallelLimit(self.pieces.map(function (_, index) {
- return function (cb) {
+ _verifyPieces () {
+ const self = this
+ parallelLimit(self.pieces.map((_, index) => cb => {
if (self.destroyed) return cb(new Error('torrent is destroyed'))
- self.store.get(index, function (err, buf) {
+ self.store.get(index, (err, buf) => {
if (self.destroyed) return cb(new Error('torrent is destroyed'))
if (err) return process.nextTick(cb, null) // ignore error
- sha1(buf, function (hash) {
+ sha1(buf, hash => {
if (self.destroyed) return cb(new Error('torrent is destroyed'))
if (hash === self._hashes[index]) {
@@ -620,1138 +579,1132 @@ Torrent.prototype._verifyPieces = function () {
cb(null)
})
})
- }
- }), FILESYSTEM_CONCURRENCY, function (err) {
- if (err) return self._destroy(err)
- self._debug('done verifying')
- self._onStore()
- })
-}
+ }), FILESYSTEM_CONCURRENCY, err => {
+ if (err) return self._destroy(err)
+ self._debug('done verifying')
+ self._onStore()
+ })
+ }
-Torrent.prototype._markAllVerified = function () {
- for (var index = 0; index < this.pieces.length; index++) {
- this._markVerified(index)
+ _markAllVerified () {
+ for (let index = 0; index < this.pieces.length; index++) {
+ this._markVerified(index)
+ }
}
-}
-Torrent.prototype._markVerified = function (index) {
- this.pieces[index] = null
- this._reservations[index] = null
- this.bitfield.set(index, true)
-}
+ _markVerified (index) {
+ this.pieces[index] = null
+ this._reservations[index] = null
+ this.bitfield.set(index, true)
+ }
-/**
- * Called when the metadata, listening server, and underlying chunk store is initialized.
- */
-Torrent.prototype._onStore = function () {
- var self = this
- if (self.destroyed) return
- self._debug('on store')
+ /**
+ * Called when the metadata, listening server, and underlying chunk store is initialized.
+ */
+ _onStore () {
+ const self = this
+ if (self.destroyed) return
+ self._debug('on store')
- self.ready = true
- self.emit('ready')
+ self.ready = true
+ self.emit('ready')
- // Files may start out done if the file was already in the store
- self._checkDone()
+ // Files may start out done if the file was already in the store
+ self._checkDone()
- // In case any selections were made before torrent was ready
- self._updateSelections()
-}
+ // In case any selections were made before torrent was ready
+ self._updateSelections()
+ }
-Torrent.prototype.destroy = function (cb) {
- var self = this
- self._destroy(null, cb)
-}
+ destroy (cb) {
+ const self = this
+ self._destroy(null, cb)
+ }
-Torrent.prototype._destroy = function (err, cb) {
- var self = this
- if (self.destroyed) return
- self.destroyed = true
- self._debug('destroy')
+ _destroy (err, cb) {
+ const self = this
+ if (self.destroyed) return
+ self.destroyed = true
+ self._debug('destroy')
- self.client._remove(self)
+ self.client._remove(self)
- clearInterval(self._rechokeIntervalId)
+ clearInterval(self._rechokeIntervalId)
- self._xsRequests.forEach(function (req) {
- req.abort()
- })
+ self._xsRequests.forEach(req => {
+ req.abort()
+ })
- if (self._rarityMap) {
- self._rarityMap.destroy()
- }
+ if (self._rarityMap) {
+ self._rarityMap.destroy()
+ }
- for (var id in self._peers) {
- self.removePeer(id)
- }
+ for (const id in self._peers) {
+ self.removePeer(id)
+ }
- self.files.forEach(function (file) {
- if (file instanceof File) file._destroy()
- })
+ self.files.forEach(file => {
+ if (file instanceof File) file._destroy()
+ })
- var tasks = self._servers.map(function (server) {
- return function (cb) {
+ const tasks = self._servers.map(server => cb => {
server.destroy(cb)
+ })
+
+ if (self.discovery) {
+ tasks.push(cb => {
+ self.discovery.destroy(cb)
+ })
}
- })
- if (self.discovery) {
- tasks.push(function (cb) {
- self.discovery.destroy(cb)
- })
- }
+ if (self.store) {
+ tasks.push(cb => {
+ self.store.close(cb)
+ })
+ }
- if (self.store) {
- tasks.push(function (cb) {
- self.store.close(cb)
- })
- }
+ parallel(tasks, cb)
+
+ if (err) {
+ // Torrent errors are emitted at `torrent.on('error')`. If there are no 'error'
+ // event handlers on the torrent instance, then the error will be emitted at
+ // `client.on('error')`. This prevents throwing an uncaught exception
+ // (unhandled 'error' event), but it makes it impossible to distinguish client
+ // errors versus torrent errors. Torrent errors are not fatal, and the client
+ // is still usable afterwards. Therefore, always listen for errors in both
+ // places (`client.on('error')` and `torrent.on('error')`).
+ if (self.listenerCount('error') === 0) {
+ self.client.emit('error', err)
+ } else {
+ self.emit('error', err)
+ }
+ }
+
+ self.emit('close')
+
+ self.client = null
+ self.files = []
+ self.discovery = null
+ self.store = null
+ self._rarityMap = null
+ self._peers = null
+ self._servers = null
+ self._xsRequests = null
+ }
+
+ addPeer (peer) {
+ const self = this
+ if (self.destroyed) throw new Error('torrent is destroyed')
+ if (!self.infoHash) throw new Error('addPeer() must not be called before the `infoHash` event')
+
+ if (self.client.blocked) {
+ let host
+ if (typeof peer === 'string') {
+ let parts
+ try {
+ parts = addrToIPPort(peer)
+ } catch (e) {
+ self._debug('ignoring peer: invalid %s', peer)
+ self.emit('invalidPeer', peer)
+ return false
+ }
+ host = parts[0]
+ } else if (typeof peer.remoteAddress === 'string') {
+ host = peer.remoteAddress
+ }
+
+ if (host && self.client.blocked.contains(host)) {
+ self._debug('ignoring peer: blocked %s', peer)
+ if (typeof peer !== 'string') peer.destroy()
+ self.emit('blockedPeer', peer)
+ return false
+ }
+ }
- parallel(tasks, cb)
-
- if (err) {
- // Torrent errors are emitted at `torrent.on('error')`. If there are no 'error'
- // event handlers on the torrent instance, then the error will be emitted at
- // `client.on('error')`. This prevents throwing an uncaught exception
- // (unhandled 'error' event), but it makes it impossible to distinguish client
- // errors versus torrent errors. Torrent errors are not fatal, and the client
- // is still usable afterwards. Therefore, always listen for errors in both
- // places (`client.on('error')` and `torrent.on('error')`).
- if (self.listenerCount('error') === 0) {
- self.client.emit('error', err)
+ const wasAdded = !!self._addPeer(peer)
+ if (wasAdded) {
+ self.emit('peer', peer)
} else {
- self.emit('error', err)
+ self.emit('invalidPeer', peer)
}
+ return wasAdded
}
- self.emit('close')
+ _addPeer (peer) {
+ const self = this
+ if (self.destroyed) {
+ if (typeof peer !== 'string') peer.destroy()
+ return null
+ }
+ if (typeof peer === 'string' && !self._validAddr(peer)) {
+ self._debug('ignoring peer: invalid %s', peer)
+ return null
+ }
- self.client = null
- self.files = []
- self.discovery = null
- self.store = null
- self._rarityMap = null
- self._peers = null
- self._servers = null
- self._xsRequests = null
-}
+ const id = (peer && peer.id) || peer
+ if (self._peers[id]) {
+ self._debug('ignoring peer: duplicate (%s)', id)
+ if (typeof peer !== 'string') peer.destroy()
+ return null
+ }
+
+ if (self.paused) {
+ self._debug('ignoring peer: torrent is paused')
+ if (typeof peer !== 'string') peer.destroy()
+ return null
+ }
-Torrent.prototype.addPeer = function (peer) {
- var self = this
- if (self.destroyed) throw new Error('torrent is destroyed')
- if (!self.infoHash) throw new Error('addPeer() must not be called before the `infoHash` event')
+ self._debug('add peer %s', id)
- if (self.client.blocked) {
- var host
+ let newPeer
if (typeof peer === 'string') {
- var parts
- try {
- parts = addrToIPPort(peer)
- } catch (e) {
- self._debug('ignoring peer: invalid %s', peer)
- self.emit('invalidPeer', peer)
- return false
- }
- host = parts[0]
- } else if (typeof peer.remoteAddress === 'string') {
- host = peer.remoteAddress
+ // `peer` is an addr ("ip:port" string)
+ newPeer = Peer.createTCPOutgoingPeer(peer, self)
+ } else {
+ // `peer` is a WebRTC connection (simple-peer)
+ newPeer = Peer.createWebRTCPeer(peer, self)
}
- if (host && self.client.blocked.contains(host)) {
- self._debug('ignoring peer: blocked %s', peer)
- if (typeof peer !== 'string') peer.destroy()
- self.emit('blockedPeer', peer)
- return false
+ self._peers[newPeer.id] = newPeer
+ self._peersLength += 1
+
+ if (typeof peer === 'string') {
+ // `peer` is an addr ("ip:port" string)
+ self._queue.push(newPeer)
+ self._drain()
}
- }
- var wasAdded = !!self._addPeer(peer)
- if (wasAdded) {
- self.emit('peer', peer)
- } else {
- self.emit('invalidPeer', peer)
+ return newPeer
}
- return wasAdded
-}
-Torrent.prototype._addPeer = function (peer) {
- var self = this
- if (self.destroyed) {
- if (typeof peer !== 'string') peer.destroy()
- return null
- }
- if (typeof peer === 'string' && !self._validAddr(peer)) {
- self._debug('ignoring peer: invalid %s', peer)
- return null
- }
+ addWebSeed (url) {
+ if (this.destroyed) throw new Error('torrent is destroyed')
- var id = (peer && peer.id) || peer
- if (self._peers[id]) {
- self._debug('ignoring peer: duplicate (%s)', id)
- if (typeof peer !== 'string') peer.destroy()
- return null
- }
+ if (!/^https?:\/\/.+/.test(url)) {
+ this.emit('warning', new Error(`ignoring invalid web seed: ${url}`))
+ this.emit('invalidPeer', url)
+ return
+ }
- if (self.paused) {
- self._debug('ignoring peer: torrent is paused')
- if (typeof peer !== 'string') peer.destroy()
- return null
- }
+ if (this._peers[url]) {
+ this.emit('warning', new Error(`ignoring duplicate web seed: ${url}`))
+ this.emit('invalidPeer', url)
+ return
+ }
+
+ this._debug('add web seed %s', url)
- self._debug('add peer %s', id)
+ const newPeer = Peer.createWebSeedPeer(url, this)
+ this._peers[newPeer.id] = newPeer
+ this._peersLength += 1
- var newPeer
- if (typeof peer === 'string') {
- // `peer` is an addr ("ip:port" string)
- newPeer = Peer.createTCPOutgoingPeer(peer, self)
- } else {
- // `peer` is a WebRTC connection (simple-peer)
- newPeer = Peer.createWebRTCPeer(peer, self)
+ this.emit('peer', url)
}
- self._peers[newPeer.id] = newPeer
- self._peersLength += 1
+ /**
+ * Called whenever a new incoming TCP peer connects to this torrent swarm. Called with a
+ * peer that has already sent a handshake.
+ */
+ _addIncomingPeer (peer) {
+ const self = this
+ if (self.destroyed) return peer.destroy(new Error('torrent is destroyed'))
+ if (self.paused) return peer.destroy(new Error('torrent is paused'))
- if (typeof peer === 'string') {
- // `peer` is an addr ("ip:port" string)
- self._queue.push(newPeer)
- self._drain()
+ this._debug('add incoming peer %s', peer.id)
+
+ self._peers[peer.id] = peer
+ self._peersLength += 1
}
- return newPeer
-}
+ removePeer (peer) {
+ const self = this
+ const id = (peer && peer.id) || peer
+ peer = self._peers[id]
-Torrent.prototype.addWebSeed = function (url) {
- if (this.destroyed) throw new Error('torrent is destroyed')
+ if (!peer) return
- if (!/^https?:\/\/.+/.test(url)) {
- this.emit('warning', new Error('ignoring invalid web seed: ' + url))
- this.emit('invalidPeer', url)
- return
- }
+ this._debug('removePeer %s', id)
+
+ delete self._peers[id]
+ self._peersLength -= 1
+
+ peer.destroy()
- if (this._peers[url]) {
- this.emit('warning', new Error('ignoring duplicate web seed: ' + url))
- this.emit('invalidPeer', url)
- return
+ // If torrent swarm was at capacity before, try to open a new connection now
+ self._drain()
}
- this._debug('add web seed %s', url)
+ select (start, end, priority, notify) {
+ const self = this
+ if (self.destroyed) throw new Error('torrent is destroyed')
- var newPeer = Peer.createWebSeedPeer(url, this)
- this._peers[newPeer.id] = newPeer
- this._peersLength += 1
+ if (start < 0 || end < start || self.pieces.length <= end) {
+ throw new Error('invalid selection ', start, ':', end)
+ }
+ priority = Number(priority) || 0
- this.emit('peer', url)
-}
+ self._debug('select %s-%s (priority %s)', start, end, priority)
-/**
- * Called whenever a new incoming TCP peer connects to this torrent swarm. Called with a
- * peer that has already sent a handshake.
- */
-Torrent.prototype._addIncomingPeer = function (peer) {
- var self = this
- if (self.destroyed) return peer.destroy(new Error('torrent is destroyed'))
- if (self.paused) return peer.destroy(new Error('torrent is paused'))
+ self._selections.push({
+ from: start,
+ to: end,
+ offset: 0,
+ priority,
+ notify: notify || noop
+ })
- this._debug('add incoming peer %s', peer.id)
+ self._selections.sort((a, b) => b.priority - a.priority)
- self._peers[peer.id] = peer
- self._peersLength += 1
-}
+ self._updateSelections()
+ }
-Torrent.prototype.removePeer = function (peer) {
- var self = this
- var id = (peer && peer.id) || peer
- peer = self._peers[id]
+ deselect (start, end, priority) {
+ const self = this
+ if (self.destroyed) throw new Error('torrent is destroyed')
- if (!peer) return
+ priority = Number(priority) || 0
+ self._debug('deselect %s-%s (priority %s)', start, end, priority)
- this._debug('removePeer %s', id)
+ for (let i = 0; i < self._selections.length; ++i) {
+ const s = self._selections[i]
+ if (s.from === start && s.to === end && s.priority === priority) {
+ self._selections.splice(i, 1)
+ break
+ }
+ }
- delete self._peers[id]
- self._peersLength -= 1
+ self._updateSelections()
+ }
- peer.destroy()
+ critical (start, end) {
+ const self = this
+ if (self.destroyed) throw new Error('torrent is destroyed')
- // If torrent swarm was at capacity before, try to open a new connection now
- self._drain()
-}
+ self._debug('critical %s-%s', start, end)
-Torrent.prototype.select = function (start, end, priority, notify) {
- var self = this
- if (self.destroyed) throw new Error('torrent is destroyed')
+ for (let i = start; i <= end; ++i) {
+ self._critical[i] = true
+ }
- if (start < 0 || end < start || self.pieces.length <= end) {
- throw new Error('invalid selection ', start, ':', end)
+ self._updateSelections()
}
- priority = Number(priority) || 0
- self._debug('select %s-%s (priority %s)', start, end, priority)
+ _onWire (wire, addr) {
+ const self = this
+ self._debug('got wire %s (%s)', wire._debugId, addr || 'Unknown')
- self._selections.push({
- from: start,
- to: end,
- offset: 0,
- priority: priority,
- notify: notify || noop
- })
+ wire.on('download', downloaded => {
+ if (self.destroyed) return
+ self.received += downloaded
+ self._downloadSpeed(downloaded)
+ self.client._downloadSpeed(downloaded)
+ self.emit('download', downloaded)
+ self.client.emit('download', downloaded)
+ })
- self._selections.sort(function (a, b) {
- return b.priority - a.priority
- })
+ wire.on('upload', uploaded => {
+ if (self.destroyed) return
+ self.uploaded += uploaded
+ self._uploadSpeed(uploaded)
+ self.client._uploadSpeed(uploaded)
+ self.emit('upload', uploaded)
+ self.client.emit('upload', uploaded)
+ })
- self._updateSelections()
-}
+ self.wires.push(wire)
-Torrent.prototype.deselect = function (start, end, priority) {
- var self = this
- if (self.destroyed) throw new Error('torrent is destroyed')
+ if (addr) {
+ // Sometimes RTCPeerConnection.getStats() doesn't return an ip:port for peers
+ const parts = addrToIPPort(addr)
+ wire.remoteAddress = parts[0]
+ wire.remotePort = parts[1]
+ }
- priority = Number(priority) || 0
- self._debug('deselect %s-%s (priority %s)', start, end, priority)
+ // When peer sends PORT message, add that DHT node to routing table
+ if (self.client.dht && self.client.dht.listening) {
+ wire.on('port', port => {
+ if (self.destroyed || self.client.dht.destroyed) {
+ return
+ }
+ if (!wire.remoteAddress) {
+ return self._debug('ignoring PORT from peer with no address')
+ }
+ if (port === 0 || port > 65535) {
+ return self._debug('ignoring invalid PORT from peer')
+ }
- for (var i = 0; i < self._selections.length; ++i) {
- var s = self._selections[i]
- if (s.from === start && s.to === end && s.priority === priority) {
- self._selections.splice(i, 1)
- break
+ self._debug('port: %s (from %s)', port, addr)
+ self.client.dht.addNode({ host: wire.remoteAddress, port })
+ })
}
- }
- self._updateSelections()
-}
+ wire.on('timeout', () => {
+ self._debug('wire timeout (%s)', addr)
+ // TODO: this might be destroying wires too eagerly
+ wire.destroy()
+ })
-Torrent.prototype.critical = function (start, end) {
- var self = this
- if (self.destroyed) throw new Error('torrent is destroyed')
+ // Timeout for piece requests to this peer
+ wire.setTimeout(PIECE_TIMEOUT, true)
- self._debug('critical %s-%s', start, end)
+ // Send KEEP-ALIVE (every 60s) so peers will not disconnect the wire
+ wire.setKeepAlive(true)
- for (var i = start; i <= end; ++i) {
- self._critical[i] = true
- }
+ // use ut_metadata extension
+ wire.use(utMetadata(self.metadata))
- self._updateSelections()
-}
+ wire.ut_metadata.on('warning', err => {
+ self._debug('ut_metadata warning: %s', err.message)
+ })
+
+ if (!self.metadata) {
+ wire.ut_metadata.on('metadata', metadata => {
+ self._debug('got metadata via ut_metadata')
+ self._onMetadata(metadata)
+ })
+ wire.ut_metadata.fetch()
+ }
-Torrent.prototype._onWire = function (wire, addr) {
- var self = this
- self._debug('got wire %s (%s)', wire._debugId, addr || 'Unknown')
+ // use ut_pex extension if the torrent is not flagged as private
+ if (typeof utPex === 'function' && !self.private) {
+ wire.use(utPex())
- wire.on('download', function (downloaded) {
- if (self.destroyed) return
- self.received += downloaded
- self._downloadSpeed(downloaded)
- self.client._downloadSpeed(downloaded)
- self.emit('download', downloaded)
- self.client.emit('download', downloaded)
- })
-
- wire.on('upload', function (uploaded) {
- if (self.destroyed) return
- self.uploaded += uploaded
- self._uploadSpeed(uploaded)
- self.client._uploadSpeed(uploaded)
- self.emit('upload', uploaded)
- self.client.emit('upload', uploaded)
- })
-
- self.wires.push(wire)
-
- if (addr) {
- // Sometimes RTCPeerConnection.getStats() doesn't return an ip:port for peers
- var parts = addrToIPPort(addr)
- wire.remoteAddress = parts[0]
- wire.remotePort = parts[1]
- }
+ wire.ut_pex.on('peer', peer => {
+ // Only add potential new peers when we're not seeding
+ if (self.done) return
+ self._debug('ut_pex: got peer: %s (from %s)', peer, addr)
+ self.addPeer(peer)
+ })
- // When peer sends PORT message, add that DHT node to routing table
- if (self.client.dht && self.client.dht.listening) {
- wire.on('port', function (port) {
- if (self.destroyed || self.client.dht.destroyed) {
- return
- }
- if (!wire.remoteAddress) {
- return self._debug('ignoring PORT from peer with no address')
- }
- if (port === 0 || port > 65536) {
- return self._debug('ignoring invalid PORT from peer')
- }
+ wire.ut_pex.on('dropped', peer => {
+ // The remote peer believes a given peer has been dropped from the torrent swarm.
+ // If we're not currently connected to it, then remove it from the queue.
+ const peerObj = self._peers[peer]
+ if (peerObj && !peerObj.connected) {
+ self._debug('ut_pex: dropped peer: %s (from %s)', peer, addr)
+ self.removePeer(peer)
+ }
+ })
- self._debug('port: %s (from %s)', port, addr)
- self.client.dht.addNode({ host: wire.remoteAddress, port: port })
- })
- }
+ wire.once('close', () => {
+ // Stop sending updates to remote peer
+ wire.ut_pex.reset()
+ })
+ }
- wire.on('timeout', function () {
- self._debug('wire timeout (%s)', addr)
- // TODO: this might be destroying wires too eagerly
- wire.destroy()
- })
+ // Hook to allow user-defined `bittorrent-protocol` extensions
+ // More info: https://github.com/webtorrent/bittorrent-protocol#extension-api
+ self.emit('wire', wire, addr)
- // Timeout for piece requests to this peer
- wire.setTimeout(PIECE_TIMEOUT, true)
+ if (self.metadata) {
+ process.nextTick(() => {
+ // This allows wire.handshake() to be called (by Peer.onHandshake) before any
+ // messages get sent on the wire
+ self._onWireWithMetadata(wire)
+ })
+ }
+ }
- // Send KEEP-ALIVE (every 60s) so peers will not disconnect the wire
- wire.setKeepAlive(true)
+ _onWireWithMetadata (wire) {
+ const self = this
+ let timeoutId = null
- // use ut_metadata extension
- wire.use(utMetadata(self.metadata))
+ function onChokeTimeout () {
+ if (self.destroyed || wire.destroyed) return
- wire.ut_metadata.on('warning', function (err) {
- self._debug('ut_metadata warning: %s', err.message)
- })
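+ // Heuristic: if many peers are queued waiting for a connection slot and this choked wire is still one we want data from, drop it to free a slot; otherwise re-arm the timer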
+ if (self._numQueued > 2 * (self._numConns - self.numPeers) &&
+ wire.amInterested) {
+ wire.destroy()
+ } else {
+ timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT)
+ if (timeoutId.unref) timeoutId.unref()
+ }
+ }
- if (!self.metadata) {
- wire.ut_metadata.on('metadata', function (metadata) {
- self._debug('got metadata via ut_metadata')
- self._onMetadata(metadata)
+ let i
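+ // A wire counts as a seeder once its bitfield covers every piece; seeders are choked because they will never request data from us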
+ function updateSeedStatus () {
+ if (wire.peerPieces.buffer.length !== self.bitfield.buffer.length) return
+ for (i = 0; i < self.pieces.length; ++i) {
+ if (!wire.peerPieces.get(i)) return
+ }
+ wire.isSeeder = true
+ wire.choke() // always choke seeders
+ }
+
+ wire.on('bitfield', () => {
+ updateSeedStatus()
+ self._update()
})
- wire.ut_metadata.fetch()
- }
- // use ut_pex extension if the torrent is not flagged as private
- if (typeof utPex === 'function' && !self.private) {
- wire.use(utPex())
+ wire.on('have', () => {
+ updateSeedStatus()
+ self._update()
+ })
- wire.ut_pex.on('peer', function (peer) {
- // Only add potential new peers when we're not seeding
- if (self.done) return
- self._debug('ut_pex: got peer: %s (from %s)', peer, addr)
- self.addPeer(peer)
+ wire.once('interested', () => {
+ wire.unchoke()
})
- wire.ut_pex.on('dropped', function (peer) {
- // the remote peer believes a given peer has been dropped from the torrent swarm.
- // if we're not currently connected to it, then remove it from the queue.
- var peerObj = self._peers[peer]
- if (peerObj && !peerObj.connected) {
- self._debug('ut_pex: dropped peer: %s (from %s)', peer, addr)
- self.removePeer(peer)
- }
+ wire.once('close', () => {
+ clearTimeout(timeoutId)
})
- wire.once('close', function () {
- // Stop sending updates to remote peer
- wire.ut_pex.reset()
+ wire.on('choke', () => {
+ clearTimeout(timeoutId)
+ timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT)
+ if (timeoutId.unref) timeoutId.unref()
})
- }
- // Hook to allow user-defined `bittorrent-protocol` extensions
- // More info: https://github.com/webtorrent/bittorrent-protocol#extension-api
- self.emit('wire', wire, addr)
+ wire.on('unchoke', () => {
+ clearTimeout(timeoutId)
+ self._update()
+ })
- if (self.metadata) {
- process.nextTick(function () {
- // This allows wire.handshake() to be called (by Peer.onHandshake) before any
- // messages get sent on the wire
- self._onWireWithMetadata(wire)
+ wire.on('request', (index, offset, length, cb) => {
+ if (length > MAX_BLOCK_LENGTH) {
+ // Per spec, disconnect from peers that request >128KB
+ return wire.destroy()
+ }
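+ // self.pieces[index] is cleared once a piece is verified, so only fully verified pieces are served from the store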
+ if (self.pieces[index]) return
+ self.store.get(index, { offset, length }, cb)
})
- }
-}
-Torrent.prototype._onWireWithMetadata = function (wire) {
- var self = this
- var timeoutId = null
+ wire.bitfield(self.bitfield) // always send bitfield (required)
+ wire.uninterested() // always start out uninterested (as per protocol)
- function onChokeTimeout () {
- if (self.destroyed || wire.destroyed) return
+ // Send PORT message to peers that support DHT
+ if (wire.peerExtensions.dht && self.client.dht && self.client.dht.listening) {
+ wire.port(self.client.dht.address().port)
+ }
- if (self._numQueued > 2 * (self._numConns - self.numPeers) &&
- wire.amInterested) {
- wire.destroy()
- } else {
+ if (wire.type !== 'webSeed') { // do not choke on webseeds
timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT)
if (timeoutId.unref) timeoutId.unref()
}
- }
- var i
- function updateSeedStatus () {
- if (wire.peerPieces.buffer.length !== self.bitfield.buffer.length) return
- for (i = 0; i < self.pieces.length; ++i) {
- if (!wire.peerPieces.get(i)) return
- }
- wire.isSeeder = true
- wire.choke() // always choke seeders
+ wire.isSeeder = false
+ updateSeedStatus()
}
- wire.on('bitfield', function () {
- updateSeedStatus()
- self._update()
- })
+ /**
+ * Called on selection changes.
+ */
+ _updateSelections () {
+ const self = this
+ if (!self.ready || self.destroyed) return
- wire.on('have', function () {
- updateSeedStatus()
+ process.nextTick(() => {
+ self._gcSelections()
+ })
+ self._updateInterest()
self._update()
- })
+ }
+
+ /**
+ * Garbage collect selections with respect to the store's current state.
+ */
+ _gcSelections () {
+ const self = this
- wire.once('interested', function () {
- wire.unchoke()
- })
+ for (let i = 0; i < self._selections.length; ++i) {
+ const s = self._selections[i]
+ const oldOffset = s.offset
- wire.once('close', function () {
- clearTimeout(timeoutId)
- })
+ // check for newly downloaded pieces in selection
+ while (self.bitfield.get(s.from + s.offset) && s.from + s.offset < s.to) {
+ s.offset += 1
+ }
- wire.on('choke', function () {
- clearTimeout(timeoutId)
- timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT)
- if (timeoutId.unref) timeoutId.unref()
- })
+ if (oldOffset !== s.offset) s.notify()
+ if (s.to !== s.from + s.offset) continue
+ if (!self.bitfield.get(s.from + s.offset)) continue
- wire.on('unchoke', function () {
- clearTimeout(timeoutId)
- self._update()
- })
+ self._selections.splice(i, 1) // remove fully downloaded selection
+ i -= 1 // decrement i to offset splice
- wire.on('request', function (index, offset, length, cb) {
- if (length > MAX_BLOCK_LENGTH) {
- // Per spec, disconnect from peers that request >128KB
- return wire.destroy()
+ s.notify()
+ self._updateInterest()
}
- if (self.pieces[index]) return
- self.store.get(index, { offset: offset, length: length }, cb)
- })
- wire.bitfield(self.bitfield) // always send bitfield (required)
- wire.uninterested() // always start out uninterested (as per protocol)
-
- // Send PORT message to peers that support DHT
- if (wire.peerExtensions.dht && self.client.dht && self.client.dht.listening) {
- wire.port(self.client.dht.address().port)
+ if (!self._selections.length) self.emit('idle')
}
- if (wire.type !== 'webSeed') { // do not choke on webseeds
- timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT)
- if (timeoutId.unref) timeoutId.unref()
- }
+ /**
+ * Update interested status for all peers.
+ */
+ _updateInterest () {
+ const self = this
- wire.isSeeder = false
- updateSeedStatus()
-}
+ const prev = self._amInterested
+ self._amInterested = !!self._selections.length
-/**
- * Called on selection changes.
- */
-Torrent.prototype._updateSelections = function () {
- var self = this
- if (!self.ready || self.destroyed) return
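+ // We are interested in a peer only if it has at least one piece we still need (self.pieces[index] is truthy while a piece is incomplete)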
+ self.wires.forEach(wire => {
+ let interested = false
+ for (let index = 0; index < self.pieces.length; ++index) {
+ if (self.pieces[index] && wire.peerPieces.get(index)) {
+ interested = true
+ break
+ }
+ }
- process.nextTick(function () {
- self._gcSelections()
- })
- self._updateInterest()
- self._update()
-}
+ if (interested) wire.interested()
+ else wire.uninterested()
+ })
-/**
- * Garbage collect selections with respect to the store's current state.
- */
-Torrent.prototype._gcSelections = function () {
- var self = this
+ if (prev === self._amInterested) return
+ if (self._amInterested) self.emit('interested')
+ else self.emit('uninterested')
+ }
- for (var i = 0; i < self._selections.length; ++i) {
- var s = self._selections[i]
- var oldOffset = s.offset
+ /**
+ * Heartbeat to update all peers and their requests.
+ */
+ _update () {
+ const self = this
+ if (self.destroyed) return
- // check for newly downloaded pieces in selection
- while (self.bitfield.get(s.from + s.offset) && s.from + s.offset < s.to) {
- s.offset += 1
+ // update wires in random order for better request distribution
+ const ite = randomIterate(self.wires)
+ let wire
+ while ((wire = ite())) {
+ self._updateWire(wire)
}
+ }
- if (oldOffset !== s.offset) s.notify()
- if (s.to !== s.from + s.offset) continue
- if (!self.bitfield.get(s.from + s.offset)) continue
+ /**
+ * Attempts to update a peer's requests
+ */
+ _updateWire (wire) {
+ const self = this
- self._selections.splice(i, 1) // remove fully downloaded selection
- i -= 1 // decrement i to offset splice
+ if (wire.peerChoking) return
+ if (!wire.downloaded) return validateWire()
- s.notify()
- self._updateInterest()
- }
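+ // Keep enough block requests in flight to cover PIPELINE_MIN_DURATION at this wire's current speed; only top up when below that, bounded by PIPELINE_MAX_DURATION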
+ const minOutstandingRequests = getBlockPipelineLength(wire, PIPELINE_MIN_DURATION)
+ if (wire.requests.length >= minOutstandingRequests) return
+ const maxOutstandingRequests = getBlockPipelineLength(wire, PIPELINE_MAX_DURATION)
- if (!self._selections.length) self.emit('idle')
-}
-
-/**
- * Update interested status for all peers.
- */
-Torrent.prototype._updateInterest = function () {
- var self = this
+ trySelectWire(false) || trySelectWire(true)
- var prev = self._amInterested
- self._amInterested = !!self._selections.length
+ function genPieceFilterFunc (start, end, tried, rank) {
+ return i => i >= start && i <= end && !(i in tried) && wire.peerPieces.get(i) && (!rank || rank(i))
+ }
- self.wires.forEach(function (wire) {
- var interested = false
- for (var index = 0; index < self.pieces.length; ++index) {
- if (self.pieces[index] && wire.peerPieces.get(index)) {
- interested = true
- break
+ // TODO: Do we need both validateWire and trySelectWire?
+ function validateWire () {
+ if (wire.requests.length) return
+
+ let i = self._selections.length
+ while (i--) {
+ const next = self._selections[i]
+ let piece
+ if (self.strategy === 'rarest') {
+ const start = next.from + next.offset
+ const end = next.to
+ const len = end - start + 1
+ const tried = {}
+ let tries = 0
+ const filter = genPieceFilterFunc(start, end, tried)
+
+ while (tries < len) {
+ piece = self._rarityMap.getRarestPiece(filter)
+ if (piece < 0) break
+ if (self._request(wire, piece, false)) return
+ tried[piece] = true
+ tries += 1
+ }
+ } else {
+ for (piece = next.to; piece >= next.from + next.offset; --piece) {
+ if (!wire.peerPieces.get(piece)) continue
+ if (self._request(wire, piece, false)) return
+ }
+ }
}
+
+ // TODO: wire failed to validate as useful; should we close it?
+ // probably not, since 'have' and 'bitfield' messages might be coming
}
- if (interested) wire.interested()
- else wire.uninterested()
- })
+ function speedRanker () {
+ const speed = wire.downloadSpeed() || 1
+ if (speed > SPEED_THRESHOLD) return () => true
- if (prev === self._amInterested) return
- if (self._amInterested) self.emit('interested')
- else self.emit('uninterested')
-}
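+ // Estimate how many seconds this wire needs to clear its outstanding block requests at its current speed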
+ const secs = Math.max(1, wire.requests.length) * Piece.BLOCK_LENGTH / speed
+ let tries = 10
+ let ptr = 0
-/**
- * Heartbeat to update all peers and their requests.
- */
-Torrent.prototype._update = function () {
- var self = this
- if (self.destroyed) return
-
- // update wires in random order for better request distribution
- var ite = randomIterate(self.wires)
- var wire
- while ((wire = ite())) {
- self._updateWire(wire)
- }
-}
+ return index => {
+ if (!tries || self.bitfield.get(index)) return true
-/**
- * Attempts to update a peer's requests
- */
-Torrent.prototype._updateWire = function (wire) {
- var self = this
+ let missing = self.pieces[index].missing
- if (wire.peerChoking) return
- if (!wire.downloaded) return validateWire()
+ for (; ptr < self.wires.length; ptr++) {
+ const otherWire = self.wires[ptr]
+ const otherSpeed = otherWire.downloadSpeed()
- var minOutstandingRequests = getBlockPipelineLength(wire, PIPELINE_MIN_DURATION)
- if (wire.requests.length >= minOutstandingRequests) return
- var maxOutstandingRequests = getBlockPipelineLength(wire, PIPELINE_MAX_DURATION)
+ if (otherSpeed < SPEED_THRESHOLD) continue
+ if (otherSpeed <= speed) continue
+ if (!otherWire.peerPieces.get(index)) continue
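+ // Skip this piece if faster peers that also have it could deliver its missing bytes within that estimate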
+ if ((missing -= otherSpeed * secs) > 0) continue
- trySelectWire(false) || trySelectWire(true)
+ tries--
+ return false
+ }
- function genPieceFilterFunc (start, end, tried, rank) {
- return function (i) {
- return i >= start && i <= end && !(i in tried) && wire.peerPieces.get(i) && (!rank || rank(i))
+ return true
+ }
}
- }
- // TODO: Do we need both validateWire and trySelectWire?
- function validateWire () {
- if (wire.requests.length) return
-
- var i = self._selections.length
- while (i--) {
- var next = self._selections[i]
- var piece
- if (self.strategy === 'rarest') {
- var start = next.from + next.offset
- var end = next.to
- var len = end - start + 1
- var tried = {}
- var tries = 0
- var filter = genPieceFilterFunc(start, end, tried)
-
- while (tries < len) {
- piece = self._rarityMap.getRarestPiece(filter)
- if (piece < 0) break
- if (self._request(wire, piece, false)) return
- tried[piece] = true
- tries += 1
- }
- } else {
- for (piece = next.to; piece >= next.from + next.offset; --piece) {
- if (!wire.peerPieces.get(piece)) continue
- if (self._request(wire, piece, false)) return
- }
+ function shufflePriority (i) {
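+ // Swap the serviced selection with the last contiguous prioritized selection so prioritized ranges are rotated fairly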
+ let last = i
+ for (let j = i; j < self._selections.length && self._selections[j].priority; j++) {
+ last = j
}
+ const tmp = self._selections[i]
+ self._selections[i] = self._selections[last]
+ self._selections[last] = tmp
}
- // TODO: wire failed to validate as useful; should we close it?
- // probably not, since 'have' and 'bitfield' messages might be coming
- }
-
- function speedRanker () {
- var speed = wire.downloadSpeed() || 1
- if (speed > SPEED_THRESHOLD) return function () { return true }
-
- var secs = Math.max(1, wire.requests.length) * Piece.BLOCK_LENGTH / speed
- var tries = 10
- var ptr = 0
-
- return function (index) {
- if (!tries || self.bitfield.get(index)) return true
-
- var missing = self.pieces[index].missing
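+ // When `hotswap` is true, _request may cancel a slower peer's outstanding block so this wire can take it over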
+ function trySelectWire (hotswap) {
+ if (wire.requests.length >= maxOutstandingRequests) return true
+ const rank = speedRanker()
+
+ for (let i = 0; i < self._selections.length; i++) {
+ const next = self._selections[i]
+
+ let piece
+ if (self.strategy === 'rarest') {
+ const start = next.from + next.offset
+ const end = next.to
+ const len = end - start + 1
+ const tried = {}
+ let tries = 0
+ const filter = genPieceFilterFunc(start, end, tried, rank)
+
+ while (tries < len) {
+ piece = self._rarityMap.getRarestPiece(filter)
+ if (piece < 0) break
+
+ // request all non-reserved blocks in this piece
+ while (self._request(wire, piece, self._critical[piece] || hotswap)) {}
+
+ if (wire.requests.length < maxOutstandingRequests) {
+ tried[piece] = true
+ tries++
+ continue
+ }
+
+ if (next.priority) shufflePriority(i)
+ return true
+ }
+ } else {
+ for (piece = next.from + next.offset; piece <= next.to; piece++) {
+ if (!wire.peerPieces.get(piece) || !rank(piece)) continue
- for (; ptr < self.wires.length; ptr++) {
- var otherWire = self.wires[ptr]
- var otherSpeed = otherWire.downloadSpeed()
+ // request all non-reserved blocks in piece
+ while (self._request(wire, piece, self._critical[piece] || hotswap)) {}
- if (otherSpeed < SPEED_THRESHOLD) continue
- if (otherSpeed <= speed) continue
- if (!otherWire.peerPieces.get(index)) continue
- if ((missing -= otherSpeed * secs) > 0) continue
+ if (wire.requests.length < maxOutstandingRequests) continue
- tries--
- return false
+ if (next.priority) shufflePriority(i)
+ return true
+ }
+ }
}
- return true
- }
- }
-
- function shufflePriority (i) {
- var last = i
- for (var j = i; j < self._selections.length && self._selections[j].priority; j++) {
- last = j
+ return false
}
- var tmp = self._selections[i]
- self._selections[i] = self._selections[last]
- self._selections[last] = tmp
}
- function trySelectWire (hotswap) {
- if (wire.requests.length >= maxOutstandingRequests) return true
- var rank = speedRanker()
-
- for (var i = 0; i < self._selections.length; i++) {
- var next = self._selections[i]
-
- var piece
- if (self.strategy === 'rarest') {
- var start = next.from + next.offset
- var end = next.to
- var len = end - start + 1
- var tried = {}
- var tries = 0
- var filter = genPieceFilterFunc(start, end, tried, rank)
+ /**
+ * Called periodically to update the choked status of all peers, handling optimistic
+ * unchoking as described in BEP3.
+ */
+ _rechoke () {
+ const self = this
+ if (!self.ready) return
- while (tries < len) {
- piece = self._rarityMap.getRarestPiece(filter)
- if (piece < 0) break
+ if (self._rechokeOptimisticTime > 0) self._rechokeOptimisticTime -= 1
+ else self._rechokeOptimisticWire = null
- // request all non-reserved blocks in this piece
- while (self._request(wire, piece, self._critical[piece] || hotswap)) {}
+ const peers = []
- if (wire.requests.length < maxOutstandingRequests) {
- tried[piece] = true
- tries++
- continue
- }
+ self.wires.forEach(wire => {
+ if (!wire.isSeeder && wire !== self._rechokeOptimisticWire) {
+ peers.push({
+ wire,
+ downloadSpeed: wire.downloadSpeed(),
+ uploadSpeed: wire.uploadSpeed(),
+ salt: Math.random(),
+ isChoked: true
+ })
+ }
+ })
- if (next.priority) shufflePriority(i)
- return true
- }
- } else {
- for (piece = next.from + next.offset; piece <= next.to; piece++) {
- if (!wire.peerPieces.get(piece) || !rank(piece)) continue
+ peers.sort(rechokeSort)
- // request all non-reserved blocks in piece
- while (self._request(wire, piece, self._critical[piece] || hotswap)) {}
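+ // Unchoke the best-ranked peers until the configured number of interested peers are unchoked; uninterested peers do not use up a slot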
+ let unchokeInterested = 0
+ let i = 0
+ for (; i < peers.length && unchokeInterested < self._rechokeNumSlots; ++i) {
+ peers[i].isChoked = false
+ if (peers[i].wire.peerInterested) unchokeInterested += 1
+ }
- if (wire.requests.length < maxOutstandingRequests) continue
+ // Optimistically unchoke a peer
+ if (!self._rechokeOptimisticWire && i < peers.length && self._rechokeNumSlots) {
+ const candidates = peers.slice(i).filter(peer => peer.wire.peerInterested)
+ const optimistic = candidates[randomInt(candidates.length)]
- if (next.priority) shufflePriority(i)
- return true
- }
+ if (optimistic) {
+ optimistic.isChoked = false
+ self._rechokeOptimisticWire = optimistic.wire
+ self._rechokeOptimisticTime = RECHOKE_OPTIMISTIC_DURATION
}
}
- return false
- }
-}
-
-/**
- * Called periodically to update the choked status of all peers, handling optimistic
- * unchoking as described in BEP3.
- */
-Torrent.prototype._rechoke = function () {
- var self = this
- if (!self.ready) return
-
- if (self._rechokeOptimisticTime > 0) self._rechokeOptimisticTime -= 1
- else self._rechokeOptimisticWire = null
-
- var peers = []
-
- self.wires.forEach(function (wire) {
- if (!wire.isSeeder && wire !== self._rechokeOptimisticWire) {
- peers.push({
- wire: wire,
- downloadSpeed: wire.downloadSpeed(),
- uploadSpeed: wire.uploadSpeed(),
- salt: Math.random(),
- isChoked: true
- })
- }
- })
+ // Unchoke best peers
+ peers.forEach(peer => {
+ if (peer.wire.amChoking !== peer.isChoked) {
+ if (peer.isChoked) peer.wire.choke()
+ else peer.wire.unchoke()
+ }
+ })
- peers.sort(rechokeSort)
+ function rechokeSort (peerA, peerB) {
+ // Prefer higher download speed
+ if (peerA.downloadSpeed !== peerB.downloadSpeed) {
+ return peerB.downloadSpeed - peerA.downloadSpeed
+ }
- var unchokeInterested = 0
- var i = 0
- for (; i < peers.length && unchokeInterested < self._rechokeNumSlots; ++i) {
- peers[i].isChoked = false
- if (peers[i].wire.peerInterested) unchokeInterested += 1
- }
+ // Prefer higher upload speed
+ if (peerA.uploadSpeed !== peerB.uploadSpeed) {
+ return peerB.uploadSpeed - peerA.uploadSpeed
+ }
- // Optimistically unchoke a peer
- if (!self._rechokeOptimisticWire && i < peers.length && self._rechokeNumSlots) {
- var candidates = peers.slice(i).filter(function (peer) { return peer.wire.peerInterested })
- var optimistic = candidates[randomInt(candidates.length)]
+ // Prefer unchoked
+ if (peerA.wire.amChoking !== peerB.wire.amChoking) {
+ return peerA.wire.amChoking ? 1 : -1
+ }
- if (optimistic) {
- optimistic.isChoked = false
- self._rechokeOptimisticWire = optimistic.wire
- self._rechokeOptimisticTime = RECHOKE_OPTIMISTIC_DURATION
+ // Random order
+ return peerA.salt - peerB.salt
}
}
- // Unchoke best peers
- peers.forEach(function (peer) {
- if (peer.wire.amChoking !== peer.isChoked) {
- if (peer.isChoked) peer.wire.choke()
- else peer.wire.unchoke()
- }
- })
+ /**
+ * Attempts to cancel a slow block request from another wire such that the
+ * given wire may effectively swap out the request for one of its own.
+ */
+ _hotswap (wire, index) {
+ const self = this
- function rechokeSort (peerA, peerB) {
- // Prefer higher download speed
- if (peerA.downloadSpeed !== peerB.downloadSpeed) {
- return peerB.downloadSpeed - peerA.downloadSpeed
- }
-
- // Prefer higher upload speed
- if (peerA.uploadSpeed !== peerB.uploadSpeed) {
- return peerB.uploadSpeed - peerA.uploadSpeed
- }
+ const speed = wire.downloadSpeed()
+ if (speed < Piece.BLOCK_LENGTH) return false
+ if (!self._reservations[index]) return false
- // Prefer unchoked
- if (peerA.wire.amChoking !== peerB.wire.amChoking) {
- return peerA.wire.amChoking ? 1 : -1
+ const r = self._reservations[index]
+ if (!r) {
+ return false
}
- // Random order
- return peerA.salt - peerB.salt
- }
-}
-
-/**
- * Attempts to cancel a slow block request from another wire such that the
- * given wire may effectively swap out the request for one of its own.
- */
-Torrent.prototype._hotswap = function (wire, index) {
- var self = this
+ let minSpeed = Infinity
+ let minWire
- var speed = wire.downloadSpeed()
- if (speed < Piece.BLOCK_LENGTH) return false
- if (!self._reservations[index]) return false
-
- var r = self._reservations[index]
- if (!r) {
- return false
- }
+ let i
+ for (i = 0; i < r.length; i++) {
+ const otherWire = r[i]
+ if (!otherWire || otherWire === wire) continue
- var minSpeed = Infinity
- var minWire
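+ // Only steal from wires that are below the speed threshold and at most half this wire's speed, tracking the slowest such wire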
+ const otherSpeed = otherWire.downloadSpeed()
+ if (otherSpeed >= SPEED_THRESHOLD) continue
+ if (2 * otherSpeed > speed || otherSpeed > minSpeed) continue
- var i
- for (i = 0; i < r.length; i++) {
- var otherWire = r[i]
- if (!otherWire || otherWire === wire) continue
+ minWire = otherWire
+ minSpeed = otherSpeed
+ }
- var otherSpeed = otherWire.downloadSpeed()
- if (otherSpeed >= SPEED_THRESHOLD) continue
- if (2 * otherSpeed > speed || otherSpeed > minSpeed) continue
+ if (!minWire) return false
- minWire = otherWire
- minSpeed = otherSpeed
- }
+ for (i = 0; i < r.length; i++) {
+ if (r[i] === minWire) r[i] = null
+ }
- if (!minWire) return false
+ for (i = 0; i < minWire.requests.length; i++) {
+ const req = minWire.requests[i]
+ if (req.piece !== index) continue
- for (i = 0; i < r.length; i++) {
- if (r[i] === minWire) r[i] = null
- }
-
- for (i = 0; i < minWire.requests.length; i++) {
- var req = minWire.requests[i]
- if (req.piece !== index) continue
+ self.pieces[index].cancel((req.offset / Piece.BLOCK_LENGTH) | 0)
+ }
- self.pieces[index].cancel((req.offset / Piece.BLOCK_LENGTH) | 0)
+ self.emit('hotswap', minWire, wire, index)
+ return true
}
- self.emit('hotswap', minWire, wire, index)
- return true
-}
+ /**
+ * Attempts to request a block from the given wire.
+ */
+ _request (wire, index, hotswap) {
+ const self = this
+ const numRequests = wire.requests.length
+ const isWebSeed = wire.type === 'webSeed'
-/**
- * Attempts to request a block from the given wire.
- */
-Torrent.prototype._request = function (wire, index, hotswap) {
- var self = this
- var numRequests = wire.requests.length
- var isWebSeed = wire.type === 'webSeed'
+ if (self.bitfield.get(index)) return false
- if (self.bitfield.get(index)) return false
+ const maxOutstandingRequests = isWebSeed
+ ? Math.min(
+ getPiecePipelineLength(wire, PIPELINE_MAX_DURATION, self.pieceLength),
+ self.maxWebConns
+ )
+ : getBlockPipelineLength(wire, PIPELINE_MAX_DURATION)
- var maxOutstandingRequests = isWebSeed
- ? Math.min(
- getPiecePipelineLength(wire, PIPELINE_MAX_DURATION, self.pieceLength),
- self.maxWebConns
- )
- : getBlockPipelineLength(wire, PIPELINE_MAX_DURATION)
+ if (numRequests >= maxOutstandingRequests) return false
+ // var endGame = (wire.requests.length === 0 && self.store.numMissing < 30)
- if (numRequests >= maxOutstandingRequests) return false
- // var endGame = (wire.requests.length === 0 && self.store.numMissing < 30)
+ const piece = self.pieces[index]
+ let reservation = isWebSeed ? piece.reserveRemaining() : piece.reserve()
- var piece = self.pieces[index]
- var reservation = isWebSeed ? piece.reserveRemaining() : piece.reserve()
+ if (reservation === -1 && hotswap && self._hotswap(wire, index)) {
+ reservation = isWebSeed ? piece.reserveRemaining() : piece.reserve()
+ }
+ if (reservation === -1) return false
- if (reservation === -1 && hotswap && self._hotswap(wire, index)) {
- reservation = isWebSeed ? piece.reserveRemaining() : piece.reserve()
- }
- if (reservation === -1) return false
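+ // Record which wire holds this reservation so _hotswap can find and cancel it later if the wire turns out to be slow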
+ let r = self._reservations[index]
+ if (!r) r = self._reservations[index] = []
+ let i = r.indexOf(null)
+ if (i === -1) i = r.length
+ r[i] = wire
- var r = self._reservations[index]
- if (!r) r = self._reservations[index] = []
- var i = r.indexOf(null)
- if (i === -1) i = r.length
- r[i] = wire
+ const chunkOffset = piece.chunkOffset(reservation)
+ const chunkLength = isWebSeed ? piece.chunkLengthRemaining(reservation) : piece.chunkLength(reservation)
- var chunkOffset = piece.chunkOffset(reservation)
- var chunkLength = isWebSeed ? piece.chunkLengthRemaining(reservation) : piece.chunkLength(reservation)
+ wire.request(index, chunkOffset, chunkLength, function onChunk (err, chunk) {
+ if (self.destroyed) return
- wire.request(index, chunkOffset, chunkLength, function onChunk (err, chunk) {
- if (self.destroyed) return
+ // TODO: what is this for?
+ if (!self.ready) return self.once('ready', () => { onChunk(err, chunk) })
- // TODO: what is this for?
- if (!self.ready) return self.once('ready', function () { onChunk(err, chunk) })
+ if (r[i] === wire) r[i] = null
- if (r[i] === wire) r[i] = null
+ if (piece !== self.pieces[index]) return onUpdateTick()
- if (piece !== self.pieces[index]) return onUpdateTick()
+ if (err) {
+ self._debug(
+ 'error getting piece %s (offset: %s length: %s) from %s: %s',
+ index, chunkOffset, chunkLength, `${wire.remoteAddress}:${wire.remotePort}`,
+ err.message
+ )
+ isWebSeed ? piece.cancelRemaining(reservation) : piece.cancel(reservation)
+ onUpdateTick()
+ return
+ }
- if (err) {
self._debug(
- 'error getting piece %s (offset: %s length: %s) from %s: %s',
- index, chunkOffset, chunkLength, wire.remoteAddress + ':' + wire.remotePort,
- err.message
+ 'got piece %s (offset: %s length: %s) from %s',
+ index, chunkOffset, chunkLength, `${wire.remoteAddress}:${wire.remotePort}`
)
- isWebSeed ? piece.cancelRemaining(reservation) : piece.cancel(reservation)
- onUpdateTick()
- return
- }
-
- self._debug(
- 'got piece %s (offset: %s length: %s) from %s',
- index, chunkOffset, chunkLength, wire.remoteAddress + ':' + wire.remotePort
- )
- if (!piece.set(reservation, chunk, wire)) return onUpdateTick()
+ if (!piece.set(reservation, chunk, wire)) return onUpdateTick()
- var buf = piece.flush()
+ const buf = piece.flush()
- // TODO: might need to set self.pieces[index] = null here since sha1 is async
+ // TODO: might need to set self.pieces[index] = null here since sha1 is async
- sha1(buf, function (hash) {
- if (self.destroyed) return
+ sha1(buf, hash => {
+ if (self.destroyed) return
- if (hash === self._hashes[index]) {
- if (!self.pieces[index]) return
- self._debug('piece verified %s', index)
+ if (hash === self._hashes[index]) {
+ if (!self.pieces[index]) return
+ self._debug('piece verified %s', index)
- self.pieces[index] = null
- self._reservations[index] = null
- self.bitfield.set(index, true)
+ self.pieces[index] = null
+ self._reservations[index] = null
+ self.bitfield.set(index, true)
- self.store.put(index, buf)
+ self.store.put(index, buf)
- self.wires.forEach(function (wire) {
- wire.have(index)
- })
+ self.wires.forEach(wire => {
+ wire.have(index)
+ })
- // We also check `self.destroyed` since `torrent.destroy()` could have been
- // called in the `torrent.on('done')` handler, triggered by `_checkDone()`.
- if (self._checkDone() && !self.destroyed) self.discovery.complete()
- } else {
- self.pieces[index] = new Piece(piece.length)
- self.emit('warning', new Error('Piece ' + index + ' failed verification'))
- }
- onUpdateTick()
+ // We also check `self.destroyed` since `torrent.destroy()` could have been
+ // called in the `torrent.on('done')` handler, triggered by `_checkDone()`.
+ if (self._checkDone() && !self.destroyed) self.discovery.complete()
+ } else {
+ self.pieces[index] = new Piece(piece.length)
+ self.emit('warning', new Error(`Piece ${index} failed verification`))
+ }
+ onUpdateTick()
+ })
})
- })
- function onUpdateTick () {
- process.nextTick(function () { self._update() })
+ function onUpdateTick () {
+ process.nextTick(() => { self._update() })
+ }
+
+ return true
}
- return true
-}
+ _checkDone () {
+ const self = this
+ if (self.destroyed) return
-Torrent.prototype._checkDone = function () {
- var self = this
- if (self.destroyed) return
-
- // are any new files done?
- self.files.forEach(function (file) {
- if (file.done) return
- for (var i = file._startPiece; i <= file._endPiece; ++i) {
- if (!self.bitfield.get(i)) return
- }
- file.done = true
- file.emit('done')
- self._debug('file done: ' + file.name)
- })
-
- // is the torrent done? (if all current selections are satisfied, or there are
- // no selections, then torrent is done)
- var done = true
- for (var i = 0; i < self._selections.length; i++) {
- var selection = self._selections[i]
- for (var piece = selection.from; piece <= selection.to; piece++) {
- if (!self.bitfield.get(piece)) {
- done = false
- break
+ // are any new files done?
+ self.files.forEach(file => {
+ if (file.done) return
+ for (let i = file._startPiece; i <= file._endPiece; ++i) {
+ if (!self.bitfield.get(i)) return
+ }
+ file.done = true
+ file.emit('done')
+ self._debug(`file done: ${file.name}`)
+ })
+
+ // is the torrent done? (if all current selections are satisfied, or there are
+ // no selections, then torrent is done)
+ let done = true
+ for (let i = 0; i < self._selections.length; i++) {
+ const selection = self._selections[i]
+ for (let piece = selection.from; piece <= selection.to; piece++) {
+ if (!self.bitfield.get(piece)) {
+ done = false
+ break
+ }
}
+ if (!done) break
}
- if (!done) break
- }
- if (!self.done && done) {
- self.done = true
- self._debug('torrent done: ' + self.infoHash)
- self.emit('done')
+ if (!self.done && done) {
+ self.done = true
+ self._debug(`torrent done: ${self.infoHash}`)
+ self.emit('done')
+ }
+ self._gcSelections()
+
+ return done
}
- self._gcSelections()
- return done
-}
+ load (streams, cb) {
+ const self = this
+ if (self.destroyed) throw new Error('torrent is destroyed')
+ if (!self.ready) return self.once('ready', () => { self.load(streams, cb) })
-Torrent.prototype.load = function (streams, cb) {
- var self = this
- if (self.destroyed) throw new Error('torrent is destroyed')
- if (!self.ready) return self.once('ready', function () { self.load(streams, cb) })
+ if (!Array.isArray(streams)) streams = [ streams ]
+ if (!cb) cb = noop
- if (!Array.isArray(streams)) streams = [ streams ]
- if (!cb) cb = noop
+ const readable = new MultiStream(streams)
+ const writable = new ChunkStoreWriteStream(self.store, self.pieceLength)
- var readable = new MultiStream(streams)
- var writable = new ChunkStoreWriteStream(self.store, self.pieceLength)
+ pump(readable, writable, err => {
+ if (err) return cb(err)
+ self._markAllVerified()
+ self._checkDone()
+ cb(null)
+ })
+ }
- pump(readable, writable, function (err) {
- if (err) return cb(err)
- self._markAllVerified()
- self._checkDone()
- cb(null)
- })
-}
+ createServer (requestListener) {
+ if (typeof Server !== 'function') throw new Error('node.js-only method')
+ if (this.destroyed) throw new Error('torrent is destroyed')
+ const server = new Server(this, requestListener)
+ this._servers.push(server)
+ return server
+ }
-Torrent.prototype.createServer = function (requestListener) {
- if (typeof Server !== 'function') throw new Error('node.js-only method')
- if (this.destroyed) throw new Error('torrent is destroyed')
- var server = new Server(this, requestListener)
- this._servers.push(server)
- return server
-}
+ pause () {
+ if (this.destroyed) return
+ this._debug('pause')
+ this.paused = true
+ }
-Torrent.prototype.pause = function () {
- if (this.destroyed) return
- this._debug('pause')
- this.paused = true
-}
+ resume () {
+ if (this.destroyed) return
+ this._debug('resume')
+ this.paused = false
+ this._drain()
+ }
-Torrent.prototype.resume = function () {
- if (this.destroyed) return
- this._debug('resume')
- this.paused = false
- this._drain()
-}
+ _debug () {
+ const args = [].slice.call(arguments)
+ args[0] = `[${this.client._debugId}] [${this._debugId}] ${args[0]}`
+ debug(...args)
+ }
-Torrent.prototype._debug = function () {
- var args = [].slice.call(arguments)
- args[0] = '[' + this.client._debugId + '] [' + this._debugId + '] ' + args[0]
- debug.apply(null, args)
-}
+ /**
+ * Pop a peer off the FIFO queue and connect to it. When _drain() gets called,
+ * the queue will usually have only one peer in it, except when there are too
+ * many peers (over `this.maxConns`) in which case they will just sit in the
+ * queue until another connection closes.
+ */
+ _drain () {
+ const self = this
+ this._debug('_drain numConns %s maxConns %s', self._numConns, self.client.maxConns)
+ if (typeof net.connect !== 'function' || self.destroyed || self.paused ||
+ self._numConns >= self.client.maxConns) {
+ return
+ }
+ this._debug('drain (%s queued, %s/%s peers)', self._numQueued, self.numPeers, self.client.maxConns)
-/**
- * Pop a peer off the FIFO queue and connect to it. When _drain() gets called,
- * the queue will usually have only one peer in it, except when there are too
- * many peers (over `this.maxConns`) in which case they will just sit in the
- * queue until another connection closes.
- */
-Torrent.prototype._drain = function () {
- var self = this
- this._debug('_drain numConns %s maxConns %s', self._numConns, self.client.maxConns)
- if (typeof net.connect !== 'function' || self.destroyed || self.paused ||
- self._numConns >= self.client.maxConns) {
- return
- }
- this._debug('drain (%s queued, %s/%s peers)', self._numQueued, self.numPeers, self.client.maxConns)
+ const peer = self._queue.shift()
+ if (!peer) return // queue could be empty
- var peer = self._queue.shift()
- if (!peer) return // queue could be empty
+ this._debug('tcp connect attempt to %s', peer.addr)
- this._debug('tcp connect attempt to %s', peer.addr)
+ const parts = addrToIPPort(peer.addr)
+ const opts = {
+ host: parts[0],
+ port: parts[1]
+ }
- var parts = addrToIPPort(peer.addr)
- var opts = {
- host: parts[0],
- port: parts[1]
- }
+ const conn = peer.conn = net.connect(opts)
- var conn = peer.conn = net.connect(opts)
+ conn.once('connect', () => { peer.onConnect() })
+ conn.once('error', err => { peer.destroy(err) })
+ peer.startConnectTimeout()
- conn.once('connect', function () { peer.onConnect() })
- conn.once('error', function (err) { peer.destroy(err) })
- peer.startConnectTimeout()
+ // When connection closes, attempt reconnect after timeout (with exponential backoff)
+ conn.on('close', () => {
+ if (self.destroyed) return
- // When connection closes, attempt reconnect after timeout (with exponential backoff)
- conn.on('close', function () {
- if (self.destroyed) return
+ // TODO: If torrent is done, do not try to reconnect after a timeout
- // TODO: If torrent is done, do not try to reconnect after a timeout
+ if (peer.retries >= RECONNECT_WAIT.length) {
+ self._debug(
+ 'conn %s closed: will not re-add (max %s attempts)',
+ peer.addr, RECONNECT_WAIT.length
+ )
+ return
+ }
- if (peer.retries >= RECONNECT_WAIT.length) {
+ const ms = RECONNECT_WAIT[peer.retries]
self._debug(
- 'conn %s closed: will not re-add (max %s attempts)',
- peer.addr, RECONNECT_WAIT.length
+ 'conn %s closed: will re-add to queue in %sms (attempt %s)',
+ peer.addr, ms, peer.retries + 1
)
- return
- }
-
- var ms = RECONNECT_WAIT[peer.retries]
- self._debug(
- 'conn %s closed: will re-add to queue in %sms (attempt %s)',
- peer.addr, ms, peer.retries + 1
- )
- var reconnectTimeout = setTimeout(function reconnectTimeout () {
- var newPeer = self._addPeer(peer.addr)
- if (newPeer) newPeer.retries = peer.retries + 1
- }, ms)
- if (reconnectTimeout.unref) reconnectTimeout.unref()
- })
-}
+ const reconnectTimeout = setTimeout(function reconnectTimeout () {
+ const newPeer = self._addPeer(peer.addr)
+ if (newPeer) newPeer.retries = peer.retries + 1
+ }, ms)
+ if (reconnectTimeout.unref) reconnectTimeout.unref()
+ })
+ }
-/**
- * Returns `true` if string is valid IPv4/6 address.
- * @param {string} addr
- * @return {boolean}
- */
-Torrent.prototype._validAddr = function (addr) {
- var parts
- try {
- parts = addrToIPPort(addr)
- } catch (e) {
- return false
+ /**
+ * Returns `true` if string is valid IPv4/6 address.
+ * @param {string} addr
+ * @return {boolean}
+ */
+ _validAddr (addr) {
+ let parts
+ try {
+ parts = addrToIPPort(addr)
+ } catch (e) {
+ return false
+ }
+ const host = parts[0]
+ const port = parts[1]
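+ // Reject the client's own localhost listening address so the torrent does not connect to itself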
+ return port > 0 && port <= 65535 &&
+ !(host === '127.0.0.1' && port === this.client.torrentPort)
}
- var host = parts[0]
- var port = parts[1]
- return port > 0 && port < 65535 &&
- !(host === '127.0.0.1' && port === this.client.torrentPort)
}
function getBlockPipelineLength (wire, duration) {
@@ -1770,3 +1723,5 @@ function randomInt (high) {
}
function noop () {}
+
+module.exports = Torrent