diff options
-rw-r--r-- | README.md | 10 | ||||
-rwxr-xr-x | bin/cmd.js | 34 | ||||
-rw-r--r-- | index.js | 63 | ||||
-rw-r--r-- | lib/append-to.js | 4 | ||||
-rw-r--r-- | lib/file-stream.js | 86 | ||||
-rw-r--r-- | lib/file.js | 120 | ||||
-rw-r--r-- | lib/fs-storage.js | 211 | ||||
-rw-r--r-- | lib/load-chunk-store.js | 61 | ||||
-rw-r--r-- | lib/storage.js | 683 | ||||
-rw-r--r-- | lib/torrent.js | 551 | ||||
-rw-r--r-- | package.json | 7 | ||||
-rw-r--r-- | test/download-dht-magnet.js | 2 | ||||
-rw-r--r-- | test/download-dht-torrent.js | 2 | ||||
-rw-r--r-- | test/download-tracker-magnet.js | 2 | ||||
-rw-r--r-- | test/download-tracker-torrent.js | 2 | ||||
-rw-r--r-- | test/server.js | 2 | ||||
-rw-r--r-- | test/storage.js | 61 |
17 files changed, 567 insertions, 1334 deletions
@@ -230,9 +230,8 @@ If `opts` is specified, then the default options (shown below) will be overridde nodeId: String|Buffer, // DHT protocol node ID (default=randomly generated) peerId: String|Buffer, // Wire protocol peer ID (default=randomly generated) rtcConfig: Object, // RTCPeerConnection configuration object (default=STUN only) - storage: Function, // custom storage engine, or `false` to use in-memory engine tracker: Boolean, // Whether or not to enable trackers (default=true) - wrtc: {} // custom webrtc implementation (in node, specify the [wrtc](https://www.npmjs.com/package/wrtc) package) + wrtc: {} // Custom webrtc implementation (in node, specify the [wrtc](https://www.npmjs.com/package/wrtc) package) } ``` @@ -253,9 +252,10 @@ If `opts` is specified, then the default options (shown below) will be overridde ```js { - announce: [], // List of additional trackers to use (added to list in .torrent or magnet uri) - path: String, // Folder where files will be downloaded (default=`/tmp/webtorrent/`) - verify: Boolean // Verify previously stored data before starting (default=false) + announce: [], // List of additional trackers to use (added to list in .torrent or magnet uri) + path: String, // Folder where files will be downloaded (default=`/tmp/webtorrent/`) + storage: Function, // Custom storage engine (must follow `abstract-chunk-store` API) + verify: Boolean // Verify previously stored data before starting (default=false) } ``` @@ -12,7 +12,6 @@ var networkAddress = require('network-address') var parseTorrent = require('parse-torrent') var path = require('path') var prettyBytes = require('pretty-bytes') -var Storage = require('../lib/storage') var WebTorrent = require('../') var zeroFill = require('zero-fill') @@ -511,7 +510,7 @@ function drawTorrent (torrent) { linesRemaining -= 1 } - var seeding = torrent.storage.done + var seeding = torrent.done if (!seeding) clivas.line('') clivas.line( @@ -540,34 +539,29 @@ function drawTorrent (torrent) { linesRemaining -= 5 if (argv.verbose) { - var pieces = torrent.storage.pieces - var memoryUsage = 0 + var pieces = torrent.pieces for (var i = 0; i < pieces.length; i++) { var piece = pieces[i] - if (piece.buffer) memoryUsage += piece.buffer.length if (piece.verified || (piece.blocksWritten === 0 && !piece.blocks[0])) continue var bar = '' for (var j = 0; j < piece.blocks.length; j++) { - switch (piece.blocks[j]) { - case Storage.BLOCK_BLANK: - bar += '{red:█}' - break - case Storage.BLOCK_RESERVED: - bar += '{blue:█}' - break - case Storage.BLOCK_WRITTEN: - bar += '{green:█}' - break - } + // switch (piece.blocks[j]) { + // case Storage.BLOCK_BLANK: + // bar += '{red:█}' + // break + // case Storage.BLOCK_RESERVED: + // bar += '{blue:█}' + // break + // case Storage.BLOCK_WRITTEN: + // bar += '{green:█}' + // break + // } } clivas.line('{4+cyan:' + i + '} ' + bar) linesRemaining -= 1 } - clivas.line( - '{red:memory usage:} {bold:' + prettyBytes(memoryUsage) + '}' - ) clivas.line('{80:}') - linesRemaining -= 2 + linesRemaining -= 1 } torrent.swarm.wires.every(function (wire) { @@ -14,8 +14,6 @@ var speedometer = require('speedometer') var zeroFill = require('zero-fill') var path = require('path') -var FSStorage = require('./lib/fs-storage') // browser exclude -var Storage = require('./lib/storage') var Torrent = require('./lib/torrent') inherits(WebTorrent, EventEmitter) @@ -53,12 +51,6 @@ function WebTorrent (opts) { self.downloadSpeed = speedometer() self.uploadSpeed = speedometer() - self.storage = typeof opts.storage === 'function' - ? opts.storage - : (opts.storage !== false && typeof FSStorage === 'function' /* browser exclude */) - ? FSStorage - : Storage - self.peerId = opts.peerId === undefined ? new Buffer('-WW' + VERSION_STR + '-' + hat(48), 'utf8') : typeof opts.peerId === 'string' @@ -85,7 +77,7 @@ function WebTorrent (opts) { loadIPSet(opts.blocklist, { headers: { 'user-agent': 'WebTorrent (http://webtorrent.io)' } }, function (err, ipSet) { - if (err) return self.error('failed to load blocklist: ' + err.message) + if (err) return self.error('Failed to load blocklist: ' + err.message) self.blocked = ipSet ready() }) @@ -126,13 +118,13 @@ Object.defineProperty(WebTorrent.prototype, 'ratio', { WebTorrent.prototype.get = function (torrentId) { var self = this if (torrentId instanceof Torrent) return torrentId + var parsed - try { - parsed = parseTorrent(torrentId) - } catch (err) { - return null - } + try { parsed = parseTorrent(torrentId) } catch (err) {} + + if (!parsed) return null if (!parsed.infoHash) throw new Error('Invalid torrent identifier') + for (var i = 0, len = self.torrents.length; i < len; i++) { var torrent = self.torrents[i] if (torrent.infoHash === parsed.infoHash) return torrent @@ -150,19 +142,16 @@ WebTorrent.prototype.add = WebTorrent.prototype.download = function (torrentId, opts, ontorrent) { var self = this if (self.destroyed) throw new Error('client is destroyed') + if (typeof opts === 'function') return self.add(torrentId, null, opts) debug('add') - if (typeof opts === 'function') { - ontorrent = opts - opts = {} - } if (!opts) opts = {} - if (!opts.storage) opts.storage = self.storage + opts.client = self var torrent = self.get(torrentId) function _ontorrent () { - debug('on torrent') + debug('on torrent %s', torrent.infoHash) if (typeof ontorrent === 'function') ontorrent(torrent) } @@ -194,29 +183,24 @@ WebTorrent.prototype.download = function (torrentId, opts, ontorrent) { /** * Start seeding a new file/folder. * @param {string|File|FileList|Buffer|Array.<string|File|Buffer>} input - * @param {Object} opts - * @param {function} onseed + * @param {Object=} opts + * @param {function=} onseed */ WebTorrent.prototype.seed = function (input, opts, onseed) { var self = this if (self.destroyed) throw new Error('client is destroyed') + if (typeof opts === 'function') return self.seed(input, null, opts) debug('seed') - if (typeof opts === 'function') { - onseed = opts - opts = {} - } if (!opts) opts = {} - opts.noVerify = true - opts.createdBy = 'WebTorrent/' + VERSION - // When seeding from filesystem path, don't perform extra copy to /tmp - // Issue: https://github.com/feross/webtorrent/issues/357 - if (typeof input === 'string' && !opts.path) opts.path = path.dirname(input) + // When seeding from filesystem path, storage should use existing location + if (typeof input === 'string') opts.path = path.dirname(input) + if (!opts.createdBy) opts.createdBy = 'WebTorrent/' + VERSION var streams var torrent = self.add(undefined, opts, function (torrent) { var tasks = [function (cb) { - torrent.storage.load(streams, cb) + torrent.load(streams, cb) }] if (self.dht) { tasks.push(function (cb) { @@ -264,9 +248,11 @@ WebTorrent.prototype.seed = function (input, opts, onseed) { */ WebTorrent.prototype.remove = function (torrentId, cb) { var self = this + debug('remove') + var torrent = self.get(torrentId) if (!torrent) throw new Error('No torrent with id ' + torrentId) - debug('remove') + self.torrents.splice(self.torrents.indexOf(torrent), 1) torrent.destroy(cb) } @@ -282,20 +268,15 @@ WebTorrent.prototype.address = function () { */ WebTorrent.prototype.destroy = function (cb) { var self = this + if (self.destroyed) throw new Error('client already destroyed') self.destroyed = true debug('destroy') var tasks = self.torrents.map(function (torrent) { - return function (cb) { - self.remove(torrent, cb) - } + return function (cb) { self.remove(torrent, cb) } }) - if (self.dht) { - tasks.push(function (cb) { - self.dht.destroy(cb) - }) - } + if (self.dht) tasks.push(function (cb) { self.dht.destroy(cb) }) parallel(tasks, cb) } diff --git a/lib/append-to.js b/lib/append-to.js index 2c3e5d5..8d09b0b 100644 --- a/lib/append-to.js +++ b/lib/append-to.js @@ -17,7 +17,7 @@ var IFRAME_EXTS = [ '.css', '.html', '.js', '.md', '.pdf', '.txt' ] var MediaSource = typeof window !== 'undefined' && window.MediaSource module.exports = function appendTo (file, rootElem, cb) { - cb = dezalgo(cb || function () {}) + cb = dezalgo(cb || noop) var elem var extname = path.extname(file.name).toLowerCase() var currentTime = 0 @@ -159,3 +159,5 @@ module.exports = function appendTo (file, rootElem, cb) { if (cb) cb(err) } } + +function noop () {} diff --git a/lib/file-stream.js b/lib/file-stream.js index 5713aff..bd5f384 100644 --- a/lib/file-stream.js +++ b/lib/file-stream.js @@ -7,71 +7,58 @@ var stream = require('stream') inherits(FileStream, stream.Readable) /** - * A readable stream of a torrent file. + * Readable stream of a torrent file * - * @param {Object} file + * @param {File} file + * @param {Object} opts * @param {number} opts.start stream slice of file, starting from this byte (inclusive) * @param {number} opts.end stream slice of file, ending with this byte (inclusive) - * @param {number} opts.pieceLength length of an individual piece */ function FileStream (file, opts) { - var self = this - if (!(self instanceof FileStream)) return new FileStream(file, opts) - stream.Readable.call(self, opts) - debug('new filestream %s', JSON.stringify(opts)) - - if (!opts) opts = {} - if (!opts.start) opts.start = 0 - if (!opts.end) opts.end = file.length - 1 - - self.destroyed = false - self.length = opts.end - opts.start + 1 - - var offset = opts.start + file.offset - var pieceLength = opts.pieceLength - - self.startPiece = offset / pieceLength | 0 - self.endPiece = (opts.end + file.offset) / pieceLength | 0 - - self._storage = file.storage - self._piece = self.startPiece - self._missing = self.length - self._reading = false - self._notifying = false - self._criticalLength = Math.min((1024 * 1024 / pieceLength) | 0, 2) - self._offset = offset - (self.startPiece * pieceLength) + stream.Readable.call(this, opts) + + this.destroyed = false + this._torrent = file._torrent + + var start = (opts && opts.start) || 0 + var end = (opts && opts.end) || (file.length - 1) + var pieceLength = file._torrent.pieceLength + + this._startPiece = (start + file.offset) / pieceLength | 0 + this._endPiece = (end + file.offset) / pieceLength | 0 + + this._piece = this._startPiece + this._offset = (start + file.offset) - (this._startPiece * pieceLength) + + this._missing = end - start + 1 + this._reading = false + this._notifying = false + this._criticalLength = Math.min((1024 * 1024 / pieceLength) | 0, 2) } FileStream.prototype._read = function () { - var self = this - debug('_read') - if (self._reading) return - self._reading = true - self.notify() + if (this._reading) return + this._reading = true + this._notify() } -FileStream.prototype.notify = function () { +FileStream.prototype._notify = function () { var self = this - debug('notify') if (!self._reading || self._missing === 0) return - if (!self._storage.bitfield.get(self._piece)) { - return self._storage.emit('critical', self._piece, self._piece + self._criticalLength) + if (!self._torrent.bitfield.get(self._piece)) { + return self._torrent.critical(self._piece, self._piece + self._criticalLength) } if (self._notifying) return self._notifying = true var p = self._piece - self._storage.read(self._piece++, function (err, buffer) { + self._torrent.storage.get(p, function (err, buffer) { + console.log('GOT', p) self._notifying = false - if (self.destroyed) return - - if (err) { - self._storage.emit('error', err) - return self.destroy(err) - } - + if (self.destroyed) { console.log('destroyed'); return } + if (err) return self.destroy(err) debug('read %s (length %s) (err %s)', p, buffer.length, err && err.message) if (self._offset) { @@ -83,6 +70,7 @@ FileStream.prototype.notify = function () { buffer = buffer.slice(0, self._missing) } self._missing -= buffer.length + console.log('missing', self._missing) debug('pushing buffer of length %s', buffer.length) self._reading = false @@ -90,11 +78,11 @@ FileStream.prototype.notify = function () { if (self._missing === 0) self.push(null) }) + self._piece += 1 } FileStream.prototype.destroy = function () { - var self = this - if (self.destroyed) return - self.destroyed = true - self._storage.emit('deselect', self.startPiece, self.endPiece, true) + if (this.destroyed) return + this.destroyed = true + this._torrent.deselect(this._startPiece, this._endPiece, true) } diff --git a/lib/file.js b/lib/file.js new file mode 100644 index 0000000..d85784c --- /dev/null +++ b/lib/file.js @@ -0,0 +1,120 @@ +module.exports = File + +var appendTo = require('./append-to') +var eos = require('end-of-stream') +var EventEmitter = require('events').EventEmitter +var FileStream = require('./file-stream') +var inherits = require('inherits') +var mime = require('./mime.json') +var path = require('path') + +inherits(File, EventEmitter) + +/** + * @param {Torrent} torrent torrent that the file belongs to + * @param {Object} file file object from the parsed torrent + */ +function File (torrent, file) { + EventEmitter.call(this) + + this._torrent = torrent + + this.name = file.name + this.path = file.path + this.length = file.length + this.offset = file.offset + + this.done = (this.length === 0) + + var start = file.offset + var end = start + file.length - 1 + + this._startPiece = start / this._torrent.pieceLength | 0 + this._endPiece = end / this._torrent.pieceLength | 0 +} + +/** + * Selects the file to be downloaded, but at a lower priority than files with streams. + * Useful if you know you need the file at a later stage. + */ +File.prototype.select = function () { + if (this.length === 0) return + this._torrent.select(this._startPiece, this._endPiece, false) +} + +/** + * Deselects the file, which means it won't be downloaded unless someone creates a stream + * for it. + */ +File.prototype.deselect = function () { + if (this.length === 0) return + this._torrent.deselect(this._startPiece, this._endPiece, false) +} + +/** + * Create a readable stream to the file. Pieces needed by the stream will be prioritized + * highly and fetched from the swarm first. + * + * @param {Object} opts + * @param {number} opts.start stream slice of file, starting from this byte (inclusive) + * @param {number} opts.end stream slice of file, ending with this byte (inclusive) + * @return {FileStream} + */ +File.prototype.createReadStream = function (opts) { + var self = this + var stream = new FileStream(self, opts) + self._torrent.select(stream._startPiece, stream._endPiece, true, function () { + stream._notify() + }) + eos(stream, function () { + self._torrent.deselect(stream._startPiece, stream._endPiece, true) + }) + return stream +} + +/** + * @param {function} cb + */ +File.prototype.getBuffer = function (cb) { + var buf = new Buffer(this.length) + var offset = 0 + this.createReadStream() + .on('data', function (chunk) { + console.log('data') + chunk.copy(buf, offset) + offset += chunk.length + }) + .on('end', function () { + console.log('END') + cb(null, buf) + }) + .on('error', cb) +} + +/** + * @param {function} cb + */ +File.prototype.getBlobURL = function (cb) { + var self = this + if (typeof window === 'undefined') throw new Error('browser-only method') + + self.getBuffer(function (err, buffer) { + if (err) return cb(err) + var ext = path.extname(self.name).toLowerCase() + var type = mime[ext] + var blob = new window.Blob([ buffer ], type && { type: type }) + var url = window.URL.createObjectURL(blob) + cb(null, url) + }) +} + +/** + * Show the file in a the browser by appending it to the DOM. + * @param {Element|string} elem + * @param {function} cb + */ +File.prototype.appendTo = function (elem, cb) { + if (typeof window === 'undefined') throw new Error('browser-only method') + if (typeof elem === 'string') elem = document.querySelector(elem) + appendTo(this, elem, cb) +} diff --git a/lib/fs-storage.js b/lib/fs-storage.js deleted file mode 100644 index 1172c14..0000000 --- a/lib/fs-storage.js +++ /dev/null @@ -1,211 +0,0 @@ -module.exports = FSStorage - -var dezalgo = require('dezalgo') -var fs = require('fs') -var inherits = require('inherits') -var mkdirp = require('mkdirp') -var os = require('os') -var path = require('path') -var raf = require('random-access-file') -var rimraf = require('rimraf') -var Storage = require('./storage') -var thunky = require('thunky') - -var TMP = path.join(fs.existsSync('/tmp') ? '/tmp' : os.tmpDir(), 'webtorrent') - -inherits(FSStorage, Storage) - -/** - * fs-backed Storage for a torrent download. - * - * @param {Object} parsedTorrent - * @param {Object} opts - */ -function FSStorage (parsedTorrent, opts) { - var self = this - - self.tmp = opts.tmp || TMP - self.path = opts.path || path.join(self.tmp, parsedTorrent.infoHash) - - self.piecesMap = [] - self.nonExistentError = new Error('Cannot read from non-existent file') - - opts.nobuffer = true - Storage.call(self, parsedTorrent, opts) - - self.files.forEach(function (file) { - var fileStart = file.offset - var fileEnd = fileStart + file.length - - var pieceLength = file.pieceLength - var filePath = path.join(self.path, file.path) - - var openWrite = thunky(function (cb) { - var fileDir = path.dirname(filePath) - - mkdirp(fileDir, function (err) { - if (err) return cb(err) - if (self.closed) return cb(new Error('Storage closed')) - - var fd = raf(filePath) - file.fd = fd - cb(null, fd) - }) - }) - - var openRead = thunky(function (cb) { - // TODO: no need for fs.exists call, just try opening and handle error. - // fs.exists then open creates opportunity for race condition. - fs.exists(filePath, function (exists) { - if (exists) return openWrite(cb) - cb(self.nonExistentError) - }) - }) - - file.pieces.forEach(function (piece) { - var index = piece.index - - var pieceStart = index * pieceLength - var pieceEnd = pieceStart + piece.length - - var from = (fileStart < pieceStart) ? 0 : fileStart - pieceStart - var to = (fileEnd > pieceEnd) ? pieceLength : fileEnd - pieceStart - var offset = (fileStart > pieceStart) ? 0 : pieceStart - fileStart - - if (!self.piecesMap[index]) self.piecesMap[index] = [] - - self.piecesMap[index].push({ - from: from, - to: to, - offset: offset, - openWrite: openWrite, - openRead: openRead - }) - }) - }) -} - -FSStorage.prototype.readBlock = function (index, offset, length, cb) { - var self = this - cb = dezalgo(cb) - var piece = self.pieces[index] - if (!piece) return cb(new Error('invalid piece index ' + index)) - - if (piece.verified && piece.buffer) { - // piece is verified and cached in memory, so read directly from its buffer - // instead of reading from the filesystem. - return piece.readBlock(offset, length, cb) - } - - var rangeFrom = offset - var rangeTo = rangeFrom + length - - var targets = self.piecesMap[index].filter(function (target) { - return (target.to > rangeFrom && target.from < rangeTo) - }) - - if (!targets.length) return cb(new Error('no file matching the requested range?')) - - var buffers = [] - var end = targets.length - var i = 0 - - function readFromNextFile (err, buffer) { - if (err) return cb(err) - if (buffer) buffers.push(buffer) - if (i >= end) return cb(null, Buffer.concat(buffers)) - - var target = targets[i++] - - var from = target.from - var to = target.to - var offset = target.offset - - if (to > rangeTo) to = rangeTo - if (from < rangeFrom) { - offset += (rangeFrom - from) - from = rangeFrom - } - - target.openRead(function (err, file) { - if (self.closed) return - if (err) { - return err === self.nonExistentError - ? readFromNextFile(null, new Buffer(0)) - : cb(err) - } - file.read(offset, to - from, readFromNextFile) - }) - } - - readFromNextFile() -} - -// flush pieces to file once they're done and verified -FSStorage.prototype._onPieceDone = function (piece) { - var self = this - var targets = self.piecesMap[piece.index] - var end = targets.length - var i = 0 - - function done () { - Storage.prototype._onPieceDone.call(self, piece) - } - - if (!piece.buffer || self.readonly) return done() - - function writeToNextFile (err) { - if (err) return self.emit('error', err) - if (i >= end) { - // piece.buffer = null // TODO: free this memory! - return done() - } - - var target = targets[i++] - target.openWrite(function (err, file) { - if (self.closed) return - if (err) return self.emit('error', err) - file.write(target.offset, piece.buffer.slice(target.from, target.to), writeToNextFile) - }) - } - - writeToNextFile() -} - -/** - * Removes and cleans up any backing store for this storage. - */ -FSStorage.prototype.remove = function (cb) { - var self = this - if (!cb) cb = function () {} - - self.close(function (err) { - if (err) return cb(err) - var root = self.files[0].path.split(path.sep)[0] - rimraf(path.join(self.path, root), cb) - }) -} - -/** - * Closes the backing store for this storage. - */ -FSStorage.prototype.close = function (cb) { - var self = this - if (!cb) cb = function () {} - if (self.closed) return cb() - - Storage.prototype.close.call(self, function (err) { - if (err) return cb(err) - - var i = 0 - function loop (err) { - if (i >= self.files.length) return cb() - if (err) return cb(err) - var next = self.files[i++] - if (!next || !next.fd) return process.nextTick(loop) - next.fd.close(loop) - } - - process.nextTick(loop) - }) -} diff --git a/lib/load-chunk-store.js b/lib/load-chunk-store.js new file mode 100644 index 0000000..f935935 --- /dev/null +++ b/lib/load-chunk-store.js @@ -0,0 +1,61 @@ +// TODO: publish this as a standalone module + +module.exports = loadChunkStore + +var BlockStream = require('block-stream2') +var MultiStream = require('multistream') + +function loadChunkStore (streams, store, chunkLength, cb) { + if (!Array.isArray(streams)) streams = [ streams ] + if (!cb) cb = noop + + var index = 0 + var outstandingPuts = 0 + var finished = false + + var multistream = new MultiStream(streams) + var blockstream = new BlockStream(chunkLength, { zeroPadding: false }) + + multistream + .on('error', onError) + .pipe(blockstream) + .on('data', onData) + .on('finish', onFinish) + .on('error', onError) + + function onData (chunk) { + outstandingPuts += 1 + store.put(index, chunk, function (err) { + if (err) return onError(err) + outstandingPuts -= 1 + maybeDone() + }) + index += 1 + } + + function onFinish () { + finished = true + maybeDone() + } + + function onError (err) { + cleanup() + cb(err) + } + + function maybeDone () { + if (finished && outstandingPuts === 0) { + cleanup() + cb(null) + } + } + + function cleanup () { + multistream.removeListener('error', onError) + blockstream.removeListener('data', onData) + blockstream.removeListener('finish', onFinish) + blockstream.removeListener('error', onError) + } +} + +function noop () {} diff --git a/lib/storage.js b/lib/storage.js deleted file mode 100644 index 27444ed..0000000 --- a/lib/storage.js +++ /dev/null @@ -1,683 +0,0 @@ -module.exports = exports = Storage - -var appendTo = require('./append-to') -var BitField = require('bitfield') -var BlockStream = require('block-stream2') -var debug = require('debug')('webtorrent:storage') -var dezalgo = require('dezalgo') -var eos = require('end-of-stream') -var EventEmitter = require('events').EventEmitter -var FileStream = require('./file-stream') -var inherits = require('inherits') -var mime = require('./mime.json') -var MultiStream = require('multistream') -var once = require('once') -var path = require('path') -var sha1 = require('simple-sha1') - -var BLOCK_LENGTH = 16 * 1024 - -var BLOCK_BLANK = exports.BLOCK_BLANK = 0 -var BLOCK_RESERVED = exports.BLOCK_RESERVED = 1 -var BLOCK_WRITTEN = exports.BLOCK_WRITTEN = 2 - -function noop () {} - -inherits(Piece, EventEmitter) - -/** - * A torrent piece - * - * @param {number} index piece index - * @param {string} hash sha1 hash (hex) for this piece - * @param {Buffer|number} buffer backing buffer, or piece length if backing buffer is lazy - * @param {boolean=} noVerify skip piece verification (used when seeding a new file) - */ -function Piece (index, hash, buffer, noVerify) { - var self = this - EventEmitter.call(self) - if (!debug.enabled) self.setMaxListeners(0) - - self.index = index - self.hash = hash - self.noVerify = !!noVerify - - if (typeof buffer === 'number') { - // alloc buffer lazily - self.buffer = null - self.length = buffer - } else { - // use buffer provided - self.buffer = buffer - self.length = buffer.length - } - - self._reset() -} - -Piece.prototype.readBlock = function (offset, length, cb) { - var self = this - cb = dezalgo(cb) - if (!self.buffer || !self._verifyOffset(offset)) { - return cb(new Error('invalid block offset ' + offset)) - } - cb(null, self.buffer.slice(offset, offset + length)) -} - -Piece.prototype.writeBlock = function (offset, buffer, cb) { - var self = this - cb = dezalgo(cb) - if (!self._verifyOffset(offset) || !self._verifyBlock(offset, buffer)) { - return cb(new Error('invalid block ' + offset + ':' + buffer.length)) - } - self._lazyAllocBuffer() - - var i = offset / BLOCK_LENGTH - if (self.blocks[i] === BLOCK_WRITTEN) { - return cb(null) - } - - buffer.copy(self.buffer, offset) - self.blocks[i] = BLOCK_WRITTEN - self.blocksWritten += 1 - - if (self.blocksWritten === self.blocks.length) { - self.verify() - } - - cb(null) -} - -Piece.prototype.reserveBlock = function (endGame) { - var self = this - var len = self.blocks.length - for (var i = 0; i < len; i++) { - if ((self.blocks[i] && !endGame) || self.blocks[i] === BLOCK_WRITTEN) { - continue - } - self.blocks[i] = BLOCK_RESERVED - return { - offset: i * BLOCK_LENGTH, - length: (i === len - 1) - ? self.length - (i * BLOCK_LENGTH) - : BLOCK_LENGTH - } - } - return null -} - -Piece.prototype.cancelBlock = function (offset) { - var self = this - if (!self._verifyOffset(offset)) { - return false - } - - var i = offset / BLOCK_LENGTH - if (self.blocks[i] === BLOCK_RESERVED) { - self.blocks[i] = BLOCK_BLANK - } - - return true -} - -Piece.prototype._reset = function () { - var self = this - self.verified = false - self.blocks = new Buffer(Math.ceil(self.length / BLOCK_LENGTH)) - if (!process.browser) self.blocks.fill(0) - self.blocksWritten = 0 -} - -Piece.prototype.verify = function (buffer) { - var self = this - if (!buffer) buffer = self.buffer - if (self.verified || !buffer) { - return - } - - if (self.noVerify) { - self.verified = true - onResult() - return - } - - sha1(buffer, function (expectedHash) { - self.verified = (expectedHash === self.hash) - onResult() - }) - - function onResult () { - if (self.verified) { - self.emit('done') - } else { - self.emit('warning', new Error('piece ' + self.index + ' failed verification')) - self._reset() - } - } -} - -Piece.prototype._verifyOffset = function (offset) { - var self = this - if (offset % BLOCK_LENGTH === 0) { - return true - } else { - self.emit( - 'warning', - new Error('invalid block offset ' + offset + ', not multiple of ' + BLOCK_LENGTH) - ) - return false - } -} - -Piece.prototype._verifyBlock = function (offset, buffer) { - var self = this - if (buffer.length === BLOCK_LENGTH) { - // normal block length - return true - } else if (buffer.length === self.length - offset && - self.length - offset < BLOCK_LENGTH) { - // last block in piece is allowed to be less than block length - return true - } else { - self.emit('warning', new Error('invalid block size ' + buffer.length)) - return false - } -} - -Piece.prototype._lazyAllocBuffer = function () { - var self = this - if (!self.buffer) self.buffer = new Buffer(self.length) -} - -inherits(File, EventEmitter) - -/** - * A torrent file - * - * @param {Storage} storage Storage container object - * @param {Object} file the file object from the parsed torrent - * @param {Array.<Piece>} pieces backing pieces for this file - * @param {number} pieceLength the length in bytes of a non-terminal piece - */ -function File (storage, file, pieces, pieceLength) { - var self = this - EventEmitter.call(self) - if (!debug.enabled) self.setMaxListeners(0) - - self.storage = storage - self.name = file.name - self.path = file.path - self.length = file.length - self.offset = file.offset - self.pieces = pieces - self.pieceLength = pieceLength - self.done = false - - self._blobUrl = null - self._blobUrlPending = false - - self.pieces.forEach(function (piece) { - piece.on('done', function () { - self._checkDone() - }) - }) - - // if the file is zero-length, it will be done upon initialization - self._checkDone() -} - -/** - * Selects the file to be downloaded, but at a lower priority than files with streams. - * Useful if you know you need the file at a later stage. - */ -File.prototype.select = function () { - var self = this - if (self.pieces.length > 0) { - var start = self.pieces[0].index - var end = self.pieces[self.pieces.length - 1].index - self.storage.emit('select', start, end, false) - } -} - -/** - * Deselects the file, which means it won't be downloaded unless someone creates a stream - * for it. - */ -File.prototype.deselect = function () { - var self = this - if (self.pieces.length > 0) { - var start = self.pieces[0].index - var end = self.pieces[self.pieces.length - 1].index - self.storage.emit('deselect', start, end, false) - } -} - -/** - * Create a readable stream to the file. Pieces needed by the stream will be prioritized - * highly and fetched from the swarm first. - * - * @param {Object} opts - * @param {number} opts.start stream slice of file, starting from this byte (inclusive) - * @param {number} opts.end stream slice of file, ending with this byte (inclusive) - * @return {stream.Readable} - */ -File.prototype.createReadStream = function (opts) { - var self = this - if (!opts) opts = {} - if (opts.pieceLength == null) opts.pieceLength = self.pieceLength - var stream = new FileStream(self, opts) - self.storage.emit('select', stream.startPiece, stream.endPiece, true, stream.notify.bind(stream)) - eos(stream, function () { - self.storage.emit('deselect', stream.startPiece, stream.endPiece, true) - }) - - return stream -} - -/** - * @param {function} cb - */ -File.prototype.getBuffer = function (cb) { - var self = this - cb = dezalgo(once(cb)) - - var buffer - if (self.storage.buffer) { - // Use the in-memory buffer (when possible) for better memory utilization - var onDone = function () { - buffer = self.storage.buffer.slice(self.offset, self.offset + self.length) - cb(null, buffer) - } - if (self.done) onDone() - else self.once('done', onDone) - } else { - buffer = new Buffer(self.length) - var start = 0 - self.createReadStream() - .on('data', function (chunk) { - chunk.copy(buffer, start) - start += chunk.length - }) - .on('end', function () { - cb(null, buffer) - }) - .on('error', cb) - } -} - -File.prototype.appendTo = function (elem, cb) { - var self = this - if (typeof window === 'undefined') throw new Error('browser-only method') - if (typeof elem === 'string') elem = document.querySelector(elem) - - appendTo(self, elem, cb) -} - -/** - * Note: This function is async to support different types of (async) storage backends in - * the future. - * @param {function} cb - */ -File.prototype.getBlobURL = function (cb) { - var self = this - if (typeof window === 'undefined') throw new Error('browser-only method') - cb = dezalgo(cb) - - if (self._blobUrl) return cb(null, self._blobUrl) - if (self._blobUrlPending) return self.once('_blobUrl', cb) - - self._blobUrlPending = true - - self.getBuffer(function (err, buffer) { - self._blobUrlPending = false - if (err) { - cb(err) - self.emit('_blobUrl', err) - return - } - - var type = mime[path.extname(self.name).toLowerCase()] - var blob = type - ? new window.Blob([ buffer ], { type: type }) - : new window.Blob([ buffer ]) - self._blobUrl = window.URL.createObjectURL(blob) - - cb(null, self._blobUrl) - self.emit('_blobUrl', null, self._blobUrl) - }) -} - -File.prototype._checkDone = function () { - var self = this - self.done = self.pieces.every(function (piece) { - return piece.verified - }) - - if (self.done) { - process.nextTick(function () { - self.emit('done') - }) - } -} - -inherits(Storage, EventEmitter) - -/** - * Storage for a torrent download. Handles the complexities of reading and writing - * to pieces and files. - * - * @param {Object} parsedTorrent - * @param {Object} opts - */ -function Storage (parsedTorrent, opts) { - var self = this - EventEmitter.call(self) - if (!debug.enabled) self.setMaxListeners(0) - if (!opts) opts = {} - - self.bitfield = new BitField(parsedTorrent.pieces.length) - - self.done = false - self.closed = false - self.readonly = true - - if (!opts.nobuffer) { - self.buffer = new Buffer(parsedTorrent.length) - } - - var pieceLength = self.pieceLength = parsedTorrent.pieceLength - var lastPieceLength = parsedTorrent.lastPieceLength - var numPieces = parsedTorrent.pieces.length - - self.pieces = parsedTorrent.pieces.map(function (hash, index) { - var start = index * pieceLength - var end = start + (index === numPieces - 1 ? lastPieceLength : pieceLength) - - // if we're backed by a buffer, the piece's buffer will reference the same memory. - // otherwise, the piece's buffer will be lazily created on demand - var buffer = (self.buffer ? self.buffer.slice(start, end) : end - start) - - var piece = new Piece(index, hash, buffer, !!opts.noVerify) - piece.on('done', self._onPieceDone.bind(self, piece)) - return piece - }) - - self.files = parsedTorrent.files.map(function (fileObj) { - var start = fileObj.offset - var end = start + fileObj.length - 1 - - var startPiece = start / pieceLength | 0 - var endPiece = end / pieceLength | 0 - var pieces = self.pieces.slice(startPiece, endPiece + 1) - - var file = new File(self, fileObj, pieces, pieceLength) - file.on('done', self._onFileDone.bind(self, file)) - return file - }) -} - -Storage.BLOCK_LENGTH = BLOCK_LENGTH - -Storage.prototype.load = function (streams, cb) { - var self = this - if (!Array.isArray(streams)) streams = [ streams ] - cb = once(cb || function () {}) - - var pieceIndex = 0 - var multistream = new MultiStream(streams) - var blockstream = new BlockStream(self.pieceLength, { zeroPadding: false }) - - multistream.on('error', onError) - - self.once('done', onDone) - - multistream - .pipe(blockstream) - .on('data', onData) - .on('error', onError) - - function onData (piece) { - var index = pieceIndex - pieceIndex += 1 - - var blockIndex = 0 - var s = new BlockStream(BLOCK_LENGTH, { zeroPadding: false }) - - s.on('data', onBlockData) - s.on('end', onBlockEnd) - - function onBlockData (block) { - var offset = blockIndex * BLOCK_LENGTH - blockIndex += 1 - self.writeBlock(index, offset, block) - } - - function onBlockEnd () { - blockCleanup() - } - - function blockCleanup () { - s.removeListener('data', onBlockData) - s.removeListener('end', onBlockEnd) - } - - s.end(piece) - } - - function onError (err) { - cleanup() - cb(err) - } - - function onDone () { - cleanup() - cb(null) - } - - function cleanup () { - multistream.removeListener('error', onError) - blockstream.removeListener('data', onData) - blockstream.removeListener('error', onError) - self.removeListener('done', onDone) - } -} - -Object.defineProperty(Storage.prototype, 'downloaded', { - get: function () { - var self = this - return self.pieces.reduce(function (total, piece) { - return total + (piece.verified ? piece.length : piece.blocksWritten * BLOCK_LENGTH) - }, 0) - } -}) - -/** - * The number of missing pieces. Used to implement 'end game' mode. - */ -Object.defineProperty(Storage.prototype, 'numMissing', { - get: function () { - var self = this - var numMissing = self.pieces.length - for (var index = 0, len = self.pieces.length; index < len; index++) { - numMissing -= self.bitfield.get(index) - } - return numMissing - } -}) - -/** - * Reads a block from a piece. - * - * @param {number} index piece index - * @param {number} offset byte offset within piece - * @param {number} length length in bytes to read from piece - * @param {function} cb - */ -Storage.prototype.readBlock = function (index, offset, length, cb) { - var self = this - cb = dezalgo(cb) - var piece = self.pieces[index] - if (!piece) return cb(new Error('invalid piece index ' + index)) - piece.readBlock(offset, length, cb) -} - -/** - * Writes a block to a piece. - * - * @param {number} index piece index - * @param {number} offset byte offset within piece - * @param {Buffer} buffer buffer to write - * @param {function} cb - */ -Storage.prototype.writeBlock = function (index, offset, buffer, cb) { - var self = this - if (!cb) cb = noop - cb = dezalgo(cb) - - if (self.readonly) return cb(new Error('cannot write to readonly storage')) - var piece = self.pieces[index] - if (!piece) return cb(new Error('invalid piece index ' + index)) - piece.writeBlock(offset, buffer, cb) -} - -/** - * Reads a piece or a range of a piece. - * - * @param {number} index piece index - * @param {Object=} range optional range within piece - * @param {number} range.offset byte offset within piece - * @param {number} range.length length in bytes to read from piece - * @param {function} cb - * @param {boolean} force optionally overrides default check preventing reading - * from unverified piece - */ -Storage.prototype.read = function (index, range, cb, force) { - var self = this - - if (typeof range === 'function') { - force = cb - cb = range - range = null - } - cb = dezalgo(cb) - - var piece = self.pieces[index] - if (!piece) { - return cb(new Error('invalid piece index ' + index)) - } - - if (!piece.verified && !force) { - return cb(new Error('Storage.read called on incomplete piece ' + index)) - } - - var offset = 0 - var length = piece.length - - if (range) { - offset = range.offset || 0 - length = range.length || length - } - - if (piece.buffer) { - // shortcut for piece with static backing buffer - return cb(null, piece.buffer.slice(offset, offset + length)) - } - - var blocks = [] - function readNextBlock () { - if (length <= 0) return cb(null, Buffer.concat(blocks)) - - var blockOffset = offset - var blockLength = Math.min(BLOCK_LENGTH, length) - - offset += blockLength - length -= blockLength - - self.readBlock(index, blockOffset, blockLength, function (err, block) { - if (err) return cb(err) - - blocks.push(block) - readNextBlock() - }) - } - - readNextBlock() -} - -/** - * Reserves a block from the given piece. - * - * @param {number} index piece index - * @param {Boolean} endGame whether or not end game mode is enabled - * - * @returns {Object|null} reservation with offset and length or null if failed. - */ -Storage.prototype.reserveBlock = function (index, endGame) { - var self = this - var piece = self.pieces[index] - if (!piece) return null - - return piece.reserveBlock(endGame) -} - -/** - * Cancels a previous block reservation from the given piece. - * - * @param {number} index piece index - * @param {number} offset byte offset of block in piece - * - * @returns {Boolean} - */ -Storage.prototype.cancelBlock = function (index, offset) { - var self = this - var piece = self.pieces[index] - if (!piece) return false - - return piece.cancelBlock(offset) -} - -/** - * Removes and cleans up any backing store for this storage. - * @param {function=} cb - */ -Storage.prototype.remove = function (cb) { - if (cb) dezalgo(cb)(null) -} - -/** - * Closes the backing store for this storage. - * @param {function=} cb - */ -Storage.prototype.close = function (cb) { - var self = this - self.closed = true - if (cb) dezalgo(cb)(null) -} - -// -// HELPER METHODS -// - -Storage.prototype._onPieceDone = function (piece) { - var self = this - self.bitfield.set(piece.index) - debug('piece done ' + piece.index + ' (' + self.numMissing + ' still missing)') - self.emit('piece', piece) -} - -Storage.prototype._onFileDone = function (file) { - var self = this - debug('file done ' + file.name) - self.emit('file', file) - - self._checkDone() -} - -Storage.prototype._checkDone = function () { - var self = this - - if (!self.done && self.files.every(function (file) { return file.done })) { - self.done = true - self.emit('done') - } -} diff --git a/lib/torrent.js b/lib/torrent.js index eff6433..8d5b8f3 100644 --- a/lib/torrent.js +++ b/lib/torrent.js @@ -1,28 +1,38 @@ module.exports = Torrent var addrToIPPort = require('addr-to-ip-port') // browser exclude +var BitField = require('bitfield') var createTorrent = require('create-torrent') var debug = require('debug')('webtorrent:torrent') var Discovery = require('torrent-discovery') var EventEmitter = require('events').EventEmitter +var extend = require('xtend/mutable') +var FSChunkStore = require('fs-chunk-store') +var ImmediateChunkStore = require('immediate-chunk-store') var inherits = require('inherits') +var os = require('os') var parallel = require('run-parallel') var parseTorrent = require('parse-torrent') +var path = require('path') +var pathExists = require('path-exists') +var Piece = require('torrent-piece') var randomIterate = require('random-iterate') var reemit = require('re-emitter') +var sha1 = require('simple-sha1') var Swarm = require('bittorrent-swarm') var uniq = require('uniq') var ut_metadata = require('ut_metadata') var ut_pex = require('ut_pex') // browser exclude +var File = require('./file') +var loadChunkStore = require('./load-chunk-store') var RarityMap = require('./rarity-map') var Server = require('./server') // browser exclude -var Storage = require('./storage') var MAX_BLOCK_LENGTH = 128 * 1024 var PIECE_TIMEOUT = 30000 var CHOKE_TIMEOUT = 5000 -var SPEED_THRESHOLD = 3 * Storage.BLOCK_LENGTH +var SPEED_THRESHOLD = 3 * Piece.BLOCK_LENGTH var PIPELINE_MIN_DURATION = 0.5 var PIPELINE_MAX_DURATION = 1 @@ -30,13 +40,11 @@ var PIPELINE_MAX_DURATION = 1 var RECHOKE_INTERVAL = 10000 // 10 seconds var RECHOKE_OPTIMISTIC_DURATION = 2 // 30 seconds -function noop () {} +var TMP = path.join(pathExists.sync('/tmp') ? '/tmp' : os.tmpDir(), 'webtorrent') inherits(Torrent, EventEmitter) /** - * A torrent - * * @param {string|Buffer|Object} torrentId * @param {Object} opts */ @@ -46,14 +54,14 @@ function Torrent (torrentId, opts) { if (!debug.enabled) self.setMaxListeners(0) debug('new torrent') - self.opts = opts self.client = opts.client - self.hotswapEnabled = ('hotswap' in opts ? opts.hotswap : true) - self.verify = opts.verify + self.announce = opts.announce + self.urlList = opts.urlList + + self._path = opts.path + self._storage = opts.storage || FSChunkStore - self.chokeTimeout = opts.chokeTimeout || CHOKE_TIMEOUT - self.pieceTimeout = opts.pieceTimeout || PIECE_TIMEOUT self.strategy = opts.strategy || 'sequential' self._rechokeNumSlots = (opts.uploads === false || opts.uploads === 0) @@ -63,32 +71,24 @@ function Torrent (torrentId, opts) { self._rechokeOptimisticTime = 0 self._rechokeIntervalId = null - self.infoHash = null self.ready = false self.destroyed = false - self.files = [] self.metadata = null - self.parsedTorrent = null self.storage = null self.numBlockedPeers = 0 + self.files = null + self._amInterested = false self._selections = [] self._critical = [] - self._storageImpl = opts.storage || Storage - this._torrentFileURL = null + + // for cleanup self._servers = [] if (torrentId) self._onTorrentId(torrentId) } -// torrent size (in bytes) -Object.defineProperty(Torrent.prototype, 'length', { - get: function () { - return (this.parsedTorrent && this.parsedTorrent.length) || 0 - } -}) - -// time remaining (in milliseconds) +// Time remaining (in milliseconds) Object.defineProperty(Torrent.prototype, 'timeRemaining', { get: function () { if (this.swarm.downloadSpeed() === 0) return Infinity @@ -96,60 +96,62 @@ Object.defineProperty(Torrent.prototype, 'timeRemaining', { } }) -// percentage complete, represented as a number between 0 and 1 -Object.defineProperty(Torrent.prototype, 'progress', { - get: function () { - return (this.parsedTorrent && (this.downloaded / this.parsedTorrent.length)) || 0 - } -}) - -// bytes downloaded (not necessarily verified) +// Bytes downloaded Object.defineProperty(Torrent.prototype, 'downloaded', { - get: function () { - return (this.storage && this.storage.downloaded) || 0 - } + get: function () { return this.swarm ? this.swarm.downloaded : 0 } }) -// bytes uploaded +// Bytes uploaded Object.defineProperty(Torrent.prototype, 'uploaded', { - get: function () { - return this.swarm.uploaded - } + get: function () { return this.swarm ? this.swarm.uploaded : 0 } }) -// ratio of bytes downloaded to uploaded -Object.defineProperty(Torrent.prototype, 'ratio', { - get: function () { - return (this.uploaded && (this.downloaded / this.uploaded)) || 0 - } +// TODO: add this and use it for "progress" property +// Object.defineProperty(Torrent.prototype, 'verified', { +// get: function () { +// var self = this +// return self.pieces.reduce(function (total, piece) { +// return total + (piece.verified ? piece.length : piece.blocksWritten * BLOCK_LENGTH) +// }, 0) +// } +// }) + +/** + * The number of missing pieces. Used to implement 'end game' mode. + */ +// Object.defineProperty(Storage.prototype, 'numMissing', { +// get: function () { +// var self = this +// var numMissing = self.pieces.length +// for (var index = 0, len = self.pieces.length; index < len; index++) { +// numMissing -= self.bitfield.get(index) +// } +// return numMissing +// } +// }) + +// Percentage complete, represented as a number between 0 and 1 +Object.defineProperty(Torrent.prototype, 'progress', { + get: function () { return this.length ? this.downloaded / this.length : 0 } }) -Object.defineProperty(Torrent.prototype, 'magnetURI', { - get: function () { - return parseTorrent.toMagnetURI(this.parsedTorrent) - } +// Seed ratio +Object.defineProperty(Torrent.prototype, 'ratio', { + get: function () { return this.uploaded / (this.downloaded || 1) } }) -Object.defineProperty(Torrent.prototype, 'torrentFile', { - get: function () { - return parseTorrent.toTorrentFile(this.parsedTorrent) - } +// Number of peers +Object.defineProperty(Torrent.prototype, 'numPeers', { + get: function () { return this.swarm ? this.swarm.numPeers : 0 } }) Object.defineProperty(Torrent.prototype, 'torrentFileURL', { get: function () { if (typeof window === 'undefined') throw new Error('browser-only property') - if (this._torrentFileURL) return this._torrentFileURL - this._torrentFileURL = window.URL.createObjectURL( + if (!this.torrentFile) return null + return window.URL.createObjectURL( new window.Blob([ this.torrentFile ], { type: 'application/x-bittorrent' }) ) - return this._torrentFileURL - } -}) - -Object.defineProperty(Torrent.prototype, 'numPeers', { - get: function () { - return this.swarm ? this.swarm.numPeers : 0 } }) @@ -179,35 +181,14 @@ Torrent.prototype._onTorrentId = function (torrentId) { Torrent.prototype._onParsedTorrent = function (parsedTorrent) { var self = this if (self.destroyed) return - self.parsedTorrent = parsedTorrent - self.infoHash = parsedTorrent.infoHash - - if (!self.infoHash) { - return self._onError(new Error('Malformed torrent data: Missing info hash.')) - } - - if (self.parsedTorrent.name) self.name = self.parsedTorrent.name // preliminary name - // Allow specifying trackers via `opts` parameter - if (self.opts.announce) { - self.parsedTorrent.announce = - self.parsedTorrent.announce.concat(self.opts.announce) - } + self._processParsedTorrent(parsedTorrent) - // So `webtorrent-hybrid` can force specific trackers to be used - if (global.WEBTORRENT_ANNOUNCE) { - self.parsedTorrent.announce = - self.parsedTorrent.announce.concat(global.WEBTORRENT_ANNOUNCE) - } - - // When no trackers specified, use some reasonable defaults - if (self.parsedTorrent.announce.length === 0) { - self.parsedTorrent.announce = createTorrent.announceList.map(function (list) { - return list[0] - }) + if (!self.infoHash) { + return self._onError(new Error('Malformed torrent data: No info hash')) } - uniq(self.parsedTorrent.announce) + if (!self._path) self._path = path.join(TMP, self.infoHash) // create swarm self.swarm = new Swarm(self.infoHash, self.client.peerId, { @@ -238,6 +219,37 @@ Torrent.prototype._onParsedTorrent = function (parsedTorrent) { }) } +Torrent.prototype._processParsedTorrent = function (parsedTorrent) { + if (this.announce) { + // Allow specifying trackers via `opts` parameter + parsedTorrent.announce = parsedTorrent.announce.concat(this.announce) + } + + if (global.WEBTORRENT_ANNOUNCE) { + // So `webtorrent-hybrid` can force specific trackers to be used + parsedTorrent.announce = parsedTorrent.announce.concat(global.WEBTORRENT_ANNOUNCE) + } + + if (parsedTorrent.announce.length === 0) { + // When no trackers specified, use some reasonable defaults + parsedTorrent.announce = createTorrent.announceList.map(function (list) { + return list[0] + }) + } + + if (this.urlList) { + // Allow specifying web seeds via `opts` parameter + parsedTorrent.urlList = parsedTorrent.urlList.concat(this.urlList) + } + + uniq(parsedTorrent.announce) + + extend(this, parsedTorrent) + + this.magnetURI = parseTorrent.toMagnetURI(parsedTorrent) + this.torrentFile = parseTorrent.toTorrentFile(parsedTorrent) +} + Torrent.prototype._onSwarmListening = function () { var self = this if (self.destroyed) return @@ -246,7 +258,7 @@ Torrent.prototype._onSwarmListening = function () { // begin discovering peers via the DHT and tracker servers self.discovery = new Discovery({ - announce: self.parsedTorrent.announce, + announce: self.announce, dht: self.client.dht, tracker: self.client.tracker, peerId: self.client.peerId, @@ -262,74 +274,73 @@ Torrent.prototype._onSwarmListening = function () { reemit(self.discovery, self, ['trackerAnnounce', 'dhtAnnounce', 'warning']) // if full metadata was included in initial torrent id, use it - if (self.parsedTorrent.info) self._onMetadata(self.parsedTorrent) + if (self.info) self._onMetadata(self) self.emit('listening', self.client.torrentPort) } /** - * Called when the metadata is received. + * Called when the full torrent metadata is received. */ Torrent.prototype._onMetadata = function (metadata) { var self = this if (self.metadata || self.destroyed) return debug('got metadata') + var parsedTorrent if (metadata && metadata.infoHash) { // `metadata` is a parsed torrent (from parse-torrent module) - self.metadata = parseTorrent.toTorrentFile(metadata) - self.parsedTorrent = metadata + parsedTorrent = metadata } else { - self.metadata = metadata - var announce = self.parsedTorrent.announce - var urlList = self.parsedTorrent.urlList try { - self.parsedTorrent = parseTorrent(self.metadata) + parsedTorrent = parseTorrent(metadata) } catch (err) { return self._onError(err) } - self.parsedTorrent.announce = announce - self.parsedTorrent.urlList = urlList } - // update preliminary torrent name - self.name = self.parsedTorrent.name + self._processParsedTorrent(parsedTorrent) + self.metadata = self.torrentFile // update discovery module with full torrent metadata - self.discovery.setTorrent(self.parsedTorrent) + self.discovery.setTorrent(self) // add web seed urls (BEP19) - if (self.parsedTorrent.urlList) { - self.parsedTorrent.urlList.forEach(self.addWebSeed.bind(self)) - } + if (self.urlList) self.urlList.forEach(self.addWebSeed.bind(self)) - self.rarityMap = new RarityMap(self.swarm, self.parsedTorrent.pieces.length) + self.rarityMap = new RarityMap(self.swarm, self.pieces.length) - self.storage = new self._storageImpl(self.parsedTorrent, self.opts) - self.storage.on('piece', self._onStoragePiece.bind(self)) - self.storage.on('file', function (file) { - self.emit('file', file) - }) + self.storage = new ImmediateChunkStore( + new self._storage(self.pieceLength, { + files: self.files.map(function (file) { + return { + path: path.join(self._path, file.path), + length: file.length, + offset: file.offset + } + }) + }) + ) - self._reservations = self.storage.pieces.map(function () { - return [] + self.files = self.files.map(function (file) { + return new File(self, file) }) - self.storage.on('done', function () { - if (self.discovery.tracker) self.discovery.tracker.complete() + self._hashes = self.pieces - debug('torrent ' + self.infoHash + ' done') - self.emit('done') + self.pieces = self.pieces.map(function (hash, i) { + var pieceLength = (i === self.pieces.length - 1) + ? self.lastPieceLength + : self.pieceLength + return new Piece(pieceLength) }) - self.storage.on('select', self.select.bind(self)) - self.storage.on('deselect', self.deselect.bind(self)) - self.storage.on('critical', self.critical.bind(self)) - - self.storage.files.forEach(function (file) { - self.files.push(file) + self._reservations = self.pieces.map(function () { + return [] }) + self.bitfield = new BitField(self.pieces.length) + self.swarm.wires.forEach(function (wire) { // If we didn't have the metadata at the time ut_metadata was initialized for this // wire, we still want to make it available to the peer in case they request it. @@ -338,37 +349,38 @@ Torrent.prototype._onMetadata = function (metadata) { self._onWireWithMetadata(wire) }) - if (self.verify) { - process.nextTick(function () { - debug('verifying existing torrent data') - var numPieces = 0 - var numVerified = 0 - - // TODO: move storage verification to storage.js? - parallel(self.storage.pieces.map(function (piece) { - return function (cb) { - self.storage.read(piece.index, function (err, buffer) { - numPieces += 1 - self.emit('verifying', { - percentDone: 100 * numPieces / self.storage.pieces.length, - percentVerified: 100 * numVerified / self.storage.pieces.length - }) - - if (!err && buffer) { - // TODO: this is a bit hacky; figure out a cleaner way of verifying the buffer - piece.verify(buffer) - numVerified += piece.verified - debug('piece ' + (piece.verified ? 'verified' : 'invalid') + ' ' + piece.index) - } - // continue regardless of whether piece verification failed - cb() - }, true) // forces override to allow reading from non-verified pieces - } - }), self._onStorage.bind(self)) - }) - } else { - process.nextTick(self._onStorage.bind(self)) - } + // VERIFY TORRENT DATA + // TODO: remove nextTick + // process.nextTick(function () { + // debug('verifying existing torrent data') + // var numPieces = 0 + // var numVerified = 0 + + // // TODO: move storage verification to storage.js? + // parallel(self.storage.pieces.map(function (piece) { + // return function (cb) { + // self.storage.read(piece.index, function (err, buffer) { + // numPieces += 1 + // self.emit('verifying', { + // percentDone: 100 * numPieces / self.storage.pieces.length, + // percentVerified: 100 * numVerified / self.storage.pieces.length + // }) + + // if (!err && buffer) { + // // TODO: this is a bit hacky; figure out a cleaner way of verifying the buffer + // piece.verify(buffer) + // numVerified += piece.verified + // debug('piece ' + (piece.verified ? 'verified' : 'invalid') + ' ' + piece.index) + // } + // // continue regardless of whether piece verification failed + // cb() + // }, true) // forces override to allow reading from non-verified pieces + // } + // }), self._onStorage.bind(self)) + // }) + process.nextTick(function () { + self._onStorage() + }) process.nextTick(function () { self.emit('metadata') @@ -376,6 +388,26 @@ Torrent.prototype._onMetadata = function (metadata) { } /** + * Called when the metadata, swarm, and underlying storage are all fully initialized. + */ +Torrent.prototype._onStorage = function () { + var self = this + if (self.destroyed) return + debug('on storage') + + // start off selecting the entire torrent with low priority + self.select(0, self.pieces.length - 1, false) + + self._rechokeIntervalId = setInterval(self._rechoke.bind(self), RECHOKE_INTERVAL) + if (self._rechokeIntervalId.unref) self._rechokeIntervalId.unref() + + process.nextTick(function () { + self.ready = true + self.emit('ready') + }) +} + +/** * Destroy and cleanup this torrent. */ Torrent.prototype.destroy = function (cb) { @@ -391,11 +423,6 @@ Torrent.prototype.destroy = function (cb) { self._rechokeIntervalId = null } - self.files.forEach(function (file) { - if (file._blobURL) window.URL.revokeObjectURL(file._blobURL) - }) - if (self._torrentFileURL) window.URL.revokeObjectURL(self._torrentFileURL) - var tasks = [] self._servers.forEach(function (server) { @@ -441,7 +468,7 @@ Torrent.prototype.addPeer = function (peer) { */ Torrent.prototype.addWebSeed = function (url) { var self = this - self.swarm.addWebSeed(url, self.parsedTorrent) + self.swarm.addWebSeed(url, self) } /** @@ -454,7 +481,7 @@ Torrent.prototype.addWebSeed = function (url) { */ Torrent.prototype.select = function (start, end, priority, notify) { var self = this - if (start > end || start < 0 || end >= self.storage.pieces.length) { + if (start > end || start < 0 || end >= self.pieces.length) { throw new Error('invalid selection ', start, ':', end) } priority = Number(priority) || 0 @@ -549,7 +576,7 @@ Torrent.prototype._onWire = function (wire, addr) { }) // Timeout for piece requests to this peer - wire.setTimeout(self.pieceTimeout, true) + wire.setTimeout(PIECE_TIMEOUT, true) // Send KEEP-ALIVE (every 60s) so peers will not disconnect the wire wire.setKeepAlive(true) @@ -598,7 +625,6 @@ Torrent.prototype._onWire = function (wire, addr) { Torrent.prototype._onWireWithMetadata = function (wire) { var self = this var timeoutId = null - var timeoutMs = self.chokeTimeout function onChokeTimeout () { if (self.destroyed || wire.destroyed) return @@ -607,15 +633,15 @@ Torrent.prototype._onWireWithMetadata = function (wire) { wire.amInterested) { wire.destroy() } else { - timeoutId = setTimeout(onChokeTimeout, timeoutMs) + timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT) if (timeoutId.unref) timeoutId.unref() } } var i = 0 function updateSeedStatus () { - if (wire.peerPieces.length !== self.storage.pieces.length) return - for (; i < self.storage.pieces.length; ++i) { + if (wire.peerPieces.length !== self.pieces.length) return + for (; i < self.pieces.length; ++i) { if (!wire.peerPieces.get(i)) return } wire.isSeeder = true @@ -642,7 +668,7 @@ Torrent.prototype._onWireWithMetadata = function (wire) { wire.on('choke', function () { clearTimeout(timeoutId) - timeoutId = setTimeout(onChokeTimeout, timeoutMs) + timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT) if (timeoutId.unref) timeoutId.unref() }) @@ -652,22 +678,18 @@ Torrent.prototype._onWireWithMetadata = function (wire) { }) wire.on('request', function (index, offset, length, cb) { - // Disconnect from peers that request more than 128KB, per spec if (length > MAX_BLOCK_LENGTH) { - debug( - 'got invalid block size request %s (from %s)', - length, wire.remoteAddress + ':' + wire.remotePort - ) + // Per spec, disconnect from peers that request >128KB return wire.destroy() } - - self.storage.readBlock(index, offset, length, cb) + if (self.pieces[index]) return + self.storage.get(index, { offset: offset, length: length }, cb) }) - wire.bitfield(self.storage.bitfield) // always send bitfield (required) + wire.bitfield(self.bitfield) // always send bitfield (required) wire.interested() // always start out interested - timeoutId = setTimeout(onChokeTimeout, timeoutMs) + timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT) if (timeoutId.unref) timeoutId.unref() wire.isSeeder = false @@ -675,45 +697,6 @@ Torrent.prototype._onWireWithMetadata = function (wire) { } /** - * Called when the metadata, swarm, and underlying storage are all fully initialized. - */ -Torrent.prototype._onStorage = function () { - var self = this - if (self.destroyed) return - debug('on storage') - - // allow writes to storage only after initial piece verification is finished - self.storage.readonly = false - - // start off selecting the entire torrent with low priority - self.select(0, self.storage.pieces.length - 1, false) - - self._rechokeIntervalId = setInterval(self._rechoke.bind(self), RECHOKE_INTERVAL) - if (self._rechokeIntervalId.unref) self._rechokeIntervalId.unref() - - process.nextTick(function () { - self.ready = true - self.emit('ready') - }) -} - -/** - * When a piece is fully downloaded, notify all peers with a HAVE message. - * @param {Piece} piece - */ -Torrent.prototype._onStoragePiece = function (piece) { - var self = this - debug('piece done %s', piece.index) - self._reservations[piece.index] = null - - self.swarm.wires.forEach(function (wire) { - wire.have(piece.index) - }) - - self._gcSelections() -} - -/** * Called on selection changes. */ Torrent.prototype._updateSelections = function () { @@ -737,13 +720,13 @@ Torrent.prototype._gcSelections = function () { var oldOffset = s.offset // check for newly downloaded pieces in selection - while (self.storage.bitfield.get(s.from + s.offset) && s.from + s.offset < s.to) { + while (self.bitfield.get(s.from + s.offset) && s.from + s.offset < s.to) { s.offset++ } if (oldOffset !== s.offset) s.notify() if (s.to !== s.from + s.offset) continue - if (!self.storage.bitfield.get(s.from + s.offset)) continue + if (!self.bitfield.get(s.from + s.offset)) continue // remove fully downloaded selection self._selections.splice(i--, 1) // decrement i to offset splice @@ -816,7 +799,6 @@ Torrent.prototype._updateWire = function (wire) { for (var i = self._selections.length; i--;) { var next = self._selections[i] - var piece if (self.strategy === 'rarest') { var start = next.from + next.offset @@ -849,15 +831,14 @@ Torrent.prototype._updateWire = function (wire) { var speed = wire.downloadSpeed() || 1 if (speed > SPEED_THRESHOLD) return function () { return true } - var secs = Math.max(1, wire.requests.length) * Storage.BLOCK_LENGTH / speed + var secs = Math.max(1, wire.requests.length) * Piece.BLOCK_LENGTH / speed var tries = 10 var ptr = 0 return function (index) { - if (!tries || self.storage.bitfield.get(index)) return true + if (!tries || self.bitfield.get(index)) return true - var piece = self.storage.pieces[index] - var missing = piece.blocks.length - piece.blocksWritten + var missing = self.pieces[index].missing for (; ptr < self.swarm.wires.length; ptr++) { var otherWire = self.swarm.wires[ptr] @@ -1017,10 +998,9 @@ Torrent.prototype._rechoke = function () { */ Torrent.prototype._hotswap = function (wire, index) { var self = this - if (!self.hotswapEnabled) return false var speed = wire.downloadSpeed() - if (speed < Storage.BLOCK_LENGTH) return false + if (speed < Piece.BLOCK_LENGTH) return false if (!self._reservations[index]) return false var r = self._reservations[index] @@ -1054,7 +1034,7 @@ Torrent.prototype._hotswap = function (wire, index) { var req = minWire.requests[i] if (req.piece !== index) continue - self.storage.cancelBlock(index, req.offset) + self.pieces[index].cancel((req.offset / Piece.BLOCK_SIZE) | 0) } self.emit('hotswap', minWire, wire, index) @@ -1068,66 +1048,121 @@ Torrent.prototype._request = function (wire, index, hotswap) { var self = this var numRequests = wire.requests.length - if (self.storage.bitfield.get(index)) return false + if (self.bitfield.get(index)) return false + var maxOutstandingRequests = getPipelineLength(wire, PIPELINE_MAX_DURATION) if (numRequests >= maxOutstandingRequests) return false + // var endGame = (wire.requests.length === 0 && self.storage.numMissing < 30) - var endGame = (wire.requests.length === 0 && self.storage.numMissing < 30) - var block = self.storage.reserveBlock(index, endGame) + var piece = self.pieces[index] + var reservation = piece.reserve() - if (!block && !endGame && hotswap && self._hotswap(wire, index)) { - block = self.storage.reserveBlock(index, false) + if (reservation === -1 && hotswap && self._hotswap(wire, index)) { + reservation = piece.reserve() } - if (!block) return false + if (reservation === -1) return false var r = self._reservations[index] - if (!r) { - r = self._reservations[index] = [] - } + if (!r) r = self._reservations[index] = [] var i = r.indexOf(null) if (i === -1) i = r.length r[i] = wire - function gotPiece (err, buffer) { - if (!self.ready) { - self.once('ready', function () { - gotPiece(err, buffer) - }) - return - } + var chunkOffset = piece.chunkOffset(reservation) + var chunkLength = piece.chunkLength(reservation) + + wire.request(index, chunkOffset, chunkLength, function onChunk (err, chunk) { + // TODO: what is this for? + if (!self.ready) return self.once('ready', function () { onChunk(err, chunk) }) if (r[i] === wire) r[i] = null + if (piece !== self.pieces[index]) return onUpdateTick() + if (err) { debug( 'error getting piece %s (offset: %s length: %s) from %s: %s', - index, block.offset, block.length, wire.remoteAddress + ':' + wire.remotePort, + index, chunkOffset, chunkLength, wire.remoteAddress + ':' + wire.remotePort, err.message ) - self.storage.cancelBlock(index, block.offset) - process.nextTick(self._update.bind(self)) - return false - } else { - debug( - 'got piece %s (offset: %s length: %s) from %s', - index, block.offset, block.length, wire.remoteAddress + ':' + wire.remotePort - ) - self.storage.writeBlock(index, block.offset, buffer, function (err) { - if (err) { - debug('error writing block') - self.storage.cancelBlock(index, block.offset) + piece.cancel(reservation) + onUpdateTick() + return + } + + debug( + 'got piece %s (offset: %s length: %s) from %s', + index, chunkOffset, chunkLength, wire.remoteAddress + ':' + wire.remotePort + ) + + if (!piece.set(reservation, chunk, wire)) return onUpdateTick() + + var buf = piece.flush() + + // TODO: might need to set self.pieces[index] = null here since sha1 is async + + sha1(buf, function (hash) { + if (hash === self._hashes[index]) { + if (!self.pieces[index]) return + debug('piece verified %s', index) + + self.pieces[index] = null + self._reservations[index] = null + + self.bitfield.set(index, true) + self.storage.put(index, buf) + + self.swarm.wires.forEach(function (wire) { + wire.have(index) + }) + + // are any new files done? + self.files.forEach(function (file) { + if (file.done) return + for (var i = file._startPiece; i <= file._endPiece; ++i) { + if (!self.bitfield.get(i)) return + } + file.done = true + file.emit('done') + debug('file done: ' + file.name) + }) + + // is the torrent done? + if (self.files.every(function (file) { return file.done })) { + self.done = true + self.emit('done') + debug('torrent done: ' + self.infoHash) + if (self.discovery.tracker) self.discovery.tracker.complete() } - process.nextTick(self._update.bind(self)) - }) - } - } + self._gcSelections() + } else { + self.pieces[index] = new Piece(piece.length) + self.emit('warning', new Error('Piece ' + index + ' failed verification')) + } + onUpdateTick() + }) + }) - wire.request(index, block.offset, block.length, gotPiece) + function onUpdateTick () { + process.nextTick(function () { self._update() }) + } return true } +Torrent.prototype.load = function (streams, cb) { + var self = this + loadChunkStore(streams, this.storage, Piece.BLOCK_LENGTH, function (err) { + if (err) return cb(err) + self.pieces.forEach(function (piece, index) { + self.pieces[index] = null + self.bitfield.set(index, true) + }) + cb(null) + }) +} + Torrent.prototype.createServer = function (opts) { var self = this if (typeof Server === 'function' /* browser exclude */) { @@ -1145,7 +1180,7 @@ Torrent.prototype._onError = function (err) { } function getPipelineLength (wire, duration) { - return Math.ceil(2 + duration * wire.downloadSpeed() / Storage.BLOCK_LENGTH) + return Math.ceil(2 + duration * wire.downloadSpeed() / Piece.BLOCK_LENGTH) } /** @@ -1154,3 +1189,5 @@ function getPipelineLength (wire, duration) { function randomInt (high) { return Math.random() * high | 0 } + +function noop () {} diff --git a/package.json b/package.json index 0c03eaf..c056981 100644 --- a/package.json +++ b/package.json @@ -11,9 +11,9 @@ "webtorrent": "./bin/cmd.js" }, "browser": { - "./lib/fs-storage": false, "./lib/server": false, "bittorrent-dht/client": false, + "fs-chunk-store": "memory-chunk-store", "load-ip-set": false, "ut_pex": false }, @@ -32,10 +32,13 @@ "dezalgo": "^1.0.1", "end-of-stream": "^1.0.0", "executable": "^1.1.0", + "fs-chunk-store": "^1.3.1", "hat": "0.0.3", + "immediate-chunk-store": "^1.0.7", "inherits": "^2.0.1", "inquirer": "^0.8.0", "load-ip-set": "^1.0.3", + "memory-chunk-store": "^1.1.2", "mime": "^1.2.11", "minimist": "^1.1.0", "mkdirp": "^0.5.0", @@ -44,6 +47,7 @@ "network-address": "^1.0.0", "once": "^1.3.1", "parse-torrent": "^5.1.0", + "path-exists": "^1.0.0", "pretty-bytes": "^2.0.1", "pump": "^1.0.0", "random-access-file": "^0.3.1", @@ -56,6 +60,7 @@ "speedometer": "^0.1.2", "thunky": "^0.1.0", "torrent-discovery": "^3.0.0", + "torrent-piece": "^1.0.0", "uniq": "^1.0.1", "ut_metadata": "^2.1.0", "ut_pex": "^1.0.1", diff --git a/test/download-dht-magnet.js b/test/download-dht-magnet.js index 3c05b30..93e9534 100644 --- a/test/download-dht-magnet.js +++ b/test/download-dht-magnet.js @@ -55,7 +55,7 @@ test('Download using DHT (via magnet uri)', function (t) { maybeDone() }) - torrent.storage.load(fs.createReadStream(leavesPath), function (err) { + torrent.load(fs.createReadStream(leavesPath), function (err) { t.error(err) wroteStorage = true maybeDone() diff --git a/test/download-dht-torrent.js b/test/download-dht-torrent.js index 63f90bd..cda235a 100644 --- a/test/download-dht-torrent.js +++ b/test/download-dht-torrent.js @@ -55,7 +55,7 @@ test('Download using DHT (via .torrent file)', function (t) { maybeDone(null) }) - torrent.storage.load(fs.createReadStream(leavesPath), function (err) { + torrent.load(fs.createReadStream(leavesPath), function (err) { wroteStorage = true maybeDone(err) }) diff --git a/test/download-tracker-magnet.js b/test/download-tracker-magnet.js index 079c162..43ed393 100644 --- a/test/download-tracker-magnet.js +++ b/test/download-tracker-magnet.js @@ -66,7 +66,7 @@ function magnetDownloadTest (t, serverType) { t.deepEqual(torrent.files.map(function (file) { return file.name }), names) - torrent.storage.load(fs.createReadStream(leavesPath), function (err) { + torrent.load(fs.createReadStream(leavesPath), function (err) { cb(err, client1) }) }) diff --git a/test/download-tracker-torrent.js b/test/download-tracker-torrent.js index 3c9a8be..c724f0a 100644 --- a/test/download-tracker-torrent.js +++ b/test/download-tracker-torrent.js @@ -66,7 +66,7 @@ function torrentDownloadTest (t, serverType) { t.deepEqual(torrent.files.map(function (file) { return file.name }), names) - torrent.storage.load(fs.createReadStream(leavesPath), function (err) { + torrent.load(fs.createReadStream(leavesPath), function (err) { cb(err, client1) }) }) diff --git a/test/server.js b/test/server.js index 920637a..3a3cd36 100644 --- a/test/server.js +++ b/test/server.js @@ -29,6 +29,6 @@ test('start http server programmatically', function (t) { }) }) torrent.on('ready', function () { - torrent.storage.load(fs.createReadStream(leavesPath)) + torrent.load(fs.createReadStream(leavesPath)) }) }) diff --git a/test/storage.js b/test/storage.js deleted file mode 100644 index 15c3987..0000000 --- a/test/storage.js +++ /dev/null @@ -1,61 +0,0 @@ -var fs = require('fs') -var parseTorrent = require('parse-torrent') -var Storage = require('../lib/storage') -var test = require('tape') - -var torrents = [ 'leaves', 'pride' ].map(function (name) { - var torrent = fs.readFileSync(__dirname + '/torrents/' + name + '.torrent') - - return { - name: name, - torrent: torrent, - parsedTorrent: parseTorrent(torrent) - } -}) - -torrents.forEach(function (torrent) { - test('sanity check backing storage for ' + torrent.name + ' torrent', function (t) { - var parsedTorrent = torrent.parsedTorrent - var storage = new Storage(parsedTorrent) - - t.equal(storage.files.length, parsedTorrent.files.length) - t.equal(storage.pieces.length, parsedTorrent.pieces.length) - - var length = 0 - var pieces = 0 - - storage.pieces.forEach(function (piece) { - t.notOk(piece.verified) - length += piece.length - - // ensure all blocks start out empty - for (var i = 0; i < piece.blocks.length; ++i) { - t.equal(piece.blocks[i], 0) - } - }) - - t.equal(length, parsedTorrent.length) - length = 0 - - storage.files.forEach(function (file) { - t.notOk(file.done) - length += file.length - pieces += file.pieces.length - - t.assert(file.length >= 0) - t.assert(file.pieces.length >= 0) - }) - - t.equal(length, parsedTorrent.length) - - if (parsedTorrent.files.length > 1) { - // if the torrent contains multiple files, the pieces may overlap file boundaries, - // so the aggregate number of file pieces will be at least the number of pieces. - t.assert(pieces >= parsedTorrent.pieces.length) - } else { - t.equal(pieces, parsedTorrent.pieces.length) - } - - t.end() - }) -}) |