Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/webtorrent/webtorrent.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--README.md10
-rwxr-xr-xbin/cmd.js34
-rw-r--r--index.js63
-rw-r--r--lib/append-to.js4
-rw-r--r--lib/file-stream.js86
-rw-r--r--lib/file.js120
-rw-r--r--lib/fs-storage.js211
-rw-r--r--lib/load-chunk-store.js61
-rw-r--r--lib/storage.js683
-rw-r--r--lib/torrent.js551
-rw-r--r--package.json7
-rw-r--r--test/download-dht-magnet.js2
-rw-r--r--test/download-dht-torrent.js2
-rw-r--r--test/download-tracker-magnet.js2
-rw-r--r--test/download-tracker-torrent.js2
-rw-r--r--test/server.js2
-rw-r--r--test/storage.js61
17 files changed, 567 insertions, 1334 deletions
diff --git a/README.md b/README.md
index 9e7d348..94ab6af 100644
--- a/README.md
+++ b/README.md
@@ -230,9 +230,8 @@ If `opts` is specified, then the default options (shown below) will be overridde
nodeId: String|Buffer, // DHT protocol node ID (default=randomly generated)
peerId: String|Buffer, // Wire protocol peer ID (default=randomly generated)
rtcConfig: Object, // RTCPeerConnection configuration object (default=STUN only)
- storage: Function, // custom storage engine, or `false` to use in-memory engine
tracker: Boolean, // Whether or not to enable trackers (default=true)
- wrtc: {} // custom webrtc implementation (in node, specify the [wrtc](https://www.npmjs.com/package/wrtc) package)
+ wrtc: {} // Custom webrtc implementation (in node, specify the [wrtc](https://www.npmjs.com/package/wrtc) package)
}
```
@@ -253,9 +252,10 @@ If `opts` is specified, then the default options (shown below) will be overridde
```js
{
- announce: [], // List of additional trackers to use (added to list in .torrent or magnet uri)
- path: String, // Folder where files will be downloaded (default=`/tmp/webtorrent/`)
- verify: Boolean // Verify previously stored data before starting (default=false)
+ announce: [], // List of additional trackers to use (added to list in .torrent or magnet uri)
+ path: String, // Folder where files will be downloaded (default=`/tmp/webtorrent/`)
+ storage: Function, // Custom storage engine (must follow `abstract-chunk-store` API)
+ verify: Boolean // Verify previously stored data before starting (default=false)
}
```
diff --git a/bin/cmd.js b/bin/cmd.js
index 5cc5b96..2ca529d 100755
--- a/bin/cmd.js
+++ b/bin/cmd.js
@@ -12,7 +12,6 @@ var networkAddress = require('network-address')
var parseTorrent = require('parse-torrent')
var path = require('path')
var prettyBytes = require('pretty-bytes')
-var Storage = require('../lib/storage')
var WebTorrent = require('../')
var zeroFill = require('zero-fill')
@@ -511,7 +510,7 @@ function drawTorrent (torrent) {
linesRemaining -= 1
}
- var seeding = torrent.storage.done
+ var seeding = torrent.done
if (!seeding) clivas.line('')
clivas.line(
@@ -540,34 +539,29 @@ function drawTorrent (torrent) {
linesRemaining -= 5
if (argv.verbose) {
- var pieces = torrent.storage.pieces
- var memoryUsage = 0
+ var pieces = torrent.pieces
for (var i = 0; i < pieces.length; i++) {
var piece = pieces[i]
- if (piece.buffer) memoryUsage += piece.buffer.length
if (piece.verified || (piece.blocksWritten === 0 && !piece.blocks[0])) continue
var bar = ''
for (var j = 0; j < piece.blocks.length; j++) {
- switch (piece.blocks[j]) {
- case Storage.BLOCK_BLANK:
- bar += '{red:█}'
- break
- case Storage.BLOCK_RESERVED:
- bar += '{blue:█}'
- break
- case Storage.BLOCK_WRITTEN:
- bar += '{green:█}'
- break
- }
+ // switch (piece.blocks[j]) {
+ // case Storage.BLOCK_BLANK:
+ // bar += '{red:█}'
+ // break
+ // case Storage.BLOCK_RESERVED:
+ // bar += '{blue:█}'
+ // break
+ // case Storage.BLOCK_WRITTEN:
+ // bar += '{green:█}'
+ // break
+ // }
}
clivas.line('{4+cyan:' + i + '} ' + bar)
linesRemaining -= 1
}
- clivas.line(
- '{red:memory usage:} {bold:' + prettyBytes(memoryUsage) + '}'
- )
clivas.line('{80:}')
- linesRemaining -= 2
+ linesRemaining -= 1
}
torrent.swarm.wires.every(function (wire) {
diff --git a/index.js b/index.js
index cd8f7d7..ad96320 100644
--- a/index.js
+++ b/index.js
@@ -14,8 +14,6 @@ var speedometer = require('speedometer')
var zeroFill = require('zero-fill')
var path = require('path')
-var FSStorage = require('./lib/fs-storage') // browser exclude
-var Storage = require('./lib/storage')
var Torrent = require('./lib/torrent')
inherits(WebTorrent, EventEmitter)
@@ -53,12 +51,6 @@ function WebTorrent (opts) {
self.downloadSpeed = speedometer()
self.uploadSpeed = speedometer()
- self.storage = typeof opts.storage === 'function'
- ? opts.storage
- : (opts.storage !== false && typeof FSStorage === 'function' /* browser exclude */)
- ? FSStorage
- : Storage
-
self.peerId = opts.peerId === undefined
? new Buffer('-WW' + VERSION_STR + '-' + hat(48), 'utf8')
: typeof opts.peerId === 'string'
@@ -85,7 +77,7 @@ function WebTorrent (opts) {
loadIPSet(opts.blocklist, {
headers: { 'user-agent': 'WebTorrent (http://webtorrent.io)' }
}, function (err, ipSet) {
- if (err) return self.error('failed to load blocklist: ' + err.message)
+ if (err) return self.error('Failed to load blocklist: ' + err.message)
self.blocked = ipSet
ready()
})
@@ -126,13 +118,13 @@ Object.defineProperty(WebTorrent.prototype, 'ratio', {
WebTorrent.prototype.get = function (torrentId) {
var self = this
if (torrentId instanceof Torrent) return torrentId
+
var parsed
- try {
- parsed = parseTorrent(torrentId)
- } catch (err) {
- return null
- }
+ try { parsed = parseTorrent(torrentId) } catch (err) {}
+
+ if (!parsed) return null
if (!parsed.infoHash) throw new Error('Invalid torrent identifier')
+
for (var i = 0, len = self.torrents.length; i < len; i++) {
var torrent = self.torrents[i]
if (torrent.infoHash === parsed.infoHash) return torrent
@@ -150,19 +142,16 @@ WebTorrent.prototype.add =
WebTorrent.prototype.download = function (torrentId, opts, ontorrent) {
var self = this
if (self.destroyed) throw new Error('client is destroyed')
+ if (typeof opts === 'function') return self.add(torrentId, null, opts)
debug('add')
- if (typeof opts === 'function') {
- ontorrent = opts
- opts = {}
- }
if (!opts) opts = {}
- if (!opts.storage) opts.storage = self.storage
+
opts.client = self
var torrent = self.get(torrentId)
function _ontorrent () {
- debug('on torrent')
+ debug('on torrent %s', torrent.infoHash)
if (typeof ontorrent === 'function') ontorrent(torrent)
}
@@ -194,29 +183,24 @@ WebTorrent.prototype.download = function (torrentId, opts, ontorrent) {
/**
* Start seeding a new file/folder.
* @param {string|File|FileList|Buffer|Array.<string|File|Buffer>} input
- * @param {Object} opts
- * @param {function} onseed
+ * @param {Object=} opts
+ * @param {function=} onseed
*/
WebTorrent.prototype.seed = function (input, opts, onseed) {
var self = this
if (self.destroyed) throw new Error('client is destroyed')
+ if (typeof opts === 'function') return self.seed(input, null, opts)
debug('seed')
- if (typeof opts === 'function') {
- onseed = opts
- opts = {}
- }
if (!opts) opts = {}
- opts.noVerify = true
- opts.createdBy = 'WebTorrent/' + VERSION
- // When seeding from filesystem path, don't perform extra copy to /tmp
- // Issue: https://github.com/feross/webtorrent/issues/357
- if (typeof input === 'string' && !opts.path) opts.path = path.dirname(input)
+ // When seeding from filesystem path, storage should use existing location
+ if (typeof input === 'string') opts.path = path.dirname(input)
+ if (!opts.createdBy) opts.createdBy = 'WebTorrent/' + VERSION
var streams
var torrent = self.add(undefined, opts, function (torrent) {
var tasks = [function (cb) {
- torrent.storage.load(streams, cb)
+ torrent.load(streams, cb)
}]
if (self.dht) {
tasks.push(function (cb) {
@@ -264,9 +248,11 @@ WebTorrent.prototype.seed = function (input, opts, onseed) {
*/
WebTorrent.prototype.remove = function (torrentId, cb) {
var self = this
+ debug('remove')
+
var torrent = self.get(torrentId)
if (!torrent) throw new Error('No torrent with id ' + torrentId)
- debug('remove')
+
self.torrents.splice(self.torrents.indexOf(torrent), 1)
torrent.destroy(cb)
}
@@ -282,20 +268,15 @@ WebTorrent.prototype.address = function () {
*/
WebTorrent.prototype.destroy = function (cb) {
var self = this
+ if (self.destroyed) throw new Error('client already destroyed')
self.destroyed = true
debug('destroy')
var tasks = self.torrents.map(function (torrent) {
- return function (cb) {
- self.remove(torrent, cb)
- }
+ return function (cb) { self.remove(torrent, cb) }
})
- if (self.dht) {
- tasks.push(function (cb) {
- self.dht.destroy(cb)
- })
- }
+ if (self.dht) tasks.push(function (cb) { self.dht.destroy(cb) })
parallel(tasks, cb)
}
diff --git a/lib/append-to.js b/lib/append-to.js
index 2c3e5d5..8d09b0b 100644
--- a/lib/append-to.js
+++ b/lib/append-to.js
@@ -17,7 +17,7 @@ var IFRAME_EXTS = [ '.css', '.html', '.js', '.md', '.pdf', '.txt' ]
var MediaSource = typeof window !== 'undefined' && window.MediaSource
module.exports = function appendTo (file, rootElem, cb) {
- cb = dezalgo(cb || function () {})
+ cb = dezalgo(cb || noop)
var elem
var extname = path.extname(file.name).toLowerCase()
var currentTime = 0
@@ -159,3 +159,5 @@ module.exports = function appendTo (file, rootElem, cb) {
if (cb) cb(err)
}
}
+
+function noop () {}
diff --git a/lib/file-stream.js b/lib/file-stream.js
index 5713aff..bd5f384 100644
--- a/lib/file-stream.js
+++ b/lib/file-stream.js
@@ -7,71 +7,58 @@ var stream = require('stream')
inherits(FileStream, stream.Readable)
/**
- * A readable stream of a torrent file.
+ * Readable stream of a torrent file
*
- * @param {Object} file
+ * @param {File} file
+ * @param {Object} opts
* @param {number} opts.start stream slice of file, starting from this byte (inclusive)
* @param {number} opts.end stream slice of file, ending with this byte (inclusive)
- * @param {number} opts.pieceLength length of an individual piece
*/
function FileStream (file, opts) {
- var self = this
- if (!(self instanceof FileStream)) return new FileStream(file, opts)
- stream.Readable.call(self, opts)
- debug('new filestream %s', JSON.stringify(opts))
-
- if (!opts) opts = {}
- if (!opts.start) opts.start = 0
- if (!opts.end) opts.end = file.length - 1
-
- self.destroyed = false
- self.length = opts.end - opts.start + 1
-
- var offset = opts.start + file.offset
- var pieceLength = opts.pieceLength
-
- self.startPiece = offset / pieceLength | 0
- self.endPiece = (opts.end + file.offset) / pieceLength | 0
-
- self._storage = file.storage
- self._piece = self.startPiece
- self._missing = self.length
- self._reading = false
- self._notifying = false
- self._criticalLength = Math.min((1024 * 1024 / pieceLength) | 0, 2)
- self._offset = offset - (self.startPiece * pieceLength)
+ stream.Readable.call(this, opts)
+
+ this.destroyed = false
+ this._torrent = file._torrent
+
+ var start = (opts && opts.start) || 0
+ var end = (opts && opts.end) || (file.length - 1)
+ var pieceLength = file._torrent.pieceLength
+
+ this._startPiece = (start + file.offset) / pieceLength | 0
+ this._endPiece = (end + file.offset) / pieceLength | 0
+
+ this._piece = this._startPiece
+ this._offset = (start + file.offset) - (this._startPiece * pieceLength)
+
+ this._missing = end - start + 1
+ this._reading = false
+ this._notifying = false
+ this._criticalLength = Math.min((1024 * 1024 / pieceLength) | 0, 2)
}
FileStream.prototype._read = function () {
- var self = this
- debug('_read')
- if (self._reading) return
- self._reading = true
- self.notify()
+ if (this._reading) return
+ this._reading = true
+ this._notify()
}
-FileStream.prototype.notify = function () {
+FileStream.prototype._notify = function () {
var self = this
- debug('notify')
if (!self._reading || self._missing === 0) return
- if (!self._storage.bitfield.get(self._piece)) {
- return self._storage.emit('critical', self._piece, self._piece + self._criticalLength)
+ if (!self._torrent.bitfield.get(self._piece)) {
+ return self._torrent.critical(self._piece, self._piece + self._criticalLength)
}
if (self._notifying) return
self._notifying = true
var p = self._piece
- self._storage.read(self._piece++, function (err, buffer) {
+ self._torrent.storage.get(p, function (err, buffer) {
+ console.log('GOT', p)
self._notifying = false
- if (self.destroyed) return
-
- if (err) {
- self._storage.emit('error', err)
- return self.destroy(err)
- }
-
+ if (self.destroyed) { console.log('destroyed'); return }
+ if (err) return self.destroy(err)
debug('read %s (length %s) (err %s)', p, buffer.length, err && err.message)
if (self._offset) {
@@ -83,6 +70,7 @@ FileStream.prototype.notify = function () {
buffer = buffer.slice(0, self._missing)
}
self._missing -= buffer.length
+ console.log('missing', self._missing)
debug('pushing buffer of length %s', buffer.length)
self._reading = false
@@ -90,11 +78,11 @@ FileStream.prototype.notify = function () {
if (self._missing === 0) self.push(null)
})
+ self._piece += 1
}
FileStream.prototype.destroy = function () {
- var self = this
- if (self.destroyed) return
- self.destroyed = true
- self._storage.emit('deselect', self.startPiece, self.endPiece, true)
+ if (this.destroyed) return
+ this.destroyed = true
+ this._torrent.deselect(this._startPiece, this._endPiece, true)
}
diff --git a/lib/file.js b/lib/file.js
new file mode 100644
index 0000000..d85784c
--- /dev/null
+++ b/lib/file.js
@@ -0,0 +1,120 @@
+module.exports = File
+
+var appendTo = require('./append-to')
+var eos = require('end-of-stream')
+var EventEmitter = require('events').EventEmitter
+var FileStream = require('./file-stream')
+var inherits = require('inherits')
+var mime = require('./mime.json')
+var path = require('path')
+
+inherits(File, EventEmitter)
+
+/**
+ * @param {Torrent} torrent torrent that the file belongs to
+ * @param {Object} file file object from the parsed torrent
+ */
+function File (torrent, file) {
+ EventEmitter.call(this)
+
+ this._torrent = torrent
+
+ this.name = file.name
+ this.path = file.path
+ this.length = file.length
+ this.offset = file.offset
+
+ this.done = (this.length === 0)
+
+ var start = file.offset
+ var end = start + file.length - 1
+
+ this._startPiece = start / this._torrent.pieceLength | 0
+ this._endPiece = end / this._torrent.pieceLength | 0
+}
+
+/**
+ * Selects the file to be downloaded, but at a lower priority than files with streams.
+ * Useful if you know you need the file at a later stage.
+ */
+File.prototype.select = function () {
+ if (this.length === 0) return
+ this._torrent.select(this._startPiece, this._endPiece, false)
+}
+
+/**
+ * Deselects the file, which means it won't be downloaded unless someone creates a stream
+ * for it.
+ */
+File.prototype.deselect = function () {
+ if (this.length === 0) return
+ this._torrent.deselect(this._startPiece, this._endPiece, false)
+}
+
+/**
+ * Create a readable stream to the file. Pieces needed by the stream will be prioritized
+ * highly and fetched from the swarm first.
+ *
+ * @param {Object} opts
+ * @param {number} opts.start stream slice of file, starting from this byte (inclusive)
+ * @param {number} opts.end stream slice of file, ending with this byte (inclusive)
+ * @return {FileStream}
+ */
+File.prototype.createReadStream = function (opts) {
+ var self = this
+ var stream = new FileStream(self, opts)
+ self._torrent.select(stream._startPiece, stream._endPiece, true, function () {
+ stream._notify()
+ })
+ eos(stream, function () {
+ self._torrent.deselect(stream._startPiece, stream._endPiece, true)
+ })
+ return stream
+}
+
+/**
+ * @param {function} cb
+ */
+File.prototype.getBuffer = function (cb) {
+ var buf = new Buffer(this.length)
+ var offset = 0
+ this.createReadStream()
+ .on('data', function (chunk) {
+ console.log('data')
+ chunk.copy(buf, offset)
+ offset += chunk.length
+ })
+ .on('end', function () {
+ console.log('END')
+ cb(null, buf)
+ })
+ .on('error', cb)
+}
+
+/**
+ * @param {function} cb
+ */
+File.prototype.getBlobURL = function (cb) {
+ var self = this
+ if (typeof window === 'undefined') throw new Error('browser-only method')
+
+ self.getBuffer(function (err, buffer) {
+ if (err) return cb(err)
+ var ext = path.extname(self.name).toLowerCase()
+ var type = mime[ext]
+ var blob = new window.Blob([ buffer ], type && { type: type })
+ var url = window.URL.createObjectURL(blob)
+ cb(null, url)
+ })
+}
+
+/**
+ * Show the file in a the browser by appending it to the DOM.
+ * @param {Element|string} elem
+ * @param {function} cb
+ */
+File.prototype.appendTo = function (elem, cb) {
+ if (typeof window === 'undefined') throw new Error('browser-only method')
+ if (typeof elem === 'string') elem = document.querySelector(elem)
+ appendTo(this, elem, cb)
+}
diff --git a/lib/fs-storage.js b/lib/fs-storage.js
deleted file mode 100644
index 1172c14..0000000
--- a/lib/fs-storage.js
+++ /dev/null
@@ -1,211 +0,0 @@
-module.exports = FSStorage
-
-var dezalgo = require('dezalgo')
-var fs = require('fs')
-var inherits = require('inherits')
-var mkdirp = require('mkdirp')
-var os = require('os')
-var path = require('path')
-var raf = require('random-access-file')
-var rimraf = require('rimraf')
-var Storage = require('./storage')
-var thunky = require('thunky')
-
-var TMP = path.join(fs.existsSync('/tmp') ? '/tmp' : os.tmpDir(), 'webtorrent')
-
-inherits(FSStorage, Storage)
-
-/**
- * fs-backed Storage for a torrent download.
- *
- * @param {Object} parsedTorrent
- * @param {Object} opts
- */
-function FSStorage (parsedTorrent, opts) {
- var self = this
-
- self.tmp = opts.tmp || TMP
- self.path = opts.path || path.join(self.tmp, parsedTorrent.infoHash)
-
- self.piecesMap = []
- self.nonExistentError = new Error('Cannot read from non-existent file')
-
- opts.nobuffer = true
- Storage.call(self, parsedTorrent, opts)
-
- self.files.forEach(function (file) {
- var fileStart = file.offset
- var fileEnd = fileStart + file.length
-
- var pieceLength = file.pieceLength
- var filePath = path.join(self.path, file.path)
-
- var openWrite = thunky(function (cb) {
- var fileDir = path.dirname(filePath)
-
- mkdirp(fileDir, function (err) {
- if (err) return cb(err)
- if (self.closed) return cb(new Error('Storage closed'))
-
- var fd = raf(filePath)
- file.fd = fd
- cb(null, fd)
- })
- })
-
- var openRead = thunky(function (cb) {
- // TODO: no need for fs.exists call, just try opening and handle error.
- // fs.exists then open creates opportunity for race condition.
- fs.exists(filePath, function (exists) {
- if (exists) return openWrite(cb)
- cb(self.nonExistentError)
- })
- })
-
- file.pieces.forEach(function (piece) {
- var index = piece.index
-
- var pieceStart = index * pieceLength
- var pieceEnd = pieceStart + piece.length
-
- var from = (fileStart < pieceStart) ? 0 : fileStart - pieceStart
- var to = (fileEnd > pieceEnd) ? pieceLength : fileEnd - pieceStart
- var offset = (fileStart > pieceStart) ? 0 : pieceStart - fileStart
-
- if (!self.piecesMap[index]) self.piecesMap[index] = []
-
- self.piecesMap[index].push({
- from: from,
- to: to,
- offset: offset,
- openWrite: openWrite,
- openRead: openRead
- })
- })
- })
-}
-
-FSStorage.prototype.readBlock = function (index, offset, length, cb) {
- var self = this
- cb = dezalgo(cb)
- var piece = self.pieces[index]
- if (!piece) return cb(new Error('invalid piece index ' + index))
-
- if (piece.verified && piece.buffer) {
- // piece is verified and cached in memory, so read directly from its buffer
- // instead of reading from the filesystem.
- return piece.readBlock(offset, length, cb)
- }
-
- var rangeFrom = offset
- var rangeTo = rangeFrom + length
-
- var targets = self.piecesMap[index].filter(function (target) {
- return (target.to > rangeFrom && target.from < rangeTo)
- })
-
- if (!targets.length) return cb(new Error('no file matching the requested range?'))
-
- var buffers = []
- var end = targets.length
- var i = 0
-
- function readFromNextFile (err, buffer) {
- if (err) return cb(err)
- if (buffer) buffers.push(buffer)
- if (i >= end) return cb(null, Buffer.concat(buffers))
-
- var target = targets[i++]
-
- var from = target.from
- var to = target.to
- var offset = target.offset
-
- if (to > rangeTo) to = rangeTo
- if (from < rangeFrom) {
- offset += (rangeFrom - from)
- from = rangeFrom
- }
-
- target.openRead(function (err, file) {
- if (self.closed) return
- if (err) {
- return err === self.nonExistentError
- ? readFromNextFile(null, new Buffer(0))
- : cb(err)
- }
- file.read(offset, to - from, readFromNextFile)
- })
- }
-
- readFromNextFile()
-}
-
-// flush pieces to file once they're done and verified
-FSStorage.prototype._onPieceDone = function (piece) {
- var self = this
- var targets = self.piecesMap[piece.index]
- var end = targets.length
- var i = 0
-
- function done () {
- Storage.prototype._onPieceDone.call(self, piece)
- }
-
- if (!piece.buffer || self.readonly) return done()
-
- function writeToNextFile (err) {
- if (err) return self.emit('error', err)
- if (i >= end) {
- // piece.buffer = null // TODO: free this memory!
- return done()
- }
-
- var target = targets[i++]
- target.openWrite(function (err, file) {
- if (self.closed) return
- if (err) return self.emit('error', err)
- file.write(target.offset, piece.buffer.slice(target.from, target.to), writeToNextFile)
- })
- }
-
- writeToNextFile()
-}
-
-/**
- * Removes and cleans up any backing store for this storage.
- */
-FSStorage.prototype.remove = function (cb) {
- var self = this
- if (!cb) cb = function () {}
-
- self.close(function (err) {
- if (err) return cb(err)
- var root = self.files[0].path.split(path.sep)[0]
- rimraf(path.join(self.path, root), cb)
- })
-}
-
-/**
- * Closes the backing store for this storage.
- */
-FSStorage.prototype.close = function (cb) {
- var self = this
- if (!cb) cb = function () {}
- if (self.closed) return cb()
-
- Storage.prototype.close.call(self, function (err) {
- if (err) return cb(err)
-
- var i = 0
- function loop (err) {
- if (i >= self.files.length) return cb()
- if (err) return cb(err)
- var next = self.files[i++]
- if (!next || !next.fd) return process.nextTick(loop)
- next.fd.close(loop)
- }
-
- process.nextTick(loop)
- })
-}
diff --git a/lib/load-chunk-store.js b/lib/load-chunk-store.js
new file mode 100644
index 0000000..f935935
--- /dev/null
+++ b/lib/load-chunk-store.js
@@ -0,0 +1,61 @@
+// TODO: publish this as a standalone module
+
+module.exports = loadChunkStore
+
+var BlockStream = require('block-stream2')
+var MultiStream = require('multistream')
+
+function loadChunkStore (streams, store, chunkLength, cb) {
+ if (!Array.isArray(streams)) streams = [ streams ]
+ if (!cb) cb = noop
+
+ var index = 0
+ var outstandingPuts = 0
+ var finished = false
+
+ var multistream = new MultiStream(streams)
+ var blockstream = new BlockStream(chunkLength, { zeroPadding: false })
+
+ multistream
+ .on('error', onError)
+ .pipe(blockstream)
+ .on('data', onData)
+ .on('finish', onFinish)
+ .on('error', onError)
+
+ function onData (chunk) {
+ outstandingPuts += 1
+ store.put(index, chunk, function (err) {
+ if (err) return onError(err)
+ outstandingPuts -= 1
+ maybeDone()
+ })
+ index += 1
+ }
+
+ function onFinish () {
+ finished = true
+ maybeDone()
+ }
+
+ function onError (err) {
+ cleanup()
+ cb(err)
+ }
+
+ function maybeDone () {
+ if (finished && outstandingPuts === 0) {
+ cleanup()
+ cb(null)
+ }
+ }
+
+ function cleanup () {
+ multistream.removeListener('error', onError)
+ blockstream.removeListener('data', onData)
+ blockstream.removeListener('finish', onFinish)
+ blockstream.removeListener('error', onError)
+ }
+}
+
+function noop () {}
diff --git a/lib/storage.js b/lib/storage.js
deleted file mode 100644
index 27444ed..0000000
--- a/lib/storage.js
+++ /dev/null
@@ -1,683 +0,0 @@
-module.exports = exports = Storage
-
-var appendTo = require('./append-to')
-var BitField = require('bitfield')
-var BlockStream = require('block-stream2')
-var debug = require('debug')('webtorrent:storage')
-var dezalgo = require('dezalgo')
-var eos = require('end-of-stream')
-var EventEmitter = require('events').EventEmitter
-var FileStream = require('./file-stream')
-var inherits = require('inherits')
-var mime = require('./mime.json')
-var MultiStream = require('multistream')
-var once = require('once')
-var path = require('path')
-var sha1 = require('simple-sha1')
-
-var BLOCK_LENGTH = 16 * 1024
-
-var BLOCK_BLANK = exports.BLOCK_BLANK = 0
-var BLOCK_RESERVED = exports.BLOCK_RESERVED = 1
-var BLOCK_WRITTEN = exports.BLOCK_WRITTEN = 2
-
-function noop () {}
-
-inherits(Piece, EventEmitter)
-
-/**
- * A torrent piece
- *
- * @param {number} index piece index
- * @param {string} hash sha1 hash (hex) for this piece
- * @param {Buffer|number} buffer backing buffer, or piece length if backing buffer is lazy
- * @param {boolean=} noVerify skip piece verification (used when seeding a new file)
- */
-function Piece (index, hash, buffer, noVerify) {
- var self = this
- EventEmitter.call(self)
- if (!debug.enabled) self.setMaxListeners(0)
-
- self.index = index
- self.hash = hash
- self.noVerify = !!noVerify
-
- if (typeof buffer === 'number') {
- // alloc buffer lazily
- self.buffer = null
- self.length = buffer
- } else {
- // use buffer provided
- self.buffer = buffer
- self.length = buffer.length
- }
-
- self._reset()
-}
-
-Piece.prototype.readBlock = function (offset, length, cb) {
- var self = this
- cb = dezalgo(cb)
- if (!self.buffer || !self._verifyOffset(offset)) {
- return cb(new Error('invalid block offset ' + offset))
- }
- cb(null, self.buffer.slice(offset, offset + length))
-}
-
-Piece.prototype.writeBlock = function (offset, buffer, cb) {
- var self = this
- cb = dezalgo(cb)
- if (!self._verifyOffset(offset) || !self._verifyBlock(offset, buffer)) {
- return cb(new Error('invalid block ' + offset + ':' + buffer.length))
- }
- self._lazyAllocBuffer()
-
- var i = offset / BLOCK_LENGTH
- if (self.blocks[i] === BLOCK_WRITTEN) {
- return cb(null)
- }
-
- buffer.copy(self.buffer, offset)
- self.blocks[i] = BLOCK_WRITTEN
- self.blocksWritten += 1
-
- if (self.blocksWritten === self.blocks.length) {
- self.verify()
- }
-
- cb(null)
-}
-
-Piece.prototype.reserveBlock = function (endGame) {
- var self = this
- var len = self.blocks.length
- for (var i = 0; i < len; i++) {
- if ((self.blocks[i] && !endGame) || self.blocks[i] === BLOCK_WRITTEN) {
- continue
- }
- self.blocks[i] = BLOCK_RESERVED
- return {
- offset: i * BLOCK_LENGTH,
- length: (i === len - 1)
- ? self.length - (i * BLOCK_LENGTH)
- : BLOCK_LENGTH
- }
- }
- return null
-}
-
-Piece.prototype.cancelBlock = function (offset) {
- var self = this
- if (!self._verifyOffset(offset)) {
- return false
- }
-
- var i = offset / BLOCK_LENGTH
- if (self.blocks[i] === BLOCK_RESERVED) {
- self.blocks[i] = BLOCK_BLANK
- }
-
- return true
-}
-
-Piece.prototype._reset = function () {
- var self = this
- self.verified = false
- self.blocks = new Buffer(Math.ceil(self.length / BLOCK_LENGTH))
- if (!process.browser) self.blocks.fill(0)
- self.blocksWritten = 0
-}
-
-Piece.prototype.verify = function (buffer) {
- var self = this
- if (!buffer) buffer = self.buffer
- if (self.verified || !buffer) {
- return
- }
-
- if (self.noVerify) {
- self.verified = true
- onResult()
- return
- }
-
- sha1(buffer, function (expectedHash) {
- self.verified = (expectedHash === self.hash)
- onResult()
- })
-
- function onResult () {
- if (self.verified) {
- self.emit('done')
- } else {
- self.emit('warning', new Error('piece ' + self.index + ' failed verification'))
- self._reset()
- }
- }
-}
-
-Piece.prototype._verifyOffset = function (offset) {
- var self = this
- if (offset % BLOCK_LENGTH === 0) {
- return true
- } else {
- self.emit(
- 'warning',
- new Error('invalid block offset ' + offset + ', not multiple of ' + BLOCK_LENGTH)
- )
- return false
- }
-}
-
-Piece.prototype._verifyBlock = function (offset, buffer) {
- var self = this
- if (buffer.length === BLOCK_LENGTH) {
- // normal block length
- return true
- } else if (buffer.length === self.length - offset &&
- self.length - offset < BLOCK_LENGTH) {
- // last block in piece is allowed to be less than block length
- return true
- } else {
- self.emit('warning', new Error('invalid block size ' + buffer.length))
- return false
- }
-}
-
-Piece.prototype._lazyAllocBuffer = function () {
- var self = this
- if (!self.buffer) self.buffer = new Buffer(self.length)
-}
-
-inherits(File, EventEmitter)
-
-/**
- * A torrent file
- *
- * @param {Storage} storage Storage container object
- * @param {Object} file the file object from the parsed torrent
- * @param {Array.<Piece>} pieces backing pieces for this file
- * @param {number} pieceLength the length in bytes of a non-terminal piece
- */
-function File (storage, file, pieces, pieceLength) {
- var self = this
- EventEmitter.call(self)
- if (!debug.enabled) self.setMaxListeners(0)
-
- self.storage = storage
- self.name = file.name
- self.path = file.path
- self.length = file.length
- self.offset = file.offset
- self.pieces = pieces
- self.pieceLength = pieceLength
- self.done = false
-
- self._blobUrl = null
- self._blobUrlPending = false
-
- self.pieces.forEach(function (piece) {
- piece.on('done', function () {
- self._checkDone()
- })
- })
-
- // if the file is zero-length, it will be done upon initialization
- self._checkDone()
-}
-
-/**
- * Selects the file to be downloaded, but at a lower priority than files with streams.
- * Useful if you know you need the file at a later stage.
- */
-File.prototype.select = function () {
- var self = this
- if (self.pieces.length > 0) {
- var start = self.pieces[0].index
- var end = self.pieces[self.pieces.length - 1].index
- self.storage.emit('select', start, end, false)
- }
-}
-
-/**
- * Deselects the file, which means it won't be downloaded unless someone creates a stream
- * for it.
- */
-File.prototype.deselect = function () {
- var self = this
- if (self.pieces.length > 0) {
- var start = self.pieces[0].index
- var end = self.pieces[self.pieces.length - 1].index
- self.storage.emit('deselect', start, end, false)
- }
-}
-
-/**
- * Create a readable stream to the file. Pieces needed by the stream will be prioritized
- * highly and fetched from the swarm first.
- *
- * @param {Object} opts
- * @param {number} opts.start stream slice of file, starting from this byte (inclusive)
- * @param {number} opts.end stream slice of file, ending with this byte (inclusive)
- * @return {stream.Readable}
- */
-File.prototype.createReadStream = function (opts) {
- var self = this
- if (!opts) opts = {}
- if (opts.pieceLength == null) opts.pieceLength = self.pieceLength
- var stream = new FileStream(self, opts)
- self.storage.emit('select', stream.startPiece, stream.endPiece, true, stream.notify.bind(stream))
- eos(stream, function () {
- self.storage.emit('deselect', stream.startPiece, stream.endPiece, true)
- })
-
- return stream
-}
-
-/**
- * @param {function} cb
- */
-File.prototype.getBuffer = function (cb) {
- var self = this
- cb = dezalgo(once(cb))
-
- var buffer
- if (self.storage.buffer) {
- // Use the in-memory buffer (when possible) for better memory utilization
- var onDone = function () {
- buffer = self.storage.buffer.slice(self.offset, self.offset + self.length)
- cb(null, buffer)
- }
- if (self.done) onDone()
- else self.once('done', onDone)
- } else {
- buffer = new Buffer(self.length)
- var start = 0
- self.createReadStream()
- .on('data', function (chunk) {
- chunk.copy(buffer, start)
- start += chunk.length
- })
- .on('end', function () {
- cb(null, buffer)
- })
- .on('error', cb)
- }
-}
-
-File.prototype.appendTo = function (elem, cb) {
- var self = this
- if (typeof window === 'undefined') throw new Error('browser-only method')
- if (typeof elem === 'string') elem = document.querySelector(elem)
-
- appendTo(self, elem, cb)
-}
-
-/**
- * Note: This function is async to support different types of (async) storage backends in
- * the future.
- * @param {function} cb
- */
-File.prototype.getBlobURL = function (cb) {
- var self = this
- if (typeof window === 'undefined') throw new Error('browser-only method')
- cb = dezalgo(cb)
-
- if (self._blobUrl) return cb(null, self._blobUrl)
- if (self._blobUrlPending) return self.once('_blobUrl', cb)
-
- self._blobUrlPending = true
-
- self.getBuffer(function (err, buffer) {
- self._blobUrlPending = false
- if (err) {
- cb(err)
- self.emit('_blobUrl', err)
- return
- }
-
- var type = mime[path.extname(self.name).toLowerCase()]
- var blob = type
- ? new window.Blob([ buffer ], { type: type })
- : new window.Blob([ buffer ])
- self._blobUrl = window.URL.createObjectURL(blob)
-
- cb(null, self._blobUrl)
- self.emit('_blobUrl', null, self._blobUrl)
- })
-}
-
-File.prototype._checkDone = function () {
- var self = this
- self.done = self.pieces.every(function (piece) {
- return piece.verified
- })
-
- if (self.done) {
- process.nextTick(function () {
- self.emit('done')
- })
- }
-}
-
-inherits(Storage, EventEmitter)
-
-/**
- * Storage for a torrent download. Handles the complexities of reading and writing
- * to pieces and files.
- *
- * @param {Object} parsedTorrent
- * @param {Object} opts
- */
-function Storage (parsedTorrent, opts) {
- var self = this
- EventEmitter.call(self)
- if (!debug.enabled) self.setMaxListeners(0)
- if (!opts) opts = {}
-
- self.bitfield = new BitField(parsedTorrent.pieces.length)
-
- self.done = false
- self.closed = false
- self.readonly = true
-
- if (!opts.nobuffer) {
- self.buffer = new Buffer(parsedTorrent.length)
- }
-
- var pieceLength = self.pieceLength = parsedTorrent.pieceLength
- var lastPieceLength = parsedTorrent.lastPieceLength
- var numPieces = parsedTorrent.pieces.length
-
- self.pieces = parsedTorrent.pieces.map(function (hash, index) {
- var start = index * pieceLength
- var end = start + (index === numPieces - 1 ? lastPieceLength : pieceLength)
-
- // if we're backed by a buffer, the piece's buffer will reference the same memory.
- // otherwise, the piece's buffer will be lazily created on demand
- var buffer = (self.buffer ? self.buffer.slice(start, end) : end - start)
-
- var piece = new Piece(index, hash, buffer, !!opts.noVerify)
- piece.on('done', self._onPieceDone.bind(self, piece))
- return piece
- })
-
- self.files = parsedTorrent.files.map(function (fileObj) {
- var start = fileObj.offset
- var end = start + fileObj.length - 1
-
- var startPiece = start / pieceLength | 0
- var endPiece = end / pieceLength | 0
- var pieces = self.pieces.slice(startPiece, endPiece + 1)
-
- var file = new File(self, fileObj, pieces, pieceLength)
- file.on('done', self._onFileDone.bind(self, file))
- return file
- })
-}
-
-Storage.BLOCK_LENGTH = BLOCK_LENGTH
-
-Storage.prototype.load = function (streams, cb) {
- var self = this
- if (!Array.isArray(streams)) streams = [ streams ]
- cb = once(cb || function () {})
-
- var pieceIndex = 0
- var multistream = new MultiStream(streams)
- var blockstream = new BlockStream(self.pieceLength, { zeroPadding: false })
-
- multistream.on('error', onError)
-
- self.once('done', onDone)
-
- multistream
- .pipe(blockstream)
- .on('data', onData)
- .on('error', onError)
-
- function onData (piece) {
- var index = pieceIndex
- pieceIndex += 1
-
- var blockIndex = 0
- var s = new BlockStream(BLOCK_LENGTH, { zeroPadding: false })
-
- s.on('data', onBlockData)
- s.on('end', onBlockEnd)
-
- function onBlockData (block) {
- var offset = blockIndex * BLOCK_LENGTH
- blockIndex += 1
- self.writeBlock(index, offset, block)
- }
-
- function onBlockEnd () {
- blockCleanup()
- }
-
- function blockCleanup () {
- s.removeListener('data', onBlockData)
- s.removeListener('end', onBlockEnd)
- }
-
- s.end(piece)
- }
-
- function onError (err) {
- cleanup()
- cb(err)
- }
-
- function onDone () {
- cleanup()
- cb(null)
- }
-
- function cleanup () {
- multistream.removeListener('error', onError)
- blockstream.removeListener('data', onData)
- blockstream.removeListener('error', onError)
- self.removeListener('done', onDone)
- }
-}
-
-Object.defineProperty(Storage.prototype, 'downloaded', {
- get: function () {
- var self = this
- return self.pieces.reduce(function (total, piece) {
- return total + (piece.verified ? piece.length : piece.blocksWritten * BLOCK_LENGTH)
- }, 0)
- }
-})
-
-/**
- * The number of missing pieces. Used to implement 'end game' mode.
- */
-Object.defineProperty(Storage.prototype, 'numMissing', {
- get: function () {
- var self = this
- var numMissing = self.pieces.length
- for (var index = 0, len = self.pieces.length; index < len; index++) {
- numMissing -= self.bitfield.get(index)
- }
- return numMissing
- }
-})
-
-/**
- * Reads a block from a piece.
- *
- * @param {number} index piece index
- * @param {number} offset byte offset within piece
- * @param {number} length length in bytes to read from piece
- * @param {function} cb
- */
-Storage.prototype.readBlock = function (index, offset, length, cb) {
- var self = this
- cb = dezalgo(cb)
- var piece = self.pieces[index]
- if (!piece) return cb(new Error('invalid piece index ' + index))
- piece.readBlock(offset, length, cb)
-}
-
-/**
- * Writes a block to a piece.
- *
- * @param {number} index piece index
- * @param {number} offset byte offset within piece
- * @param {Buffer} buffer buffer to write
- * @param {function} cb
- */
-Storage.prototype.writeBlock = function (index, offset, buffer, cb) {
- var self = this
- if (!cb) cb = noop
- cb = dezalgo(cb)
-
- if (self.readonly) return cb(new Error('cannot write to readonly storage'))
- var piece = self.pieces[index]
- if (!piece) return cb(new Error('invalid piece index ' + index))
- piece.writeBlock(offset, buffer, cb)
-}
-
-/**
- * Reads a piece or a range of a piece.
- *
- * @param {number} index piece index
- * @param {Object=} range optional range within piece
- * @param {number} range.offset byte offset within piece
- * @param {number} range.length length in bytes to read from piece
- * @param {function} cb
- * @param {boolean} force optionally overrides default check preventing reading
- * from unverified piece
- */
-Storage.prototype.read = function (index, range, cb, force) {
- var self = this
-
- if (typeof range === 'function') {
- force = cb
- cb = range
- range = null
- }
- cb = dezalgo(cb)
-
- var piece = self.pieces[index]
- if (!piece) {
- return cb(new Error('invalid piece index ' + index))
- }
-
- if (!piece.verified && !force) {
- return cb(new Error('Storage.read called on incomplete piece ' + index))
- }
-
- var offset = 0
- var length = piece.length
-
- if (range) {
- offset = range.offset || 0
- length = range.length || length
- }
-
- if (piece.buffer) {
- // shortcut for piece with static backing buffer
- return cb(null, piece.buffer.slice(offset, offset + length))
- }
-
- var blocks = []
- function readNextBlock () {
- if (length <= 0) return cb(null, Buffer.concat(blocks))
-
- var blockOffset = offset
- var blockLength = Math.min(BLOCK_LENGTH, length)
-
- offset += blockLength
- length -= blockLength
-
- self.readBlock(index, blockOffset, blockLength, function (err, block) {
- if (err) return cb(err)
-
- blocks.push(block)
- readNextBlock()
- })
- }
-
- readNextBlock()
-}
-
-/**
- * Reserves a block from the given piece.
- *
- * @param {number} index piece index
- * @param {Boolean} endGame whether or not end game mode is enabled
- *
- * @returns {Object|null} reservation with offset and length or null if failed.
- */
-Storage.prototype.reserveBlock = function (index, endGame) {
- var self = this
- var piece = self.pieces[index]
- if (!piece) return null
-
- return piece.reserveBlock(endGame)
-}
-
-/**
- * Cancels a previous block reservation from the given piece.
- *
- * @param {number} index piece index
- * @param {number} offset byte offset of block in piece
- *
- * @returns {Boolean}
- */
-Storage.prototype.cancelBlock = function (index, offset) {
- var self = this
- var piece = self.pieces[index]
- if (!piece) return false
-
- return piece.cancelBlock(offset)
-}
-
-/**
- * Removes and cleans up any backing store for this storage.
- * @param {function=} cb
- */
-Storage.prototype.remove = function (cb) {
- if (cb) dezalgo(cb)(null)
-}
-
-/**
- * Closes the backing store for this storage.
- * @param {function=} cb
- */
-Storage.prototype.close = function (cb) {
- var self = this
- self.closed = true
- if (cb) dezalgo(cb)(null)
-}
-
-//
-// HELPER METHODS
-//
-
-Storage.prototype._onPieceDone = function (piece) {
- var self = this
- self.bitfield.set(piece.index)
- debug('piece done ' + piece.index + ' (' + self.numMissing + ' still missing)')
- self.emit('piece', piece)
-}
-
-Storage.prototype._onFileDone = function (file) {
- var self = this
- debug('file done ' + file.name)
- self.emit('file', file)
-
- self._checkDone()
-}
-
-Storage.prototype._checkDone = function () {
- var self = this
-
- if (!self.done && self.files.every(function (file) { return file.done })) {
- self.done = true
- self.emit('done')
- }
-}
diff --git a/lib/torrent.js b/lib/torrent.js
index eff6433..8d5b8f3 100644
--- a/lib/torrent.js
+++ b/lib/torrent.js
@@ -1,28 +1,38 @@
module.exports = Torrent
var addrToIPPort = require('addr-to-ip-port') // browser exclude
+var BitField = require('bitfield')
var createTorrent = require('create-torrent')
var debug = require('debug')('webtorrent:torrent')
var Discovery = require('torrent-discovery')
var EventEmitter = require('events').EventEmitter
+var extend = require('xtend/mutable')
+var FSChunkStore = require('fs-chunk-store')
+var ImmediateChunkStore = require('immediate-chunk-store')
var inherits = require('inherits')
+var os = require('os')
var parallel = require('run-parallel')
var parseTorrent = require('parse-torrent')
+var path = require('path')
+var pathExists = require('path-exists')
+var Piece = require('torrent-piece')
var randomIterate = require('random-iterate')
var reemit = require('re-emitter')
+var sha1 = require('simple-sha1')
var Swarm = require('bittorrent-swarm')
var uniq = require('uniq')
var ut_metadata = require('ut_metadata')
var ut_pex = require('ut_pex') // browser exclude
+var File = require('./file')
+var loadChunkStore = require('./load-chunk-store')
var RarityMap = require('./rarity-map')
var Server = require('./server') // browser exclude
-var Storage = require('./storage')
var MAX_BLOCK_LENGTH = 128 * 1024
var PIECE_TIMEOUT = 30000
var CHOKE_TIMEOUT = 5000
-var SPEED_THRESHOLD = 3 * Storage.BLOCK_LENGTH
+var SPEED_THRESHOLD = 3 * Piece.BLOCK_LENGTH
var PIPELINE_MIN_DURATION = 0.5
var PIPELINE_MAX_DURATION = 1
@@ -30,13 +40,11 @@ var PIPELINE_MAX_DURATION = 1
var RECHOKE_INTERVAL = 10000 // 10 seconds
var RECHOKE_OPTIMISTIC_DURATION = 2 // 30 seconds
-function noop () {}
+var TMP = path.join(pathExists.sync('/tmp') ? '/tmp' : os.tmpDir(), 'webtorrent')
inherits(Torrent, EventEmitter)
/**
- * A torrent
- *
* @param {string|Buffer|Object} torrentId
* @param {Object} opts
*/
@@ -46,14 +54,14 @@ function Torrent (torrentId, opts) {
if (!debug.enabled) self.setMaxListeners(0)
debug('new torrent')
- self.opts = opts
self.client = opts.client
- self.hotswapEnabled = ('hotswap' in opts ? opts.hotswap : true)
- self.verify = opts.verify
+ self.announce = opts.announce
+ self.urlList = opts.urlList
+
+ self._path = opts.path
+ self._storage = opts.storage || FSChunkStore
- self.chokeTimeout = opts.chokeTimeout || CHOKE_TIMEOUT
- self.pieceTimeout = opts.pieceTimeout || PIECE_TIMEOUT
self.strategy = opts.strategy || 'sequential'
self._rechokeNumSlots = (opts.uploads === false || opts.uploads === 0)
@@ -63,32 +71,24 @@ function Torrent (torrentId, opts) {
self._rechokeOptimisticTime = 0
self._rechokeIntervalId = null
- self.infoHash = null
self.ready = false
self.destroyed = false
- self.files = []
self.metadata = null
- self.parsedTorrent = null
self.storage = null
self.numBlockedPeers = 0
+ self.files = null
+
self._amInterested = false
self._selections = []
self._critical = []
- self._storageImpl = opts.storage || Storage
- this._torrentFileURL = null
+
+ // for cleanup
self._servers = []
if (torrentId) self._onTorrentId(torrentId)
}
-// torrent size (in bytes)
-Object.defineProperty(Torrent.prototype, 'length', {
- get: function () {
- return (this.parsedTorrent && this.parsedTorrent.length) || 0
- }
-})
-
-// time remaining (in milliseconds)
+// Time remaining (in milliseconds)
Object.defineProperty(Torrent.prototype, 'timeRemaining', {
get: function () {
if (this.swarm.downloadSpeed() === 0) return Infinity
@@ -96,60 +96,62 @@ Object.defineProperty(Torrent.prototype, 'timeRemaining', {
}
})
-// percentage complete, represented as a number between 0 and 1
-Object.defineProperty(Torrent.prototype, 'progress', {
- get: function () {
- return (this.parsedTorrent && (this.downloaded / this.parsedTorrent.length)) || 0
- }
-})
-
-// bytes downloaded (not necessarily verified)
+// Bytes downloaded
Object.defineProperty(Torrent.prototype, 'downloaded', {
- get: function () {
- return (this.storage && this.storage.downloaded) || 0
- }
+ get: function () { return this.swarm ? this.swarm.downloaded : 0 }
})
-// bytes uploaded
+// Bytes uploaded
Object.defineProperty(Torrent.prototype, 'uploaded', {
- get: function () {
- return this.swarm.uploaded
- }
+ get: function () { return this.swarm ? this.swarm.uploaded : 0 }
})
-// ratio of bytes downloaded to uploaded
-Object.defineProperty(Torrent.prototype, 'ratio', {
- get: function () {
- return (this.uploaded && (this.downloaded / this.uploaded)) || 0
- }
+// TODO: add this and use it for "progress" property
+// Object.defineProperty(Torrent.prototype, 'verified', {
+// get: function () {
+// var self = this
+// return self.pieces.reduce(function (total, piece) {
+// return total + (piece.verified ? piece.length : piece.blocksWritten * BLOCK_LENGTH)
+// }, 0)
+// }
+// })
+
+/**
+ * The number of missing pieces. Used to implement 'end game' mode.
+ */
+// Object.defineProperty(Storage.prototype, 'numMissing', {
+// get: function () {
+// var self = this
+// var numMissing = self.pieces.length
+// for (var index = 0, len = self.pieces.length; index < len; index++) {
+// numMissing -= self.bitfield.get(index)
+// }
+// return numMissing
+// }
+// })
+
+// Percentage complete, represented as a number between 0 and 1
+Object.defineProperty(Torrent.prototype, 'progress', {
+ get: function () { return this.length ? this.downloaded / this.length : 0 }
})
-Object.defineProperty(Torrent.prototype, 'magnetURI', {
- get: function () {
- return parseTorrent.toMagnetURI(this.parsedTorrent)
- }
+// Seed ratio
+Object.defineProperty(Torrent.prototype, 'ratio', {
+ get: function () { return this.uploaded / (this.downloaded || 1) }
})
-Object.defineProperty(Torrent.prototype, 'torrentFile', {
- get: function () {
- return parseTorrent.toTorrentFile(this.parsedTorrent)
- }
+// Number of peers
+Object.defineProperty(Torrent.prototype, 'numPeers', {
+ get: function () { return this.swarm ? this.swarm.numPeers : 0 }
})
Object.defineProperty(Torrent.prototype, 'torrentFileURL', {
get: function () {
if (typeof window === 'undefined') throw new Error('browser-only property')
- if (this._torrentFileURL) return this._torrentFileURL
- this._torrentFileURL = window.URL.createObjectURL(
+ if (!this.torrentFile) return null
+ return window.URL.createObjectURL(
new window.Blob([ this.torrentFile ], { type: 'application/x-bittorrent' })
)
- return this._torrentFileURL
- }
-})
-
-Object.defineProperty(Torrent.prototype, 'numPeers', {
- get: function () {
- return this.swarm ? this.swarm.numPeers : 0
}
})
@@ -179,35 +181,14 @@ Torrent.prototype._onTorrentId = function (torrentId) {
Torrent.prototype._onParsedTorrent = function (parsedTorrent) {
var self = this
if (self.destroyed) return
- self.parsedTorrent = parsedTorrent
- self.infoHash = parsedTorrent.infoHash
-
- if (!self.infoHash) {
- return self._onError(new Error('Malformed torrent data: Missing info hash.'))
- }
-
- if (self.parsedTorrent.name) self.name = self.parsedTorrent.name // preliminary name
- // Allow specifying trackers via `opts` parameter
- if (self.opts.announce) {
- self.parsedTorrent.announce =
- self.parsedTorrent.announce.concat(self.opts.announce)
- }
+ self._processParsedTorrent(parsedTorrent)
- // So `webtorrent-hybrid` can force specific trackers to be used
- if (global.WEBTORRENT_ANNOUNCE) {
- self.parsedTorrent.announce =
- self.parsedTorrent.announce.concat(global.WEBTORRENT_ANNOUNCE)
- }
-
- // When no trackers specified, use some reasonable defaults
- if (self.parsedTorrent.announce.length === 0) {
- self.parsedTorrent.announce = createTorrent.announceList.map(function (list) {
- return list[0]
- })
+ if (!self.infoHash) {
+ return self._onError(new Error('Malformed torrent data: No info hash'))
}
- uniq(self.parsedTorrent.announce)
+ if (!self._path) self._path = path.join(TMP, self.infoHash)
// create swarm
self.swarm = new Swarm(self.infoHash, self.client.peerId, {
@@ -238,6 +219,37 @@ Torrent.prototype._onParsedTorrent = function (parsedTorrent) {
})
}
+Torrent.prototype._processParsedTorrent = function (parsedTorrent) {
+ if (this.announce) {
+ // Allow specifying trackers via `opts` parameter
+ parsedTorrent.announce = parsedTorrent.announce.concat(this.announce)
+ }
+
+ if (global.WEBTORRENT_ANNOUNCE) {
+ // So `webtorrent-hybrid` can force specific trackers to be used
+ parsedTorrent.announce = parsedTorrent.announce.concat(global.WEBTORRENT_ANNOUNCE)
+ }
+
+ if (parsedTorrent.announce.length === 0) {
+ // When no trackers specified, use some reasonable defaults
+ parsedTorrent.announce = createTorrent.announceList.map(function (list) {
+ return list[0]
+ })
+ }
+
+ if (this.urlList) {
+ // Allow specifying web seeds via `opts` parameter
+ parsedTorrent.urlList = parsedTorrent.urlList.concat(this.urlList)
+ }
+
+ uniq(parsedTorrent.announce)
+
+ extend(this, parsedTorrent)
+
+ this.magnetURI = parseTorrent.toMagnetURI(parsedTorrent)
+ this.torrentFile = parseTorrent.toTorrentFile(parsedTorrent)
+}
+
Torrent.prototype._onSwarmListening = function () {
var self = this
if (self.destroyed) return
@@ -246,7 +258,7 @@ Torrent.prototype._onSwarmListening = function () {
// begin discovering peers via the DHT and tracker servers
self.discovery = new Discovery({
- announce: self.parsedTorrent.announce,
+ announce: self.announce,
dht: self.client.dht,
tracker: self.client.tracker,
peerId: self.client.peerId,
@@ -262,74 +274,73 @@ Torrent.prototype._onSwarmListening = function () {
reemit(self.discovery, self, ['trackerAnnounce', 'dhtAnnounce', 'warning'])
// if full metadata was included in initial torrent id, use it
- if (self.parsedTorrent.info) self._onMetadata(self.parsedTorrent)
+ if (self.info) self._onMetadata(self)
self.emit('listening', self.client.torrentPort)
}
/**
- * Called when the metadata is received.
+ * Called when the full torrent metadata is received.
*/
Torrent.prototype._onMetadata = function (metadata) {
var self = this
if (self.metadata || self.destroyed) return
debug('got metadata')
+ var parsedTorrent
if (metadata && metadata.infoHash) {
// `metadata` is a parsed torrent (from parse-torrent module)
- self.metadata = parseTorrent.toTorrentFile(metadata)
- self.parsedTorrent = metadata
+ parsedTorrent = metadata
} else {
- self.metadata = metadata
- var announce = self.parsedTorrent.announce
- var urlList = self.parsedTorrent.urlList
try {
- self.parsedTorrent = parseTorrent(self.metadata)
+ parsedTorrent = parseTorrent(metadata)
} catch (err) {
return self._onError(err)
}
- self.parsedTorrent.announce = announce
- self.parsedTorrent.urlList = urlList
}
- // update preliminary torrent name
- self.name = self.parsedTorrent.name
+ self._processParsedTorrent(parsedTorrent)
+ self.metadata = self.torrentFile
// update discovery module with full torrent metadata
- self.discovery.setTorrent(self.parsedTorrent)
+ self.discovery.setTorrent(self)
// add web seed urls (BEP19)
- if (self.parsedTorrent.urlList) {
- self.parsedTorrent.urlList.forEach(self.addWebSeed.bind(self))
- }
+ if (self.urlList) self.urlList.forEach(self.addWebSeed.bind(self))
- self.rarityMap = new RarityMap(self.swarm, self.parsedTorrent.pieces.length)
+ self.rarityMap = new RarityMap(self.swarm, self.pieces.length)
- self.storage = new self._storageImpl(self.parsedTorrent, self.opts)
- self.storage.on('piece', self._onStoragePiece.bind(self))
- self.storage.on('file', function (file) {
- self.emit('file', file)
- })
+ self.storage = new ImmediateChunkStore(
+ new self._storage(self.pieceLength, {
+ files: self.files.map(function (file) {
+ return {
+ path: path.join(self._path, file.path),
+ length: file.length,
+ offset: file.offset
+ }
+ })
+ })
+ )
- self._reservations = self.storage.pieces.map(function () {
- return []
+ self.files = self.files.map(function (file) {
+ return new File(self, file)
})
- self.storage.on('done', function () {
- if (self.discovery.tracker) self.discovery.tracker.complete()
+ self._hashes = self.pieces
- debug('torrent ' + self.infoHash + ' done')
- self.emit('done')
+ self.pieces = self.pieces.map(function (hash, i) {
+ var pieceLength = (i === self.pieces.length - 1)
+ ? self.lastPieceLength
+ : self.pieceLength
+ return new Piece(pieceLength)
})
- self.storage.on('select', self.select.bind(self))
- self.storage.on('deselect', self.deselect.bind(self))
- self.storage.on('critical', self.critical.bind(self))
-
- self.storage.files.forEach(function (file) {
- self.files.push(file)
+ self._reservations = self.pieces.map(function () {
+ return []
})
+ self.bitfield = new BitField(self.pieces.length)
+
self.swarm.wires.forEach(function (wire) {
// If we didn't have the metadata at the time ut_metadata was initialized for this
// wire, we still want to make it available to the peer in case they request it.
@@ -338,37 +349,38 @@ Torrent.prototype._onMetadata = function (metadata) {
self._onWireWithMetadata(wire)
})
- if (self.verify) {
- process.nextTick(function () {
- debug('verifying existing torrent data')
- var numPieces = 0
- var numVerified = 0
-
- // TODO: move storage verification to storage.js?
- parallel(self.storage.pieces.map(function (piece) {
- return function (cb) {
- self.storage.read(piece.index, function (err, buffer) {
- numPieces += 1
- self.emit('verifying', {
- percentDone: 100 * numPieces / self.storage.pieces.length,
- percentVerified: 100 * numVerified / self.storage.pieces.length
- })
-
- if (!err && buffer) {
- // TODO: this is a bit hacky; figure out a cleaner way of verifying the buffer
- piece.verify(buffer)
- numVerified += piece.verified
- debug('piece ' + (piece.verified ? 'verified' : 'invalid') + ' ' + piece.index)
- }
- // continue regardless of whether piece verification failed
- cb()
- }, true) // forces override to allow reading from non-verified pieces
- }
- }), self._onStorage.bind(self))
- })
- } else {
- process.nextTick(self._onStorage.bind(self))
- }
+ // VERIFY TORRENT DATA
+ // TODO: remove nextTick
+ // process.nextTick(function () {
+ // debug('verifying existing torrent data')
+ // var numPieces = 0
+ // var numVerified = 0
+
+ // // TODO: move storage verification to storage.js?
+ // parallel(self.storage.pieces.map(function (piece) {
+ // return function (cb) {
+ // self.storage.read(piece.index, function (err, buffer) {
+ // numPieces += 1
+ // self.emit('verifying', {
+ // percentDone: 100 * numPieces / self.storage.pieces.length,
+ // percentVerified: 100 * numVerified / self.storage.pieces.length
+ // })
+
+ // if (!err && buffer) {
+ // // TODO: this is a bit hacky; figure out a cleaner way of verifying the buffer
+ // piece.verify(buffer)
+ // numVerified += piece.verified
+ // debug('piece ' + (piece.verified ? 'verified' : 'invalid') + ' ' + piece.index)
+ // }
+ // // continue regardless of whether piece verification failed
+ // cb()
+ // }, true) // forces override to allow reading from non-verified pieces
+ // }
+ // }), self._onStorage.bind(self))
+ // })
+ process.nextTick(function () {
+ self._onStorage()
+ })
process.nextTick(function () {
self.emit('metadata')
@@ -376,6 +388,26 @@ Torrent.prototype._onMetadata = function (metadata) {
}
/**
+ * Called when the metadata, swarm, and underlying storage are all fully initialized.
+ */
+Torrent.prototype._onStorage = function () {
+ var self = this
+ if (self.destroyed) return
+ debug('on storage')
+
+ // start off selecting the entire torrent with low priority
+ self.select(0, self.pieces.length - 1, false)
+
+ self._rechokeIntervalId = setInterval(self._rechoke.bind(self), RECHOKE_INTERVAL)
+ if (self._rechokeIntervalId.unref) self._rechokeIntervalId.unref()
+
+ process.nextTick(function () {
+ self.ready = true
+ self.emit('ready')
+ })
+}
+
+/**
* Destroy and cleanup this torrent.
*/
Torrent.prototype.destroy = function (cb) {
@@ -391,11 +423,6 @@ Torrent.prototype.destroy = function (cb) {
self._rechokeIntervalId = null
}
- self.files.forEach(function (file) {
- if (file._blobURL) window.URL.revokeObjectURL(file._blobURL)
- })
- if (self._torrentFileURL) window.URL.revokeObjectURL(self._torrentFileURL)
-
var tasks = []
self._servers.forEach(function (server) {
@@ -441,7 +468,7 @@ Torrent.prototype.addPeer = function (peer) {
*/
Torrent.prototype.addWebSeed = function (url) {
var self = this
- self.swarm.addWebSeed(url, self.parsedTorrent)
+ self.swarm.addWebSeed(url, self)
}
/**
@@ -454,7 +481,7 @@ Torrent.prototype.addWebSeed = function (url) {
*/
Torrent.prototype.select = function (start, end, priority, notify) {
var self = this
- if (start > end || start < 0 || end >= self.storage.pieces.length) {
+ if (start > end || start < 0 || end >= self.pieces.length) {
throw new Error('invalid selection ', start, ':', end)
}
priority = Number(priority) || 0
@@ -549,7 +576,7 @@ Torrent.prototype._onWire = function (wire, addr) {
})
// Timeout for piece requests to this peer
- wire.setTimeout(self.pieceTimeout, true)
+ wire.setTimeout(PIECE_TIMEOUT, true)
// Send KEEP-ALIVE (every 60s) so peers will not disconnect the wire
wire.setKeepAlive(true)
@@ -598,7 +625,6 @@ Torrent.prototype._onWire = function (wire, addr) {
Torrent.prototype._onWireWithMetadata = function (wire) {
var self = this
var timeoutId = null
- var timeoutMs = self.chokeTimeout
function onChokeTimeout () {
if (self.destroyed || wire.destroyed) return
@@ -607,15 +633,15 @@ Torrent.prototype._onWireWithMetadata = function (wire) {
wire.amInterested) {
wire.destroy()
} else {
- timeoutId = setTimeout(onChokeTimeout, timeoutMs)
+ timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT)
if (timeoutId.unref) timeoutId.unref()
}
}
var i = 0
function updateSeedStatus () {
- if (wire.peerPieces.length !== self.storage.pieces.length) return
- for (; i < self.storage.pieces.length; ++i) {
+ if (wire.peerPieces.length !== self.pieces.length) return
+ for (; i < self.pieces.length; ++i) {
if (!wire.peerPieces.get(i)) return
}
wire.isSeeder = true
@@ -642,7 +668,7 @@ Torrent.prototype._onWireWithMetadata = function (wire) {
wire.on('choke', function () {
clearTimeout(timeoutId)
- timeoutId = setTimeout(onChokeTimeout, timeoutMs)
+ timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT)
if (timeoutId.unref) timeoutId.unref()
})
@@ -652,22 +678,18 @@ Torrent.prototype._onWireWithMetadata = function (wire) {
})
wire.on('request', function (index, offset, length, cb) {
- // Disconnect from peers that request more than 128KB, per spec
if (length > MAX_BLOCK_LENGTH) {
- debug(
- 'got invalid block size request %s (from %s)',
- length, wire.remoteAddress + ':' + wire.remotePort
- )
+ // Per spec, disconnect from peers that request >128KB
return wire.destroy()
}
-
- self.storage.readBlock(index, offset, length, cb)
+ if (self.pieces[index]) return
+ self.storage.get(index, { offset: offset, length: length }, cb)
})
- wire.bitfield(self.storage.bitfield) // always send bitfield (required)
+ wire.bitfield(self.bitfield) // always send bitfield (required)
wire.interested() // always start out interested
- timeoutId = setTimeout(onChokeTimeout, timeoutMs)
+ timeoutId = setTimeout(onChokeTimeout, CHOKE_TIMEOUT)
if (timeoutId.unref) timeoutId.unref()
wire.isSeeder = false
@@ -675,45 +697,6 @@ Torrent.prototype._onWireWithMetadata = function (wire) {
}
/**
- * Called when the metadata, swarm, and underlying storage are all fully initialized.
- */
-Torrent.prototype._onStorage = function () {
- var self = this
- if (self.destroyed) return
- debug('on storage')
-
- // allow writes to storage only after initial piece verification is finished
- self.storage.readonly = false
-
- // start off selecting the entire torrent with low priority
- self.select(0, self.storage.pieces.length - 1, false)
-
- self._rechokeIntervalId = setInterval(self._rechoke.bind(self), RECHOKE_INTERVAL)
- if (self._rechokeIntervalId.unref) self._rechokeIntervalId.unref()
-
- process.nextTick(function () {
- self.ready = true
- self.emit('ready')
- })
-}
-
-/**
- * When a piece is fully downloaded, notify all peers with a HAVE message.
- * @param {Piece} piece
- */
-Torrent.prototype._onStoragePiece = function (piece) {
- var self = this
- debug('piece done %s', piece.index)
- self._reservations[piece.index] = null
-
- self.swarm.wires.forEach(function (wire) {
- wire.have(piece.index)
- })
-
- self._gcSelections()
-}
-
-/**
* Called on selection changes.
*/
Torrent.prototype._updateSelections = function () {
@@ -737,13 +720,13 @@ Torrent.prototype._gcSelections = function () {
var oldOffset = s.offset
// check for newly downloaded pieces in selection
- while (self.storage.bitfield.get(s.from + s.offset) && s.from + s.offset < s.to) {
+ while (self.bitfield.get(s.from + s.offset) && s.from + s.offset < s.to) {
s.offset++
}
if (oldOffset !== s.offset) s.notify()
if (s.to !== s.from + s.offset) continue
- if (!self.storage.bitfield.get(s.from + s.offset)) continue
+ if (!self.bitfield.get(s.from + s.offset)) continue
// remove fully downloaded selection
self._selections.splice(i--, 1) // decrement i to offset splice
@@ -816,7 +799,6 @@ Torrent.prototype._updateWire = function (wire) {
for (var i = self._selections.length; i--;) {
var next = self._selections[i]
-
var piece
if (self.strategy === 'rarest') {
var start = next.from + next.offset
@@ -849,15 +831,14 @@ Torrent.prototype._updateWire = function (wire) {
var speed = wire.downloadSpeed() || 1
if (speed > SPEED_THRESHOLD) return function () { return true }
- var secs = Math.max(1, wire.requests.length) * Storage.BLOCK_LENGTH / speed
+ var secs = Math.max(1, wire.requests.length) * Piece.BLOCK_LENGTH / speed
var tries = 10
var ptr = 0
return function (index) {
- if (!tries || self.storage.bitfield.get(index)) return true
+ if (!tries || self.bitfield.get(index)) return true
- var piece = self.storage.pieces[index]
- var missing = piece.blocks.length - piece.blocksWritten
+ var missing = self.pieces[index].missing
for (; ptr < self.swarm.wires.length; ptr++) {
var otherWire = self.swarm.wires[ptr]
@@ -1017,10 +998,9 @@ Torrent.prototype._rechoke = function () {
*/
Torrent.prototype._hotswap = function (wire, index) {
var self = this
- if (!self.hotswapEnabled) return false
var speed = wire.downloadSpeed()
- if (speed < Storage.BLOCK_LENGTH) return false
+ if (speed < Piece.BLOCK_LENGTH) return false
if (!self._reservations[index]) return false
var r = self._reservations[index]
@@ -1054,7 +1034,7 @@ Torrent.prototype._hotswap = function (wire, index) {
var req = minWire.requests[i]
if (req.piece !== index) continue
- self.storage.cancelBlock(index, req.offset)
+ self.pieces[index].cancel((req.offset / Piece.BLOCK_SIZE) | 0)
}
self.emit('hotswap', minWire, wire, index)
@@ -1068,66 +1048,121 @@ Torrent.prototype._request = function (wire, index, hotswap) {
var self = this
var numRequests = wire.requests.length
- if (self.storage.bitfield.get(index)) return false
+ if (self.bitfield.get(index)) return false
+
var maxOutstandingRequests = getPipelineLength(wire, PIPELINE_MAX_DURATION)
if (numRequests >= maxOutstandingRequests) return false
+ // var endGame = (wire.requests.length === 0 && self.storage.numMissing < 30)
- var endGame = (wire.requests.length === 0 && self.storage.numMissing < 30)
- var block = self.storage.reserveBlock(index, endGame)
+ var piece = self.pieces[index]
+ var reservation = piece.reserve()
- if (!block && !endGame && hotswap && self._hotswap(wire, index)) {
- block = self.storage.reserveBlock(index, false)
+ if (reservation === -1 && hotswap && self._hotswap(wire, index)) {
+ reservation = piece.reserve()
}
- if (!block) return false
+ if (reservation === -1) return false
var r = self._reservations[index]
- if (!r) {
- r = self._reservations[index] = []
- }
+ if (!r) r = self._reservations[index] = []
var i = r.indexOf(null)
if (i === -1) i = r.length
r[i] = wire
- function gotPiece (err, buffer) {
- if (!self.ready) {
- self.once('ready', function () {
- gotPiece(err, buffer)
- })
- return
- }
+ var chunkOffset = piece.chunkOffset(reservation)
+ var chunkLength = piece.chunkLength(reservation)
+
+ wire.request(index, chunkOffset, chunkLength, function onChunk (err, chunk) {
+ // TODO: what is this for?
+ if (!self.ready) return self.once('ready', function () { onChunk(err, chunk) })
if (r[i] === wire) r[i] = null
+ if (piece !== self.pieces[index]) return onUpdateTick()
+
if (err) {
debug(
'error getting piece %s (offset: %s length: %s) from %s: %s',
- index, block.offset, block.length, wire.remoteAddress + ':' + wire.remotePort,
+ index, chunkOffset, chunkLength, wire.remoteAddress + ':' + wire.remotePort,
err.message
)
- self.storage.cancelBlock(index, block.offset)
- process.nextTick(self._update.bind(self))
- return false
- } else {
- debug(
- 'got piece %s (offset: %s length: %s) from %s',
- index, block.offset, block.length, wire.remoteAddress + ':' + wire.remotePort
- )
- self.storage.writeBlock(index, block.offset, buffer, function (err) {
- if (err) {
- debug('error writing block')
- self.storage.cancelBlock(index, block.offset)
+ piece.cancel(reservation)
+ onUpdateTick()
+ return
+ }
+
+ debug(
+ 'got piece %s (offset: %s length: %s) from %s',
+ index, chunkOffset, chunkLength, wire.remoteAddress + ':' + wire.remotePort
+ )
+
+ if (!piece.set(reservation, chunk, wire)) return onUpdateTick()
+
+ var buf = piece.flush()
+
+ // TODO: might need to set self.pieces[index] = null here since sha1 is async
+
+ sha1(buf, function (hash) {
+ if (hash === self._hashes[index]) {
+ if (!self.pieces[index]) return
+ debug('piece verified %s', index)
+
+ self.pieces[index] = null
+ self._reservations[index] = null
+
+ self.bitfield.set(index, true)
+ self.storage.put(index, buf)
+
+ self.swarm.wires.forEach(function (wire) {
+ wire.have(index)
+ })
+
+ // are any new files done?
+ self.files.forEach(function (file) {
+ if (file.done) return
+ for (var i = file._startPiece; i <= file._endPiece; ++i) {
+ if (!self.bitfield.get(i)) return
+ }
+ file.done = true
+ file.emit('done')
+ debug('file done: ' + file.name)
+ })
+
+ // is the torrent done?
+ if (self.files.every(function (file) { return file.done })) {
+ self.done = true
+ self.emit('done')
+ debug('torrent done: ' + self.infoHash)
+ if (self.discovery.tracker) self.discovery.tracker.complete()
}
- process.nextTick(self._update.bind(self))
- })
- }
- }
+ self._gcSelections()
+ } else {
+ self.pieces[index] = new Piece(piece.length)
+ self.emit('warning', new Error('Piece ' + index + ' failed verification'))
+ }
+ onUpdateTick()
+ })
+ })
- wire.request(index, block.offset, block.length, gotPiece)
+ function onUpdateTick () {
+ process.nextTick(function () { self._update() })
+ }
return true
}
+Torrent.prototype.load = function (streams, cb) {
+ var self = this
+ loadChunkStore(streams, this.storage, Piece.BLOCK_LENGTH, function (err) {
+ if (err) return cb(err)
+ self.pieces.forEach(function (piece, index) {
+ self.pieces[index] = null
+ self.bitfield.set(index, true)
+ })
+ cb(null)
+ })
+}
+
Torrent.prototype.createServer = function (opts) {
var self = this
if (typeof Server === 'function' /* browser exclude */) {
@@ -1145,7 +1180,7 @@ Torrent.prototype._onError = function (err) {
}
function getPipelineLength (wire, duration) {
- return Math.ceil(2 + duration * wire.downloadSpeed() / Storage.BLOCK_LENGTH)
+ return Math.ceil(2 + duration * wire.downloadSpeed() / Piece.BLOCK_LENGTH)
}
/**
@@ -1154,3 +1189,5 @@ function getPipelineLength (wire, duration) {
function randomInt (high) {
return Math.random() * high | 0
}
+
+function noop () {}
diff --git a/package.json b/package.json
index 0c03eaf..c056981 100644
--- a/package.json
+++ b/package.json
@@ -11,9 +11,9 @@
"webtorrent": "./bin/cmd.js"
},
"browser": {
- "./lib/fs-storage": false,
"./lib/server": false,
"bittorrent-dht/client": false,
+ "fs-chunk-store": "memory-chunk-store",
"load-ip-set": false,
"ut_pex": false
},
@@ -32,10 +32,13 @@
"dezalgo": "^1.0.1",
"end-of-stream": "^1.0.0",
"executable": "^1.1.0",
+ "fs-chunk-store": "^1.3.1",
"hat": "0.0.3",
+ "immediate-chunk-store": "^1.0.7",
"inherits": "^2.0.1",
"inquirer": "^0.8.0",
"load-ip-set": "^1.0.3",
+ "memory-chunk-store": "^1.1.2",
"mime": "^1.2.11",
"minimist": "^1.1.0",
"mkdirp": "^0.5.0",
@@ -44,6 +47,7 @@
"network-address": "^1.0.0",
"once": "^1.3.1",
"parse-torrent": "^5.1.0",
+ "path-exists": "^1.0.0",
"pretty-bytes": "^2.0.1",
"pump": "^1.0.0",
"random-access-file": "^0.3.1",
@@ -56,6 +60,7 @@
"speedometer": "^0.1.2",
"thunky": "^0.1.0",
"torrent-discovery": "^3.0.0",
+ "torrent-piece": "^1.0.0",
"uniq": "^1.0.1",
"ut_metadata": "^2.1.0",
"ut_pex": "^1.0.1",
diff --git a/test/download-dht-magnet.js b/test/download-dht-magnet.js
index 3c05b30..93e9534 100644
--- a/test/download-dht-magnet.js
+++ b/test/download-dht-magnet.js
@@ -55,7 +55,7 @@ test('Download using DHT (via magnet uri)', function (t) {
maybeDone()
})
- torrent.storage.load(fs.createReadStream(leavesPath), function (err) {
+ torrent.load(fs.createReadStream(leavesPath), function (err) {
t.error(err)
wroteStorage = true
maybeDone()
diff --git a/test/download-dht-torrent.js b/test/download-dht-torrent.js
index 63f90bd..cda235a 100644
--- a/test/download-dht-torrent.js
+++ b/test/download-dht-torrent.js
@@ -55,7 +55,7 @@ test('Download using DHT (via .torrent file)', function (t) {
maybeDone(null)
})
- torrent.storage.load(fs.createReadStream(leavesPath), function (err) {
+ torrent.load(fs.createReadStream(leavesPath), function (err) {
wroteStorage = true
maybeDone(err)
})
diff --git a/test/download-tracker-magnet.js b/test/download-tracker-magnet.js
index 079c162..43ed393 100644
--- a/test/download-tracker-magnet.js
+++ b/test/download-tracker-magnet.js
@@ -66,7 +66,7 @@ function magnetDownloadTest (t, serverType) {
t.deepEqual(torrent.files.map(function (file) { return file.name }), names)
- torrent.storage.load(fs.createReadStream(leavesPath), function (err) {
+ torrent.load(fs.createReadStream(leavesPath), function (err) {
cb(err, client1)
})
})
diff --git a/test/download-tracker-torrent.js b/test/download-tracker-torrent.js
index 3c9a8be..c724f0a 100644
--- a/test/download-tracker-torrent.js
+++ b/test/download-tracker-torrent.js
@@ -66,7 +66,7 @@ function torrentDownloadTest (t, serverType) {
t.deepEqual(torrent.files.map(function (file) { return file.name }), names)
- torrent.storage.load(fs.createReadStream(leavesPath), function (err) {
+ torrent.load(fs.createReadStream(leavesPath), function (err) {
cb(err, client1)
})
})
diff --git a/test/server.js b/test/server.js
index 920637a..3a3cd36 100644
--- a/test/server.js
+++ b/test/server.js
@@ -29,6 +29,6 @@ test('start http server programmatically', function (t) {
})
})
torrent.on('ready', function () {
- torrent.storage.load(fs.createReadStream(leavesPath))
+ torrent.load(fs.createReadStream(leavesPath))
})
})
diff --git a/test/storage.js b/test/storage.js
deleted file mode 100644
index 15c3987..0000000
--- a/test/storage.js
+++ /dev/null
@@ -1,61 +0,0 @@
-var fs = require('fs')
-var parseTorrent = require('parse-torrent')
-var Storage = require('../lib/storage')
-var test = require('tape')
-
-var torrents = [ 'leaves', 'pride' ].map(function (name) {
- var torrent = fs.readFileSync(__dirname + '/torrents/' + name + '.torrent')
-
- return {
- name: name,
- torrent: torrent,
- parsedTorrent: parseTorrent(torrent)
- }
-})
-
-torrents.forEach(function (torrent) {
- test('sanity check backing storage for ' + torrent.name + ' torrent', function (t) {
- var parsedTorrent = torrent.parsedTorrent
- var storage = new Storage(parsedTorrent)
-
- t.equal(storage.files.length, parsedTorrent.files.length)
- t.equal(storage.pieces.length, parsedTorrent.pieces.length)
-
- var length = 0
- var pieces = 0
-
- storage.pieces.forEach(function (piece) {
- t.notOk(piece.verified)
- length += piece.length
-
- // ensure all blocks start out empty
- for (var i = 0; i < piece.blocks.length; ++i) {
- t.equal(piece.blocks[i], 0)
- }
- })
-
- t.equal(length, parsedTorrent.length)
- length = 0
-
- storage.files.forEach(function (file) {
- t.notOk(file.done)
- length += file.length
- pieces += file.pieces.length
-
- t.assert(file.length >= 0)
- t.assert(file.pieces.length >= 0)
- })
-
- t.equal(length, parsedTorrent.length)
-
- if (parsedTorrent.files.length > 1) {
- // if the torrent contains multiple files, the pieces may overlap file boundaries,
- // so the aggregate number of file pieces will be at least the number of pieces.
- t.assert(pieces >= parsedTorrent.pieces.length)
- } else {
- t.equal(pieces, parsedTorrent.pieces.length)
- }
-
- t.end()
- })
-})