module.exports = exports = Storage

var BitField = require('bitfield')
var BlockStream = require('block-stream2')
var debug = require('debug')('webtorrent:storage')
var dezalgo = require('dezalgo')
var eos = require('end-of-stream')
var EventEmitter = require('events').EventEmitter
var FileStream = require('./file-stream')
var inherits = require('inherits')
var mime = require('./mime.json')
var MultiStream = require('multistream')
var once = require('once')
var path = require('path')
var sha1 = require('simple-sha1')

// Pieces are subdivided into fixed-size blocks; this is the standard
// BitTorrent request size (16 KiB).
var BLOCK_LENGTH = 16 * 1024

// Per-block states, stored one byte per block in `piece.blocks`.
var BLOCK_BLANK = exports.BLOCK_BLANK = 0
var BLOCK_RESERVED = exports.BLOCK_RESERVED = 1
var BLOCK_WRITTEN = exports.BLOCK_WRITTEN = 2

function noop () {}

inherits(Piece, EventEmitter)

/**
 * A torrent piece
 *
 * Emits 'done' when all blocks are written and the piece passes hash
 * verification, and 'warning' on invalid offsets/blocks or a failed
 * verification (which also resets the piece back to blank).
 *
 * @param {number} index piece index
 * @param {string} hash sha1 hash (hex) for this piece
 * @param {Buffer|number} buffer backing buffer, or piece length if backing buffer is lazy
 * @param {boolean=} noVerify skip piece verification (used when seeding a new file)
 */
function Piece (index, hash, buffer, noVerify) {
  var self = this
  EventEmitter.call(self)
  if (!debug.enabled) self.setMaxListeners(0)

  self.index = index
  self.hash = hash
  self.noVerify = !!noVerify

  if (typeof buffer === 'number') {
    // alloc buffer lazily
    self.buffer = null
    self.length = buffer
  } else {
    // use buffer provided
    self.buffer = buffer
    self.length = buffer.length
  }

  self._reset()
}

/**
 * Read `length` bytes starting at `offset` from the piece's backing buffer.
 * Fails if the buffer has not been allocated yet or `offset` is not
 * block-aligned.
 *
 * @param {number} offset byte offset within the piece (must be a multiple of BLOCK_LENGTH)
 * @param {number} length number of bytes to read
 * @param {function(Error, Buffer=)} cb
 */
Piece.prototype.readBlock = function (offset, length, cb) {
  var self = this
  cb = dezalgo(cb)
  if (!self.buffer || !self._verifyOffset(offset)) {
    return cb(new Error('invalid block offset ' + offset))
  }
  cb(null, self.buffer.slice(offset, offset + length))
}

/**
 * Write one block into the piece at `offset`. Duplicate writes to an
 * already-written block are a no-op. When the last block is written,
 * verification is kicked off automatically.
 *
 * @param {number} offset byte offset within the piece (must be a multiple of BLOCK_LENGTH)
 * @param {Buffer} buffer block data (BLOCK_LENGTH bytes, or shorter for the final block)
 * @param {function(Error=)} cb
 */
Piece.prototype.writeBlock = function (offset, buffer, cb) {
  var self = this
  cb = dezalgo(cb)
  if (!self._verifyOffset(offset) || !self._verifyBlock(offset, buffer)) {
    return cb(new Error('invalid block ' + offset + ':' + buffer.length))
  }
  self._lazyAllocBuffer()

  var i = offset / BLOCK_LENGTH
  if (self.blocks[i] === BLOCK_WRITTEN) {
    return cb(null)
  }

  buffer.copy(self.buffer, offset)
  self.blocks[i] = BLOCK_WRITTEN
  self.blocksWritten += 1

  // all blocks received -- attempt to verify the complete piece
  if (self.blocksWritten === self.blocks.length) {
    self.verify()
  }

  cb(null)
}

