module.exports = Storage var BitField = require('bitfield') var BlockStream = require('block-stream') var debug = require('debug')('webtorrent:storage') var dezalgo = require('dezalgo') var eos = require('end-of-stream') var EventEmitter = require('events').EventEmitter var extend = require('extend.js') var FileStream = require('./file-stream') var inherits = require('inherits') var MultiStream = require('multistream') var once = require('once') var sha1 = require('git-sha1') var BLOCK_LENGTH = 16 * 1024 var BLOCK_BLANK = 0 var BLOCK_RESERVED = 1 var BLOCK_WRITTEN = 2 function noop () {} inherits(Piece, EventEmitter) /** * A torrent piece * * @param {number} index piece index * @param {string} hash sha1 hash (hex) for this piece * @param {Buffer|number} buffer backing buffer for this piece or length of piece if the backing buffer is lazy */ function Piece (index, hash, buffer) { var self = this if (!debug.enabled) self.setMaxListeners(0) self.index = index self.hash = hash if (typeof buffer === 'number') { // alloc buffer lazily self.buffer = null self.length = buffer } else { // use buffer provided self.buffer = buffer self.length = buffer.length } self._reset() } Piece.prototype.readBlock = function (offset, length, cb) { var self = this cb = dezalgo(cb) if (!self.buffer || !self._verifyOffset(offset)) { return cb(new Error('invalid block offset ' + offset)) } cb(null, self.buffer.slice(offset, offset + length)) } Piece.prototype.writeBlock = function (offset, buffer, cb) { var self = this cb = dezalgo(cb) if (!self._verifyOffset(offset) || !self._verifyBlock(offset, buffer)) { return cb(new Error('invalid block ' + offset + ':' + buffer.length)) } self._lazyAllocBuffer() var i = offset / BLOCK_LENGTH if (self.blocks[i] === BLOCK_WRITTEN) { return cb(null) } buffer.copy(self.buffer, offset) self.blocks[i] = BLOCK_WRITTEN self.blocksWritten += 1 if (self.blocksWritten === self.blocks.length) { self.verify() } cb(null) } Piece.prototype.reserveBlock = function (endGame) { var self = this var len = self.blocks.length for (var i = 0; i < len; i++) { if ((self.blocks[i] && !endGame) || self.blocks[i] === BLOCK_WRITTEN) { continue } self.blocks[i] = BLOCK_RESERVED return { offset: i * BLOCK_LENGTH, length: (i === len - 1) ? self.length - (i * BLOCK_LENGTH) : BLOCK_LENGTH } } return null } Piece.prototype.cancelBlock = function (offset) { var self = this if (!self.buffer || !self._verifyOffset(offset)) { return false } var i = offset / BLOCK_LENGTH if (self.blocks[i] === BLOCK_RESERVED) { self.blocks[i] = BLOCK_BLANK } return true } Piece.prototype._reset = function () { var self = this self.verified = false self.blocks = new Buffer(Math.ceil(self.length / BLOCK_LENGTH)) self.blocks.fill(0) self.blocksWritten = 0 } Piece.prototype.verify = function (buffer) { var self = this buffer = buffer || self.buffer if (self.verified || !buffer) { return } self.verified = (sha1(buffer) === self.hash) if (self.verified) { self.emit('done') } else { self.emit('warning', new Error('piece ' + self.index + ' failed verification; ' + sha1(buffer) + ' expected ' + self.hash)) self._reset() } } Piece.prototype._verifyOffset = function (offset) { var self = this if (offset % BLOCK_LENGTH === 0) { return true } else { self.emit('warning', new Error('piece ' + self.index + ' invalid offset ' + offset + ' not multiple of ' + BLOCK_LENGTH + ' bytes')) return false } } Piece.prototype._verifyBlock = function (offset, buffer) { var self = this if (buffer.length === BLOCK_LENGTH) { // normal block length return true } else if (buffer.length === self.length - offset && self.length - offset < BLOCK_LENGTH) { // last block in piece is allowed to be less than block length return true } else { self.emit('warning', new Error('piece ' + self.index + ' invalid block of size ' + buffer.length + ' bytes')) return false } } Piece.prototype._lazyAllocBuffer = function () { var self = this if (!self.buffer) { self.buffer = new Buffer(self.length) } } inherits(File, EventEmitter) /** * A torrent file * * @param {Storage} storage Storage container object * @param {Object} file the file object from the parsed torrent * @param {Array.} pieces backing pieces for this file * @param {number} pieceLength the length in bytes of a non-terminal piece */ function File (storage, file, pieces, pieceLength) { var self = this if (!debug.enabled) self.setMaxListeners(0) = storage = self.path = file.path self.length = file.length self.offset = file.offset self.pieces = pieces self.pieceLength = pieceLength self.done = false self.pieces.forEach(function (piece) { piece.on('done', function () { self._checkDone() }) }) // if the file is zero-length, it will be done upon initialization self._checkDone() } /** * Selects the file to be downloaded, but at a lower priority than files with streams. * Useful if you know you need the file at a later stage. */ = function () { var self = this if (self.pieces.length > 0) {'select', self.pieces[0].index, self.pieces[self.pieces.length - 1].index, false) } } /** * Deselects the file, which means it won't be downloaded unless someone creates a stream * for it. */ File.prototype.deselect = function () { var self = this if (self.pieces.length > 0) {'deselect', self.pieces[0].index, self.pieces[self.pieces.length - 1].index, false) } } /** * Create a readable stream to the file. Pieces needed by the stream will be prioritized * highly and fetched from the swarm first. * * @param {Object} opts * @param {number} opts.start stream slice of file, starting from this byte (inclusive) * @param {number} opts.end stream slice of file, ending with this byte (inclusive) * @return {stream.Readable} */ File.prototype.createReadStream = function (opts) { var self = this opts = extend({ pieceLength: self.pieceLength }, opts) var stream = new FileStream(self, opts)'select', stream.startPiece, stream.endPiece, true, stream.notify.bind(stream)) eos(stream, function () {'deselect', stream.startPiece, stream.endPiece, true) }) return stream } /** * @param {function} cb */ File.prototype.getBlobURL = function (cb) { var self = this self.getBuffer(function (err, buf) { if (err) return cb(err) var url = URL.createObjectURL(new Blob([ buf ])) cb(null, url) }) } /** * TODO: detect errors and call callback with error * @param {function} cb */ File.prototype.getBuffer = function (cb) { var self = this var buf = new Buffer(self.length) var start = 0 self.createReadStream() .on('data', function (chunk) { chunk.copy(buf, start) start += chunk.length }) .on('end', function () { cb(null, buf) }) } File.prototype._checkDone = function () { var self = this self.done = self.pieces.every(function (piece) { return piece.verified }) if (self.done) { process.nextTick(function () { self.emit('done') }) } } inherits(Storage, EventEmitter) /** * Storage for a torrent download. Handles the complexities of reading and writing * to pieces and files. * * @param {Object} parsedTorrent * @param {Object} opts */ function Storage (parsedTorrent, opts) { var self = this if (!debug.enabled) self.setMaxListeners(0) opts = opts || {} self.bitfield = new BitField(parsedTorrent.pieces.length) self.done = false self.closed = false self.readonly = true if (!opts.nobuffer) { self.buffer = new Buffer(parsedTorrent.length) } var pieceLength = self.pieceLength = parsedTorrent.pieceLength var lastPieceLength = parsedTorrent.lastPieceLength var numPieces = parsedTorrent.pieces.length self.pieces = (hash, index) { var start = index * pieceLength var end = start + (index === numPieces - 1 ? lastPieceLength : pieceLength) // if we're backed by a buffer, the piece's buffer will reference the same memory. // otherwise, the piece's buffer will be lazily created on demand var buffer = (self.buffer ? self.buffer.slice(start, end) : end - start) var piece = new Piece(index, hash, buffer) piece.on('done', self._onPieceDone.bind(self, piece)) return piece }) self.files = (fileObj) { var start = fileObj.offset var end = start + fileObj.length - 1 var startPiece = start / pieceLength | 0 var endPiece = end / pieceLength | 0 var pieces = self.pieces.slice(startPiece, endPiece + 1) var file = new File(self, fileObj, pieces, pieceLength) file.on('done', self._onFileDone.bind(self, file)) return file }) } Storage.BLOCK_LENGTH = BLOCK_LENGTH Storage.prototype.load = function (streams, cb) { var self = this if (!Array.isArray(streams)) streams = [ streams ] if (!cb) cb = function () {} cb = once(cb) var pieceIndex = 0 ;(new MultiStream(streams)) .pipe(new BlockStream(self.pieceLength, { nopad: true })) .on('data', function (piece) { var index = pieceIndex pieceIndex += 1 var blockIndex = 0 var s = new BlockStream(BLOCK_LENGTH, { nopad: true }) s.on('data', function (block) { var offset = blockIndex * BLOCK_LENGTH blockIndex += 1 self.writeBlock(index, offset, block) }) s.end(piece) }) .on('end', function () { cb(null) }) .on('error', function (err) { cb(err) }) } Object.defineProperty(Storage.prototype, 'downloaded', { get: function () { var self = this return self.pieces.reduce(function (total, piece) { return total + (piece.verified ? piece.length : piece.blocksWritten * BLOCK_LENGTH) }, 0) } }) /** * The number of missing pieces. Used to implement 'end game' mode. */ Object.defineProperty(Storage.prototype, 'numMissing', { get: function () { var self = this var numMissing = self.pieces.length for (var index = 0, len = self.pieces.length; index < len; index++) { numMissing -= self.bitfield.get(index) } return numMissing } }) /** * Reads a block from a piece. * * @param {number} index piece index * @param {number} offset byte offset within piece * @param {number} length length in bytes to read from piece * @param {function} cb */ Storage.prototype.readBlock = function (index, offset, length, cb) { var self = this cb = dezalgo(cb) var piece = self.pieces[index] if (!piece) return cb(new Error('invalid piece index ' + index)) piece.readBlock(offset, length, cb) } /** * Writes a block to a piece. * * @param {number} index piece index * @param {number} offset byte offset within piece * @param {Buffer} buffer buffer to write * @param {function} cb */ Storage.prototype.writeBlock = function (index, offset, buffer, cb) { var self = this if (!cb) cb = noop cb = dezalgo(cb) if (self.readonly) return cb(new Error('cannot write to readonly storage')) var piece = self.pieces[index] if (!piece) return cb(new Error('invalid piece index ' + index)) piece.writeBlock(offset, buffer, cb) } /** * Reads a piece or a range of a piece. * * @param {number} index piece index * @param {Object=} range optional range within piece * @param {number} range.offset byte offset within piece * @param {number} range.length length in bytes to read from piece * @param {function} cb * @param {boolean} force optionally overrides default check preventing reading * from unverified piece */ = function (index, range, cb, force) { var self = this if (typeof range === 'function') { force = cb cb = range range = null } cb = dezalgo(cb) var piece = self.pieces[index] if (!piece) { return cb(new Error('invalid piece index ' + index)) } if (!piece.verified && !force) { return cb(new Error(' called on incomplete piece ' + index)) } var offset = 0 var length = piece.length if (range) { offset = range.offset || 0 length = range.length || length } if (piece.buffer) { // shortcut for piece with static backing buffer return cb(null, piece.buffer.slice(offset, offset + length)) } var blocks = [] function readNextBlock () { if (length <= 0) return cb(null, Buffer.concat(blocks)) var blockOffset = offset var blockLength = Math.min(BLOCK_LENGTH, length) offset += blockLength length -= blockLength self.readBlock(index, blockOffset, blockLength, function (err, block) { if (err) return cb(err) blocks.push(block) readNextBlock() }) } readNextBlock() } /** * Reserves a block from the given piece. * * @param {number} index piece index * @param {Boolean} endGame whether or not end game mode is enabled * * @returns {Object|null} reservation with offset and length or null if failed. */ Storage.prototype.reserveBlock = function (index, endGame) { var self = this var piece = self.pieces[index] if (!piece) return null return piece.reserveBlock(endGame) } /** * Cancels a previous block reservation from the given piece. * * @param {number} index piece index * @param {number} offset byte offset of block in piece * * @returns {Boolean} */ Storage.prototype.cancelBlock = function (index, offset) { var self = this var piece = self.pieces[index] if (!piece) return false return piece.cancelBlock(offset) } /** * Removes and cleans up any backing store for this storage. * @param {function=} cb */ Storage.prototype.remove = function (cb) { if (cb) dezalgo(cb)(null) } /** * Closes the backing store for this storage. * @param {function=} cb */ Storage.prototype.close = function (cb) { var self = this self.closed = true if (cb) dezalgo(cb)(null) } // // HELPER METHODS // Storage.prototype._onPieceDone = function (piece) { var self = this self.bitfield.set(piece.index) debug('piece done ' + piece.index + ' (' + self.numMissing + ' still missing)') self.emit('piece', piece) } Storage.prototype._onFileDone = function (file) { var self = this debug('file done ' + self.emit('file', file) self._checkDone() } Storage.prototype._checkDone = function () { var self = this if (!self.done && self.files.every(function (file) { return file.done })) { self.done = true self.emit('done') } }