diff options
author | Feross Aboukhadijeh <feross@feross.org> | 2014-10-22 08:49:54 +0400 |
---|---|---|
committer | Feross Aboukhadijeh <feross@feross.org> | 2014-10-22 08:49:54 +0400 |
commit | be5f98e6fb7a153b54b8ba9afeed8d7b60f79646 (patch) | |
tree | 650507c14c4269d8ccf8c6e11a723024f7444251 | |
parent | 2a30d436e9a598dbe202cb1ca5a589f31bc42d9f (diff) |
`client.seed` streams into storage
-rw-r--r-- | index.js | 76 | ||||
-rw-r--r-- | lib/storage.js | 21 | ||||
-rw-r--r-- | package.json | 2 | ||||
-rw-r--r-- | test/download.js | 36 |
4 files changed, 55 insertions, 80 deletions
@@ -3,7 +3,6 @@ module.exports = WebTorrent -var blobToBuffer = require('blob-to-buffer') var createTorrent = require('create-torrent') var debug = require('debug')('webtorrent') var DHT = require('bittorrent-dht/client') // browser exclude @@ -190,7 +189,7 @@ WebTorrent.prototype.download = function (torrentId, opts, ontorrent) { * - W3C FileList object (basically an array of `File` objects) * - Array of `File` objects * - * @param {string|File|FileList|Array.<File>|Blob|Array.<Blob>} input + * @param {string|File|FileList|Blob|Buffer|Array.<File|Blob|Buffer>} input * @param {Object} opts * @param {function} onseed */ @@ -201,48 +200,46 @@ WebTorrent.prototype.seed = function (input, opts, onseed) { opts = {} } - // TODO: support `input` as filesystem path string + // TODO: support `input` as string, or array of strings if (typeof FileList !== 'undefined' && input instanceof FileList) input = Array.prototype.slice.call(input) - if (typeof Blob !== 'undefined' && input instanceof Blob) + if (isBlob(input) || Buffer.isBuffer(input)) { input = [ input ] + } - parallel(input.map(function (file) { - return function (cb) { - if (Buffer.isBuffer(file)) cb(null, file) - else blobToBuffer(file, cb) - } - }), function (err, buffers) { + var streams = input.map(function (item) { + if (isBlob(item)) return new FileReadStream(item) + else if (Buffer.isBuffer(item)) { + var s = new stream.PassThrough() + s.end(item) + return s + } else throw new Error('unsupported input type to `seed`') + }) + + var torrent + createTorrent(input, opts, function (err, torrentBuf) { if (err) return self.emit('error', err) - var buffer = Buffer.concat(buffers) - - var torrent - function clientOnSeed (_torrent) { - if (torrent.infoHash === _torrent.infoHash) { - onseed(torrent) - self.removeListener('seed', clientOnSeed) - } - } - if (onseed) self.on('seed', clientOnSeed) - - createTorrent(input, opts, function (err, torrentBuf) { - if (err) return self.emit('error', err) - var parsedTorrent = parseTorrent(torrentBuf) - self.add(torrentBuf, opts, function (_torrent) { - torrent = _torrent - Storage.writeToStorage( - torrent.storage, - buffer, - parsedTorrent.pieceLength, - function (err) { - if (err) return self.emit('error', err) - self.emit('seed', torrent) - }) - }) + var parsedTorrent = parseTorrent(torrentBuf) + self.add(torrentBuf, opts, function (_torrent) { + torrent = _torrent + torrent.storage.load( + streams, + function (err) { + if (err) return self.emit('error', err) + self.emit('seed', torrent) + }) }) }) + + function clientOnSeed (_torrent) { + if (torrent.infoHash === _torrent.infoHash) { + onseed(torrent) + self.removeListener('seed', clientOnSeed) + } + } + if (onseed) self.on('seed', clientOnSeed) } /** @@ -291,3 +288,12 @@ WebTorrent.prototype.destroy = function (cb) { parallel(tasks, cb) } + +/** + * Check if `obj` is a W3C Blob object (which is the superclass of W3C File) + * @param {*} obj + * @return {boolean} + */ +function isBlob (obj) { + return typeof Blob !== 'undefined' && obj instanceof Blob +} diff --git a/lib/storage.js b/lib/storage.js index 45839d4..69f4bd5 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -9,6 +9,7 @@ var EventEmitter = require('events').EventEmitter var extend = require('extend.js') var FileStream = require('./file-stream') var inherits = require('inherits') +var MultiStream = require('multistream') var sha1 = require('git-sha1') var stream = require('stream') @@ -288,7 +289,7 @@ function Storage (parsedTorrent, opts) { self.buffer = new Buffer(parsedTorrent.length) } - var pieceLength = parsedTorrent.pieceLength + var pieceLength = self.pieceLength = parsedTorrent.pieceLength var lastPieceLength = parsedTorrent.lastPieceLength var numPieces = parsedTorrent.pieces.length @@ -321,12 +322,12 @@ function Storage (parsedTorrent, opts) { Storage.BLOCK_LENGTH = BLOCK_LENGTH -Storage.writeToStorage = function (storage, buf, pieceLength, cb) { +Storage.prototype.load = function (streams, cb) { + var self = this + if (!Array.isArray(streams)) streams = [ streams ] var pieceIndex = 0 - var bufStream = new stream.Readable() - bufStream._read = function () {} - bufStream - .pipe(new BlockStream(pieceLength, { nopad: true })) + ;(new MultiStream(streams)) + .pipe(new BlockStream(self.pieceLength, { nopad: true })) .on('data', function (piece) { var index = pieceIndex pieceIndex += 1 @@ -337,10 +338,9 @@ Storage.writeToStorage = function (storage, buf, pieceLength, cb) { var offset = blockIndex * BLOCK_LENGTH blockIndex += 1 - storage.writeBlock(index, offset, block) + self.writeBlock(index, offset, block) }) - s.write(piece) - s.end() + s.end(piece) }) .on('end', function () { cb(null) @@ -348,9 +348,6 @@ Storage.writeToStorage = function (storage, buf, pieceLength, cb) { .on('error', function (err) { cb(err) }) - - bufStream.push(buf) - bufStream.push(null) } Object.defineProperty(Storage.prototype, 'downloaded', { diff --git a/package.json b/package.json index 7873046..ead05d7 100644 --- a/package.json +++ b/package.json @@ -29,7 +29,6 @@ "bitfield": "^1.0.2", "bittorrent-dht": "^2.6.1", "bittorrent-swarm": "0.x", - "blob-to-buffer": "^1.1.0", "block-stream": "0.0.7", "chromecast-js": "^0.1.4", "clivas": "^0.1.4", @@ -48,6 +47,7 @@ "minimist": "^1.1.0", "mkdirp": "^0.5.0", "moment": "^2.8.3", + "multistream": "^1.4.2", "network-address": "0.0.4", "nodebmc": "^0.0.3", "once": "^1.3.1", diff --git a/test/download.js b/test/download.js index 339d674..9ec7e03 100644 --- a/test/download.js +++ b/test/download.js @@ -5,40 +5,12 @@ var DHT = require('bittorrent-dht/client') var fs = require('fs') var parseTorrent = require('parse-torrent') var test = require('tape') -var TrackerServer = require('bittorrent-tracker').Server +var TrackerServer = require('bittorrent-tracker/server') var leavesFile = __dirname + '/torrents/Leaves of Grass by Walt Whitman.epub' var leavesTorrent = fs.readFileSync(__dirname + '/torrents/leaves.torrent') var leavesParsed = parseTorrent(leavesTorrent) -var BLOCK_LENGTH = 16 * 1024 -function writeToStorage (storage, file, cb) { - var pieceIndex = 0 - fs.createReadStream(file) - .pipe(new BlockStream(leavesParsed.pieceLength, { nopad: true })) - .on('data', function (piece) { - var index = pieceIndex - pieceIndex += 1 - - var blockIndex = 0 - var s = new BlockStream(BLOCK_LENGTH, { nopad: true }) - s.on('data', function (block) { - var offset = blockIndex * BLOCK_LENGTH - blockIndex += 1 - - storage.writeBlock(index, offset, block) - }) - s.write(piece) - s.end() - }) - .on('end', function () { - cb(null) - }) - .on('error', function (err) { - cb(err) - }) -} - function downloadTrackerTest (t, serverType) { t.plan(8) @@ -87,7 +59,7 @@ function downloadTrackerTest (t, serverType) { t.deepEqual(torrent.files.map(function (file) { return file.name }), names) - writeToStorage(torrent.storage, leavesFile, function (err) { + torrent.storage.load(fs.createReadStream(leavesFile), function (err) { cb(err, client1) }) }) @@ -178,7 +150,7 @@ test('Simple download using a tracker (only) via a magnet uri', function (t) { t.deepEqual(torrent.files.map(function (file) { return file.name }), names) - writeToStorage(torrent.storage, leavesFile, function (err) { + torrent.storage.load(fs.createReadStream(leavesFile), function (err) { cb(err, client1) }) }) @@ -264,7 +236,7 @@ test('Simple download using DHT', function (t) { maybeDone(null) }) - writeToStorage(torrent.storage, leavesFile, function (err) { + torrent.storage.load(fs.createReadStream(leavesFile), function (err) { wroteStorage = true maybeDone(err) }) |