/**
 * Reserve the first available block for download and mark it RESERVED.
 * Normally only BLANK blocks are eligible; in end-game mode, RESERVED
 * blocks may be handed out again (only WRITTEN blocks are skipped).
 *
 * @param {boolean} endGame whether end-game mode is enabled
 * @returns {?{offset: number, length: number}} reservation, or null if none available
 */
Piece.prototype.reserveBlock = function (endGame) {
  var self = this
  var len = self.blocks.length
  for (var i = 0; i < len; i++) {
    if ((self.blocks[i] && !endGame) || self.blocks[i] === BLOCK_WRITTEN) {
      continue
    }
    self.blocks[i] = BLOCK_RESERVED
    return {
      offset: i * BLOCK_LENGTH,
      // the final block may be shorter than BLOCK_LENGTH
      length: (i === len - 1)
        ? self.length - (i * BLOCK_LENGTH)
        : BLOCK_LENGTH
    }
  }
  return null
}

/**
 * Cancel a previous reservation, returning the block to BLANK so it can be
 * reserved again. WRITTEN blocks are left untouched.
 *
 * @param {number} offset byte offset of the block within the piece
 * @returns {boolean} false only if `offset` is not block-aligned
 */
Piece.prototype.cancelBlock = function (offset) {
  var self = this
  if (!self._verifyOffset(offset)) {
    return false
  }
  var i = offset / BLOCK_LENGTH
  if (self.blocks[i] === BLOCK_RESERVED) {
    self.blocks[i] = BLOCK_BLANK
  }
  return true
}

/**
 * Reset all per-block bookkeeping to BLANK and clear the verified flag.
 * Called on construction and again whenever verification fails.
 */
Piece.prototype._reset = function () {
  var self = this
  self.verified = false
  // NOTE(review): legacy `new Buffer` API (deprecated in modern Node);
  // the explicit fill(0) below is what guarantees the BLANK initial state.
  self.blocks = new Buffer(Math.ceil(self.length / BLOCK_LENGTH))
  self.blocks.fill(0)
  self.blocksWritten = 0
}

/**
 * Verify the piece contents against the expected sha1 hash, emitting 'done'
 * on success or 'warning' + reset on failure. When `noVerify` is set
 * (seeding freshly-created data), the hash check is skipped entirely.
 *
 * @param {Buffer=} buffer optional buffer to verify instead of the piece's own
 */
Piece.prototype.verify = function (buffer) {
  var self = this
  buffer = buffer || self.buffer
  if (self.verified || !buffer) {
    return
  }

  if (self.noVerify) {
    self.verified = true
    onResult()
    return
  }

  sha1(buffer, function (expectedHash) {
    self.verified = (expectedHash === self.hash)
    onResult()
  })

  function onResult () {
    if (self.verified) {
      self.emit('done')
    } else {
      self.emit('warning', new Error('piece ' + self.index + ' failed verification'))
      // failed pieces are thrown away and re-downloaded from scratch
      self._reset()
    }
  }
}

/**
 * Check that `offset` lands on a block boundary; emits 'warning' if not.
 *
 * @param {number} offset
 * @returns {boolean}
 */
Piece.prototype._verifyOffset = function (offset) {
  var self = this
  if (offset % BLOCK_LENGTH === 0) {
    return true
  } else {
    self.emit(
      'warning',
      new Error('invalid block offset ' + offset + ', not multiple of ' + BLOCK_LENGTH)
    )
    return false
  }
}

/**
 * Check that a block buffer has a legal size for its position in the piece;
 * emits 'warning' if not.
 *
 * @param {number} offset byte offset the block would be written at
 * @param {Buffer} buffer candidate block data
 * @returns {boolean}
 */
Piece.prototype._verifyBlock = function (offset, buffer) {
  var self = this
  if (buffer.length === BLOCK_LENGTH) {
    // normal block length
    return true
  } else if (buffer.length === self.length - offset &&
      self.length - offset < BLOCK_LENGTH) {
    // last block in piece is allowed to be less than block length
    return true
  } else {
    self.emit('warning', new Error('invalid block size ' + buffer.length))
    return false
  }
}

/**
 * Allocate the backing buffer on first write, when the piece was constructed
 * with just a length (i.e. the Storage is running with `nobuffer`).
 */
Piece.prototype._lazyAllocBuffer = function () {
  var self = this
  if (!self.buffer) {
    // NOTE(review): legacy `new Buffer(length)` returns uninitialized memory;
    // safe here only because every byte is eventually overwritten by
    // writeBlock before the piece can verify.
    self.buffer = new Buffer(self.length)
  }
}

inherits(File, EventEmitter)

/**
 * A torrent file
 *
 * Emits 'done' (on the next tick) once every backing piece has verified.
 *
 * @param {Storage} storage Storage container object
 * @param {Object} file the file object from the parsed torrent
 * @param {Array.<Piece>} pieces backing pieces for this file
 * @param {number} pieceLength the length in bytes of a non-terminal piece
 */
function File (storage, file, pieces, pieceLength) {
  var self = this
  EventEmitter.call(self)
  if (!debug.enabled) self.setMaxListeners(0)

  self.storage = storage
  self.name = file.name
  self.path = file.path
  self.length = file.length
  self.offset = file.offset
  self.pieces = pieces
  self.pieceLength = pieceLength

  self.done = false
  self.pieces.forEach(function (piece) {
    piece.on('done', function () {
      self._checkDone()
    })
  })

  // if the file is zero-length, it will be done upon initialization
  self._checkDone()
}

/**
 * Selects the file to be downloaded, but at a lower priority than files with streams.
 * Useful if you know you need the file at a later stage.
 */
File.prototype.select = function () {
  var self = this
  if (self.pieces.length > 0) {
    var start = self.pieces[0].index
    var end = self.pieces[self.pieces.length - 1].index
    self.storage.emit('select', start, end, false)
  }
}

/**
 * Deselects the file, which means it won't be downloaded unless someone creates a stream
 * for it.
 */
File.prototype.deselect = function () {
  var self = this
  if (self.pieces.length > 0) {
    var start = self.pieces[0].index
    var end = self.pieces[self.pieces.length - 1].index
    self.storage.emit('deselect', start, end, false)
  }
}

/**
 * Create a readable stream to the file. Pieces needed by the stream will be prioritized
 * highly and fetched from the swarm first.
 *
 * The stream's piece range is selected with high priority for its lifetime
 * and deselected automatically when the stream ends or errors.
 *
 * @param {Object} opts
 * @param {number} opts.start stream slice of file, starting from this byte (inclusive)
 * @param {number} opts.end stream slice of file, ending with this byte (inclusive)
 * @return {stream.Readable}
 */
File.prototype.createReadStream = function (opts) {
  var self = this
  if (!opts) opts = {}
  if (opts.pieceLength == null) opts.pieceLength = self.pieceLength
  var stream = new FileStream(self, opts)
  self.storage.emit('select', stream.startPiece, stream.endPiece, true, stream.notify.bind(stream))
  eos(stream, function () {
    self.storage.emit('deselect', stream.startPiece, stream.endPiece, true)
  })
  return stream
}

/**
 * Note: This function is async to support different types of (async) storage backends in
 * the future.
 *
 * Browser-only: buffers the whole file and wraps it in an object URL, using
 * the extension-to-MIME lookup from mime.json when a match exists.
 *
 * @param {function(Error, string=)} cb called with the blob URL
 */
File.prototype.getBlobURL = function (cb) {
  var self = this
  if (typeof window === 'undefined') throw new Error('browser-only method')

  self.getBuffer(function (err, buffer) {
    if (err) return cb(err)
    var type = mime[path.extname(self.name).toLowerCase()]
    var blob = type
      ? new window.Blob([ buffer ], { type: type })
      : new window.Blob([ buffer ])
    var url = window.URL.createObjectURL(blob)
    cb(null, url)
  })
}

/**
 * Buffer the entire file contents and hand them to `cb`.
 *
 * @param {function(Error, Buffer=)} cb
 */
File.prototype.getBuffer = function (cb) {
  var self = this
  cb = dezalgo(once(cb))
  var buffer
  if (self.storage.buffer) {
    // Use the in-memory buffer (when possible) for better memory utilization
    var onDone = function () {
      buffer = self.storage.buffer.slice(self.offset, self.offset + self.length)
      cb(null, buffer)
    }
    if (self.done) onDone()
    else self.once('done', onDone)
  } else {
    // no shared backing buffer -- stream the file and assemble a copy
    buffer = new Buffer(self.length)
    var start = 0
    self.createReadStream()
      .on('data', function (chunk) {
        chunk.copy(buffer, start)
        start += chunk.length
      })
      .on('end', function () {
        cb(null, buffer)
      })
      .on('error', cb)
  }
}

/**
 * Recompute `done` (true when every backing piece has verified) and, if the
 * file just completed, emit 'done' on the next tick.
 */
File.prototype._checkDone = function () {
  var self = this
  self.done = self.pieces.every(function (piece) {
    return piece.verified
  })

  if (self.done) {
    process.nextTick(function () {
      self.emit('done')
    })
  }
}

inherits(Storage, EventEmitter)

/**
 * Storage for a torrent download. Handles the complexities of reading and writing
 * to pieces and files.
 *
 * Storage starts out `readonly`; presumably the owner flips `readonly` to
 * false before writing blocks -- TODO confirm against callers.
 *
 * @param {Object} parsedTorrent
 * @param {Object} opts {nobuffer: skip the shared in-memory backing buffer,
 *                       noVerify: skip sha1 verification of pieces}
 */
function Storage (parsedTorrent, opts) {
  var self = this
  EventEmitter.call(self)
  if (!debug.enabled) self.setMaxListeners(0)
  if (!opts) opts = {}

  self.bitfield = new BitField(parsedTorrent.pieces.length)

  self.done = false
  self.closed = false
  self.readonly = true

  if (!opts.nobuffer) {
    // single contiguous buffer for the whole torrent; pieces slice into it
    self.buffer = new Buffer(parsedTorrent.length)
  }

  var pieceLength = self.pieceLength = parsedTorrent.pieceLength
  var lastPieceLength = parsedTorrent.lastPieceLength
  var numPieces = parsedTorrent.pieces.length

  self.pieces = parsedTorrent.pieces.map(function (hash, index) {
    var start = index * pieceLength
    var end = start + (index === numPieces - 1 ? lastPieceLength : pieceLength)

    // if we're backed by a buffer, the piece's buffer will reference the same memory.
    // otherwise, the piece's buffer will be lazily created on demand
    var buffer = (self.buffer ? self.buffer.slice(start, end) : end - start)

    var piece = new Piece(index, hash, buffer, !!opts.noVerify)
    piece.on('done', self._onPieceDone.bind(self, piece))
    return piece
  })

  self.files = parsedTorrent.files.map(function (fileObj) {
    var start = fileObj.offset
    var end = start + fileObj.length - 1

    // the (inclusive) range of pieces that overlap this file
    var startPiece = start / pieceLength | 0
    var endPiece = end / pieceLength | 0
    var pieces = self.pieces.slice(startPiece, endPiece + 1)

    var file = new File(self, fileObj, pieces, pieceLength)
    file.on('done', self._onFileDone.bind(self, file))
    return file
  })
}

Storage.BLOCK_LENGTH = BLOCK_LENGTH

/**
 * Load torrent content from one or more readable streams (e.g. when seeding
 * existing data): pieces are cut out of the concatenated streams, then each
 * piece is re-chunked into blocks and written via writeBlock. `cb` fires when
 * the storage emits 'done' (all files complete) or on a stream error.
 *
 * NOTE(review): writeBlock is called without a callback here, so individual
 * write errors are dropped -- in particular, if `readonly` is still true
 * every write fails silently and 'done' never fires. Presumably callers
 * clear `readonly` before loading; verify.
 *
 * @param {(stream.Readable|Array.<stream.Readable>)} streams
 * @param {function=} cb
 */
Storage.prototype.load = function (streams, cb) {
  var self = this
  if (!Array.isArray(streams)) streams = [ streams ]
  cb = once(cb || function () {})

  var pieceIndex = 0
  var multistream = new MultiStream(streams)
  var blockstream = new BlockStream(self.pieceLength, { zeroPadding: false })

  multistream.on('error', onError)

  self.once('done', onDone)

  multistream
    .pipe(blockstream)
    .on('data', onData)
    .on('error', onError)

  function onData (piece) {
    var index = pieceIndex
    pieceIndex += 1

    // re-chunk this piece into BLOCK_LENGTH blocks and write each one
    var blockIndex = 0
    var s = new BlockStream(BLOCK_LENGTH, { zeroPadding: false })
    s.on('data', onBlockData)
    s.on('end', onBlockEnd)

    function onBlockData (block) {
      var offset = blockIndex * BLOCK_LENGTH
      blockIndex += 1
      self.writeBlock(index, offset, block)
    }

    function onBlockEnd () {
      blockCleanup()
    }

    function blockCleanup () {
      s.removeListener('data', onBlockData)
      s.removeListener('end', onBlockEnd)
    }

    s.end(piece)
  }

  function onError (err) {
    cleanup()
    cb(err)
  }

  function onDone () {
    cleanup()
    cb(null)
  }

  function cleanup () {
    multistream.removeListener('error', onError)
    blockstream.removeListener('data', onData)
    blockstream.removeListener('error', onError)
    self.removeListener('done', onDone)
  }
}

/**
 * Approximate number of bytes downloaded: verified pieces count in full,
 * unverified pieces count their written blocks at BLOCK_LENGTH each
 * (NOTE(review): this slightly overcounts a written-but-short final block).
 */
Object.defineProperty(Storage.prototype, 'downloaded', {
  get: function () {
    var self = this
    return self.pieces.reduce(function (total, piece) {
      return total + (piece.verified ? piece.length : piece.blocksWritten * BLOCK_LENGTH)
    }, 0)
  }
})

/**
 * The number of missing pieces. Used to implement 'end game' mode.
 */
Object.defineProperty(Storage.prototype, 'numMissing', {
  get: function () {
    var self = this
    var numMissing = self.pieces.length
    for (var index = 0, len = self.pieces.length; index < len; index++) {
      numMissing -= self.bitfield.get(index)
    }
    return numMissing
  }
})

/**
 * Reads a block from a piece.
 *
 * @param {number} index piece index
 * @param {number} offset byte offset within piece
 * @param {number} length length in bytes to read from piece
 * @param {function} cb
 */
Storage.prototype.readBlock = function (index, offset, length, cb) {
  var self = this
  cb = dezalgo(cb)
  var piece = self.pieces[index]
  if (!piece) return cb(new Error('invalid piece index ' + index))
  piece.readBlock(offset, length, cb)
}

/**
 * Writes a block to a piece.
 *
 * Fails while the storage is `readonly` (the constructor default).
 *
 * @param {number} index piece index
 * @param {number} offset byte offset within piece
 * @param {Buffer} buffer buffer to write
 * @param {function} cb
 */
Storage.prototype.writeBlock = function (index, offset, buffer, cb) {
  var self = this
  if (!cb) cb = noop
  cb = dezalgo(cb)
  if (self.readonly) return cb(new Error('cannot write to readonly storage'))
  var piece = self.pieces[index]
  if (!piece) return cb(new Error('invalid piece index ' + index))
  piece.writeBlock(offset, buffer, cb)
}

/**
 * Reads a piece or a range of a piece.
 *
 * @param {number} index piece index
 * @param {Object=} range optional range within piece
 * @param {number} range.offset byte offset within piece
 * @param {number} range.length length in bytes to read from piece
 * @param {function} cb
 * @param {boolean} force optionally overrides default check preventing reading
 *   from unverified piece
 */
Storage.prototype.read = function (index, range, cb, force) {
  var self = this

  // `range` is optional: shift arguments when it was omitted
  if (typeof range === 'function') {
    force = cb
    cb = range
    range = null
  }
  cb = dezalgo(cb)

  var piece = self.pieces[index]
  if (!piece) {
    return cb(new Error('invalid piece index ' + index))
  }

  if (!piece.verified && !force) {
    return cb(new Error('Storage.read called on incomplete piece ' + index))
  }

  var offset = 0
  var length = piece.length
  if (range) {
    offset = range.offset || 0
    length = range.length || length
  }

  if (piece.buffer) {
    // shortcut for piece with static backing buffer
    return cb(null, piece.buffer.slice(offset, offset + length))
  }

  // no backing buffer: read block-by-block (sequentially) and concatenate
  var blocks = []
  function readNextBlock () {
    if (length <= 0) return cb(null, Buffer.concat(blocks))
    var blockOffset = offset
    var blockLength = Math.min(BLOCK_LENGTH, length)
    offset += blockLength
    length -= blockLength
    self.readBlock(index, blockOffset, blockLength, function (err, block) {
      if (err) return cb(err)
      blocks.push(block)
      readNextBlock()
    })
  }
  readNextBlock()
}

/**
 * Reserves a block from the given piece.
 *
 * @param {number} index piece index
 * @param {Boolean} endGame whether or not end game mode is enabled
 *
 * @returns {Object|null} reservation with offset and length or null if failed.
 */
Storage.prototype.reserveBlock = function (index, endGame) {
  var self = this
  var piece = self.pieces[index]
  if (!piece) return null

  return piece.reserveBlock(endGame)
}

/**
 * Cancels a previous block reservation from the given piece.
 *
 * @param {number} index piece index
 * @param {number} offset byte offset of block in piece
 *
 * @returns {Boolean}
 */
Storage.prototype.cancelBlock = function (index, offset) {
  var self = this
  var piece = self.pieces[index]
  if (!piece) return false

  return piece.cancelBlock(offset)
}

/**
 * Removes and cleans up any backing store for this storage.
 * (In-memory storage has nothing to remove, so this just invokes `cb`;
 * async-storage subclasses/backends would override it.)
 * @param {function=} cb
 */
Storage.prototype.remove = function (cb) {
  if (cb) dezalgo(cb)(null)
}

/**
 * Closes the backing store for this storage.
 * Only sets the `closed` flag; pieces and buffers are left as-is.
 * @param {function=} cb
 */
Storage.prototype.close = function (cb) {
  var self = this
  self.closed = true
  if (cb) dezalgo(cb)(null)
}

//
// HELPER METHODS
//

/**
 * Piece 'done' handler: record it in the bitfield and re-emit as 'piece'.
 */
Storage.prototype._onPieceDone = function (piece) {
  var self = this
  self.bitfield.set(piece.index)
  debug('piece done ' + piece.index + ' (' + self.numMissing + ' still missing)')
  self.emit('piece', piece)
}

/**
 * File 'done' handler: re-emit as 'file' and check for overall completion.
 */
Storage.prototype._onFileDone = function (file) {
  var self = this
  debug('file done ' + file.name)
  self.emit('file', file)

  self._checkDone()
}

/**
 * Emit 'done' exactly once, when every file has completed.
 */
Storage.prototype._checkDone = function () {
  var self = this

  if (!self.done && self.files.every(function (file) { return file.done })) {
    self.done = true
    self.emit('done')
  }
}