diff options
author | Feross Aboukhadijeh <feross@feross.org> | 2014-09-21 05:41:24 +0400 |
---|---|---|
committer | Feross Aboukhadijeh <feross@feross.org> | 2014-09-21 05:41:24 +0400 |
commit | 93686505fbc90522c75b6c151ec7261aa76098de (patch) | |
tree | 5b474b920e79b7b39b6804fac5e6a641de6ae843 | |
parent | 2e14192c311f64c20496a72af7ffce36495be92b (diff) |
merge `bittorrent-client` into this module
When I started the WebTorrent project I thought there were going to
need to be two separate client implementations (bittorrent-client and
webtorrent-client) that would get tied together in a higher-level
module.
Fortunately, this was not necessary because of the awesome “browser”
field support in browserify. By substituting just a few modules, we can
make the same module (webtorrent) work in node AND the browser, with
the same codebase!
So, from now on, you can just `require(‘webtorrent’)` in node or the
browser, and it will just work. You can also `npm install webtorrent`
if you want to use bittorrent in a node app or script. Lastly, you can
`npm install webtorrent -g` if you want to use webtorrent as a command
line app (it installs a `webtorrent` command).
-rw-r--r-- | .travis.yml | 2 | ||||
-rw-r--r-- | .zuul.yml | 6 | ||||
-rw-r--r-- | README.md | 191 | ||||
-rwxr-xr-x | bin/clone.sh | 1 | ||||
-rw-r--r-- | index.js | 271 | ||||
-rw-r--r-- | lib/file-stream.js | 99 | ||||
-rw-r--r-- | lib/fs-storage.js | 8 | ||||
-rw-r--r-- | lib/rarity-map.js | 86 | ||||
-rw-r--r-- | lib/storage.js | 554 | ||||
-rw-r--r-- | lib/torrent.js | 991 | ||||
-rw-r--r-- | package.json | 47 | ||||
-rw-r--r-- | test/cmd.js (renamed from test/basic.js) | 15 | ||||
-rw-r--r-- | test/download.js | 307 | ||||
-rw-r--r-- | test/metadata.js | 47 | ||||
-rw-r--r-- | test/multiple.js | 60 | ||||
-rw-r--r-- | test/package.json | 7 | ||||
-rw-r--r-- | test/rarity-map.js | 113 | ||||
-rw-r--r-- | test/storage.js | 60 | ||||
-rw-r--r-- | test/torrents/Leaves of Grass by Walt Whitman.epub | bin | 0 -> 362017 bytes | |||
-rw-r--r-- | test/torrents/pride.torrent | bin | 0 -> 3666 bytes |
20 files changed, 2786 insertions, 79 deletions
diff --git a/.travis.yml b/.travis.yml index 2083806..18ae2d8 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,4 @@ language: node_js node_js: - "0.11" - - "0.10"
\ No newline at end of file + - "0.10" diff --git a/.zuul.yml b/.zuul.yml new file mode 100644 index 0000000..6f21af4 --- /dev/null +++ b/.zuul.yml @@ -0,0 +1,6 @@ +ui: tape +browsers: + - name: chrome + version: 35..latest + - name: firefox + version: 30..latest @@ -5,6 +5,8 @@ ### WebTorrent – Streaming torrent client for node & the browser +[![Sauce Test Status](https://saucelabs.com/browser-matrix/webtorrent.svg)](https://saucelabs.com/u/webtorrent-client) + > Warning: This is pre-alpha software. **Watch/star to follow along with progress.** ### Features @@ -26,6 +28,7 @@ **[extension api](https://github.com/feross/bittorrent-protocol#extension-api)** for adding new extensions - **Comprehensive test suite** (completely offline, so it's reliable and fast) +- Modular design (see [module list](#modules)) #### Web-specific features @@ -34,6 +37,18 @@ other domain – no same origin policy. WebTorrent is a P2P network for the entire web! - Stream video torrents into a `<video>` tag (`webm (vp8, vp9)` or `mp4 (h.264)`) +### Ways to help + +- Report bugs! +- Fix an **[open issue](https://github.com/feross/webtorrent/issues?state=open)** in this + repo or **[one of it's many dependencies](#modules)**. WebTorrent is an + **[OPEN Open Source Project](CONTRIBUTING.md)**! +- If you believe in the vision, send bitcoin to *1B6aystcqu8fd6ejzpmMFMPRqH9b86iiwh* or + **[donate](https://coinbase.com/checkouts/7c683397e33166651dedfebee6fb0f96)** via + Coinbase to support the project. + +Join us in IRC on freenode at `#webtorrent` if you want to help with development, or you just want to hang out with some cool mad science hackers :) + ### Install With [npm](https://npmjs.org/), run: @@ -149,17 +164,172 @@ There are many supported streaming options: --xbmc stream to XBMC ``` -### Ways to help +### API -- Report bugs! -- Fix an **[open issue](https://github.com/feross/webtorrent/issues?state=open)** in this - repo or **[one of it's many dependencies](#modules)**. WebTorrent is an - **[OPEN Open Source Project](CONTRIBUTING.md)**! -- If you believe in the vision, send bitcoin to *1B6aystcqu8fd6ejzpmMFMPRqH9b86iiwh* or - **[donate](https://coinbase.com/checkouts/7c683397e33166651dedfebee6fb0f96)** via - Coinbase to support the project. +#### `client = new WebTorrent([opts])` -Join us in IRC on freenode at `#webtorrent` if you want to help with development, or you just want to hang out with some cool mad science hackers :) +Create a new `WebTorrent` instance. + +If `opts` is specified, then the default options (shown below) will be overridden. + +``` js +{ + dht: true, // Whether or not to enable DHT + maxPeers: 100, // Max number of peers to connect to (per torrent) + nodeId: '', // DHT protocol node ID (otherwise, randomly generated) + peerId: '', // Wire protocol peer ID (otherwise, randomly generated) + storage: function // custom storage engine, or false for in-memory engine + tracker: true, // Whether or not to enable trackers + verify: true, // Verify previously stored data before starting +} +``` + +#### `client.add(torrentId, [opts], [function ontorrent (torrent) {}])` + +Start downloading a new torrent. Aliased as `client.download`. + +`torrentId` can be any of the following: + +- info hash (as a hex string or Buffer) +- magnet uri (as a utf8 string) +- .torrent file (as a Buffer) +- parsed torrent (from [parse-torrent](https://github.com/feross/parse-torrent)) +- http/https url to a .torrent file (string) +- filesystem path to a .torrent file (string) + +If `ontorrent` is specified, then it will be called when **this** torrent is ready to be +used (i.e. metadata is available). Note: this is distinct from the 'torrent' event which +will fire for **all** torrents. + +If you want access to the torrent object immediately in order to listen to events as the +metadata is fetched from the network, then use the return value of `client.add`. If you +just want the file data, then use `ontorrent` or the 'torrent' event. + +#### `client.seed(input, [opts], [function onseed (torrent) {}])` + +Start seeding a new torrent. + +`input` can be any of the following: + +- path to the file or folder on filesystem (string) +- W3C [File](https://developer.mozilla.org/en-US/docs/Web/API/File) object (from an `<input>` or drag and drop) +- W3C [FileList](https://developer.mozilla.org/en-US/docs/Web/API/FileList) object (basically an array of `File` objects) +- Array of `File` objects + +If `opts` is specified, it should contain the following types of options: + +- options for [create-torrent](https://github.com/feross/create-torrent#createtorrentinput-opts-function-callback-err-torrent-) (to allow configuration of the .torrent file that is created) +- options for `client.add` (see above) + +If `onseed` is specified, it will be called when the client has begun seeding the file. + +#### `client.on('torrent', function (torrent) {})` + +Emitted when a torrent is ready to be used (i.e. metadata is available and storage is +ready). See the torrent section for more info on what methods a `torrent` has. + +#### `client.remove(torrentId, [function callback (err) {}])` + +Remove a torrent from the client. Destroy all connections to peers and delete all saved +file data. If `callback` is specified, it will be called when file data is removed. + +#### `client.destroy()` + +Destroy the client, including all torrents and connections to peers. + +#### `client.listen([port], function () {})` + +Listen for incoming peers on the specified port. Port defaults to `6881` + +#### `client.torrents[...]` + +An array of all torrents in the client. + +#### `client.get(torrentId)` + +Returns the torrent with the given `torrentId`. Convenience method. Easier than +searching through the `client.torrents` array. + +#### `client.ratio` + +Seed ratio for all torrents in the client. + + +### torrent api + +#### `torrent.files[...]` + +An array of all files in the torrent. See the file section for more info on what methods +the file has. + +#### `torrent.swarm` + +The attached [bittorrent-swarm](https://github.com/feross/bittorrent-swarm) instance. + +#### `torrent.remove()` + +Alias for `client.remove(torrent)`. + +#### `torrent.addPeer(addr)` + +Adds a peer to the underlying [bittorrent-swarm](https://github.com/feross/bittorrent-swarm) instance. + +#### `torrent.select(start, end, [priority], [notify])` + +Selects a range of pieces to prioritize starting with `start` and ending with `end` (both inclusive) +at the given `priority`. `notify` is an optional callback to be called when the selection is updated +with new data. + +#### `torrent.deselect(start, end, priority)` + +Deprioritizes a range of previously selected pieces. + +#### `torrent.critical(start, end)` + +Marks a range of pieces as critical priority to be downloaded ASAP. From `start` to `end` +(both inclusive). + + +### file api + +#### `file.name` + +File name, as specified by the torrent. *Example: 'some-filename.txt'* + +#### `file.path` + +File path, as specified by the torrent. *Example: 'some-folder/some-filename.txt'* + +#### `file.length` + +File length (in bytes), as specified by the torrent. *Example: 12345* + +#### `file.select()` + +Selects the file to be downloaded, but at a lower priority than files with streams. +Useful if you know you need the file at a later stage. + +#### `file.deselect()` + +Deselects the file, which means it won't be downloaded unless someone creates a stream +for it. + +#### `stream = file.createReadStream([opts])` + +Create a [readable stream](http://nodejs.org/api/stream.html#stream_class_stream_readable) +to the file. Pieces needed by the stream will be prioritized highly and fetched from the +swarm first. + +You can pass `opts` to stream only a slice of a file. + +``` js +{ + start: startByte, + end: endByte +} +``` + +Both `start` and `end` are inclusive. ### Modules @@ -179,7 +349,6 @@ These are the modules I am writing to make WebTorrent work: |---|---|---|---| | **[webtorrent](https://github.com/feross/webtorrent)** | [![](https://img.shields.io/travis/feross/webtorrent.svg)](https://travis-ci.org/feross/webtorrent) | [![](https://img.shields.io/npm/v/webtorrent.svg)](https://npmjs.org/package/webtorrent) | **torrent client (this module)** | [addr-to-ip-port](https://github.com/feross/addr-to-ip-port) | [![](https://img.shields.io/travis/feross/addr-to-ip-port.svg)](https://travis-ci.org/feross/addr-to-ip-port) | [![](https://img.shields.io/npm/v/addr-to-ip-port.svg)](https://npmjs.org/package/addr-to-ip-port) | cache for addr->ip:port -| [bittorrent-client](https://github.com/feross/bittorrent-client) | [![](https://img.shields.io/travis/feross/bittorrent-client.svg)](https://travis-ci.org/feross/bittorrent-client) | [![](https://img.shields.io/npm/v/bittorrent-client.svg)](https://npmjs.org/package/bittorrent-client) | access torrents as stream | [bittorrent-dht](https://github.com/feross/bittorrent-dht) | [![](https://img.shields.io/travis/feross/bittorrent-dht.svg)](https://travis-ci.org/feross/bittorrent-dht) | [![](https://img.shields.io/npm/v/bittorrent-dht.svg)](https://npmjs.org/package/bittorrent-dht) | bittorrent dht client | [bittorrent-peerid](https://github.com/fisch0920/bittorrent-peerid) | [![](https://img.shields.io/travis/fisch0920/bittorrent-peerid.svg)](https://travis-ci.org/fisch0920/bittorrent-peerid) | [![](https://img.shields.io/npm/v/bittorrent-peerid.svg)](https://npmjs.org/package/bittorrent-peerid) | identify client name/version | [bittorrent-protocol](https://github.com/feross/bittorrent-protocol) | [![](https://img.shields.io/travis/feross/bittorrent-protocol.svg)](https://travis-ci.org/feross/bittorrent-protocol) | [![](https://img.shields.io/npm/v/bittorrent-protocol.svg)](https://npmjs.org/package/bittorrent-protocol) | bittorrent protocol stream @@ -304,7 +473,7 @@ Since WebTorrent is web-first, it's simple for users who do not understand .torr ### Known issues -#### Disable default Chromebook firewall +#### Downloads don't start on Chromebook Chromebooks are set to refuse all incoming connections by default. To change this, run: diff --git a/bin/clone.sh b/bin/clone.sh index 7da1260..7b7229b 100755 --- a/bin/clone.sh +++ b/bin/clone.sh @@ -7,7 +7,6 @@ fi pushd $1 git clone git@github.com:feross/addr-to-ip-port.git -git clone git@github.com:feross/bittorrent-client.git git clone git@github.com:feross/bittorrent-dht.git git clone git@github.com:fisch0920/bittorrent-peerid.git git clone git@github.com:feross/bittorrent-protocol.git @@ -1,27 +1,77 @@ +// TODO: dhtPort and torrentPort should be consistent between restarts +// TODO: peerId and nodeId should be consistent between restarts + module.exports = WebTorrent -var Client = require('bittorrent-client') +var createTorrent = require('create-torrent') var debug = require('debug')('webtorrent') +var DHT = require('bittorrent-dht/client') // browser exclude +var EventEmitter = require('events').EventEmitter var extend = require('extend.js') -var FSStorage = require('./lib/fs-storage') +var FSStorage = require('./lib/fs-storage') // browser exclude +var hat = require('hat') var inherits = require('inherits') +var loadIPSet = require('load-ip-set') // browser exclude var parallel = require('run-parallel') -var Server = require('./lib/server') +var parseTorrent = require('parse-torrent') +var Server = require('./lib/server') // browser exclude +var speedometer = require('speedometer') +var Storage = require('./lib/storage') +var Torrent = require('./lib/torrent') -inherits(WebTorrent, Client) +inherits(WebTorrent, EventEmitter) +/** + * WebTorrent Client + * @param {Object} opts + */ function WebTorrent (opts) { var self = this + if (!(self instanceof WebTorrent)) return new WebTorrent(opts) if (!opts) opts = {} - debug('new webtorrent') - - Client.call(self, opts) + EventEmitter.call(self) self.listening = false + self.torrentPort = opts.torrentPort || 0 + self.tracker = (opts.tracker !== undefined) ? opts.tracker : true + self.torrents = [] + + self.downloadSpeed = speedometer() + self.uploadSpeed = speedometer() + + self.storage = typeof opts.storage === 'function' + ? opts.storage + : (opts.storage !== false && typeof FSStorage === 'function' /* browser exclude */) + ? FSStorage + : Storage + + self.peerId = opts.peerId === undefined + ? new Buffer('-WW0001-' + hat(48), 'utf8') + : typeof opts.peerId === 'string' + ? new Buffer(opts.peerId, 'utf8') + : opts.peerId + self.peerIdHex = self.peerId.toString('hex') + + self.nodeId = opts.nodeId === undefined + ? new Buffer(hat(160), 'hex') + : typeof opts.nodeId === 'string' + ? new Buffer(opts.nodeId, 'hex') + : opts.nodeId + self.nodeIdHex = self.nodeId.toString('hex') + // TODO: implement webtorrent-dht + if (opts.dht !== false && typeof DHT === 'function' /* browser exclude */) { + // use a single DHT instance for all torrents, so the routing table can be reused + self.dht = new DHT(extend({ nodeId: self.nodeId }, opts.dht)) + self.dht.listen(opts.dhtPort) + } + + debug('new webtorrent (peerId %s, nodeId %s)', self.peerIdHex, self.nodeIdHex) + + // TODO: this is probably broken if (opts.list) return - if (opts.port !== false && typeof Server === 'function') { + if (opts.port !== false && typeof Server === 'function' /* browser exclude */) { self.server = new Server(self, opts.port) self.server.on('listening', function () { self.listening = true @@ -29,23 +79,68 @@ function WebTorrent (opts) { }) } - self.on('torrent', self._onTorrent.bind(self)) + if (typeof loadIPSet === 'function') { + loadIPSet(opts.blocklist, function (err, ipSet) { + self.blocked = ipSet + ready() + }) + } else process.nextTick(ready) + + function ready () { + self.ready = true + self.emit('ready') + } +} + +/** + * Seed ratio for all torrents in the client. + * @type {number} + */ +Object.defineProperty(WebTorrent.prototype, 'ratio', { + get: function () { + var self = this + var uploaded = self.torrents.reduce(function (total, torrent) { + return total + torrent.uploaded + }, 0) + var downloaded = self.torrents.reduce(function (total, torrent) { + return total + torrent.downloaded + }, 0) || 1 + return uploaded / downloaded + } +}) + +/** + * Returns the torrent with the given `torrentId`. Convenience method. Easier than + * searching through the `client.torrents` array. + * + * @param {string|Buffer|Object} torrentId + * @return {Torrent} + */ +WebTorrent.prototype.get = function (torrentId) { + var self = this + var parsed = parseTorrent(torrentId) + if (!parsed || !parsed.infoHash) return null + for (var i = 0, len = self.torrents.length; i < len; i++) { + var torrent = self.torrents[i] + if (torrent.infoHash === parsed.infoHash) return torrent + } + return null } /** - * Add a new torrent to the client. `torrentId` can be one of: + * Start downloading a new torrent. Aliased as `client.download`. * - * - magnet uri (utf8 string) - * - torrent file (buffer) - * - info hash (hex string or buffer) - * - parsed torrent (from parse-torrent module) - * - http/https url to a .torrent file (string) - * - filesystem path to a .torrent file (string) + * `torrentId` can be one of: + * - magnet uri (utf8 string) + * - torrent file (buffer) + * - info hash (hex string or buffer) + * - parsed torrent (from [parse-torrent](https://github.com/feross/parse-torrent)) + * - http/https url to a .torrent file (string) + * - filesystem path to a .torrent file (string) * - * @override - * @param {string|Buffer|Object} torrentId torrent (choose from above list) - * @param {Object} opts optional torrent-specific options - * @param {function=} ontorrent called when the torrent is ready (has metadata) + * @param {string|Buffer|Object} torrentId + * @param {Object} opts torrent-specific options + * @param {function=} ontorrent called when the torrent is ready (has metadata) */ WebTorrent.prototype.add = WebTorrent.prototype.download = function (torrentId, opts, ontorrent) { @@ -55,15 +150,106 @@ WebTorrent.prototype.download = function (torrentId, opts, ontorrent) { ontorrent = opts opts = {} } + if (!opts) opts = {} - opts = extend({ - storage: typeof FSStorage === 'function' && FSStorage - }, opts) + opts.client = self + opts.storage = opts.storage || self.storage - // TODO: fix this to work with multiple torrents + // TODO: fix this to work with multiple torrents. this should probably be in cmd.js self.index = opts.index - return Client.prototype.add.call(self, torrentId, opts, ontorrent) + var torrent = new Torrent(torrentId, extend({ client: self }, opts)) + self.torrents.push(torrent) + + function clientOnTorrent (_torrent) { + if (torrent.infoHash === _torrent.infoHash) { + ontorrent(torrent) + self.removeListener('torrent', clientOnTorrent) + } + } + if (ontorrent) self.on('torrent', clientOnTorrent) + + torrent.on('error', function (err) { + self.emit('error', err, torrent) + }) + + torrent.on('listening', function (port) { + self.emit('listening', port, torrent) + }) + + torrent.on('ready', function () { + // Emit 'torrent' when a torrent is ready to be used + debug('torrent') + self.emit('torrent', torrent) + self._onTorrent(torrent) + }) + + return torrent +} + +/** + * Start seeding a new torrent. + * + * `input` can be any of the following: + * - path to the file or folder on filesystem (string) + * - W3C File object (from an `<input>` or drag and drop) + * - W3C FileList object (basically an array of `File` objects) + * - Array of `File` objects + * + * @param {string|File|FileList|Array.<File>|Blob|Array.<Blob>} input + * @param {Object} opts + * @param {function} onseed + */ +WebTorrent.prototype.seed = function (input, opts, onseed) { + var self = this + if (typeof opts === 'function') { + onseed = opts + opts = {} + } + // TODO: support `input` as filesystem path string + var buffer = Buffer.concat(input.map(function (file) { + return file.buffer + })) + + var torrent + function clientOnSeed (_torrent) { + if (torrent.infoHash === _torrent.infoHash) { + onseed(torrent) + self.removeListener('seed', clientOnSeed) + } + } + if (onseed) self.on('seed', clientOnSeed) + + createTorrent(input, opts, function (err, torrentBuf) { + if (err) return self.emit('error', err) + var parsedTorrent = parseTorrent(torrentBuf) + self.add(torrentBuf, opts, function (_torrent) { + torrent = _torrent + Storage.writeToStorage( + torrent.storage, + buffer, + parsedTorrent.pieceLength, + function (err) { + if (err) return self.emit('error', err) + self.emit('seed', torrent) + }) + }) + }) +} + +/** + * Remove a torrent from the client. + * + * @param {string|Buffer} torrentId + * @param {function} cb + */ +WebTorrent.prototype.remove = function (torrentId, cb) { + var self = this + var torrent = self.get(torrentId) + if (!torrent) throw new Error('No torrent with id ' + torrentId) + debug('remove') + self.torrents.splice(self.torrents.indexOf(torrent), 1) + torrent.destroy(cb) } /** @@ -75,25 +261,30 @@ WebTorrent.prototype.download = function (torrentId, opts, ontorrent) { WebTorrent.prototype.destroy = function (cb) { var self = this debug('destroy') - var tasks = [ - Client.prototype.destroy.bind(self) - ] - - if (self.server) { - tasks.push(function (cb) { - try { - self.server.close(cb) - } catch (err) { - // ignore error, server was already closed or not listening - cb(null) - } - }) - } + + var tasks = self.torrents.map(function (torrent) { + return function (cb) { + self.remove(torrent.infoHash, cb) + } + }) + + if (self.dht) tasks.push(function (cb) { + self.dht.destroy(cb) + }) + + if (self.server) tasks.push(function (cb) { + try { + self.server.close(cb) + } catch (err) { + // ignore error, server was already closed or not listening + cb(null) + } + }) parallel(tasks, cb) - return self } +// TODO: this probably belongs in cmd.js WebTorrent.prototype._onTorrent = function (torrent) { var self = this debug('on torrent') diff --git a/lib/file-stream.js b/lib/file-stream.js new file mode 100644 index 0000000..6d86ea8 --- /dev/null +++ b/lib/file-stream.js @@ -0,0 +1,99 @@ +module.exports = FileStream + +var debug = require('debug')('webtorrent:file-stream') +var inherits = require('inherits') +var stream = require('stream') + +inherits(FileStream, stream.Readable) + +/** + * A readable stream of a torrent file. + * + * @param {Object} file + * @param {number} opts.start stream slice of file, starting from this byte (inclusive) + * @param {number} opts.end stream slice of file, ending with this byte (inclusive) + * @param {number} opts.pieceLength length of an individual piece + */ +function FileStream (file, opts) { + var self = this + if (!(self instanceof FileStream)) return new FileStream(file, opts) + stream.Readable.call(self, opts) + debug('new filestream %s', JSON.stringify(opts)) + + if (!opts) opts = {} + if (!opts.start) opts.start = 0 + if (!opts.end) opts.end = file.length - 1 + + self.length = opts.end - opts.start + 1 + + var offset = opts.start + file.offset + var pieceLength = opts.pieceLength + + self.startPiece = offset / pieceLength | 0 + self.endPiece = (opts.end + file.offset) / pieceLength | 0 + + self._storage = file.storage + self._piece = self.startPiece + self._missing = self.length + self._reading = false + self._notifying = false + self._destroyed = false + self._criticalLength = Math.min((1024 * 1024 / pieceLength) | 0, 2) + self._offset = offset - (self.startPiece * pieceLength) +} + +FileStream.prototype._read = function () { + debug('_read') + var self = this + if (self._reading) return + self._reading = true + self.notify() +} + +FileStream.prototype.notify = function () { + debug('notify') + var self = this + + if (!self._reading || self._missing === 0) return + if (!self._storage.bitfield.get(self._piece)) + return self._storage.emit('critical', self._piece, self._piece + self._criticalLength) + + if (self._notifying) return + self._notifying = true + + var p = self._piece + debug('before read %s', p) + self._storage.read(self._piece++, function (err, buffer) { + debug('after read %s (buffer.length %s) (err %s)', p, buffer.length, (err && err.message) || err) + self._notifying = false + + if (self._destroyed) return + + if (err) { + self._storage.emit('error', err) + return self.destroy(err) + } + + if (self._offset) { + buffer = buffer.slice(self._offset) + self._offset = 0 + } + + if (self._missing < buffer.length) { + buffer = buffer.slice(0, self._missing) + } + self._missing -= buffer.length + + debug('pushing buffer of length %s', buffer.length) + self._reading = false + self.push(buffer) + + if (self._missing === 0) self.push(null) + }) +} + +FileStream.prototype.destroy = function () { + var self = this + if (self._destroyed) return + self._destroyed = true +} diff --git a/lib/fs-storage.js b/lib/fs-storage.js index 068a024..7afe92e 100644 --- a/lib/fs-storage.js +++ b/lib/fs-storage.js @@ -1,14 +1,14 @@ module.exports = FSStorage -var Storage = require('bittorrent-client').Storage -var inherits = require('inherits') var extend = require('extend.js') -var os = require('os') var fs = require('fs') +var inherits = require('inherits') +var mkdirp = require('mkdirp') +var os = require('os') var path = require('path') var raf = require('random-access-file') -var mkdirp = require('mkdirp') var rimraf = require('rimraf') +var Storage = require('./storage') var thunky = require('thunky') var TMP = fs.existsSync('/tmp') ? '/tmp' : os.tmpDir() diff --git a/lib/rarity-map.js b/lib/rarity-map.js new file mode 100644 index 0000000..2d7a2ce --- /dev/null +++ b/lib/rarity-map.js @@ -0,0 +1,86 @@ +module.exports = RarityMap + +/** + * Mapping of torrent pieces to their respective availability in the swarm. Used by + * the torrent manager for implementing the rarest piece first selection strategy. + * + * @param {Swarm} swarm bittorrent-swarm to track availability + * @param {number} numPieces number of pieces in the torrent + */ +function RarityMap (swarm, numPieces) { + var self = this + + self.swarm = swarm + self.numPieces = numPieces + + function initWire (wire) { + wire.on('have', function (index) { + self.pieces[index]++ + }) + wire.on('bitfield', self.recalculate.bind(self)) + wire.on('close', function () { + for (var i = 0; i < self.numPieces; ++i) { + self.pieces[i] -= wire.peerPieces.get(i) + } + }) + } + + self.swarm.wires.forEach(initWire) + self.swarm.on('wire', function (wire) { + self.recalculate() + initWire(wire) + }) + + self.recalculate() +} + +/** + * Recalculates piece availability across all peers in the swarm. + */ +RarityMap.prototype.recalculate = function () { + var self = this + + self.pieces = [] + for (var i = 0; i < self.numPieces; ++i) { + self.pieces[i] = 0 + } + + self.swarm.wires.forEach(function (wire) { + for (var i = 0; i < self.numPieces; ++i) { + self.pieces[i] += wire.peerPieces.get(i) + } + }) +} + +/** + * Get the index of the rarest piece. Optionally, pass a filter function to exclude + * certain pieces (for instance, those that we already have). + * + * @param {function} pieceFilterFunc + * @return {number} index of rarest piece, or -1 + */ +RarityMap.prototype.getRarestPiece = function (pieceFilterFunc) { + var self = this + var candidates = [] + var min = Infinity + pieceFilterFunc = pieceFilterFunc || function () { return true } + + for (var i = 0; i < self.numPieces; ++i) { + if (!pieceFilterFunc(i)) continue + + var availability = self.pieces[i] + if (availability === min) { + candidates.push(i) + } else if (availability < min) { + candidates = [ i ] + min = availability + } + } + + if (candidates.length > 0) { + // if there are multiple pieces with the same availability, choose one randomly + return candidates[Math.random() * candidates.length | 0] + } else { + return -1 + } +} diff --git a/lib/storage.js b/lib/storage.js new file mode 100644 index 0000000..45839d4 --- /dev/null +++ b/lib/storage.js @@ -0,0 +1,554 @@ +module.exports = Storage + +var BitField = require('bitfield') +var BlockStream = require('block-stream') +var debug = require('debug')('webtorrent:storage') +var dezalgo = require('dezalgo') +var eos = require('end-of-stream') +var EventEmitter = require('events').EventEmitter +var extend = require('extend.js') +var FileStream = require('./file-stream') +var inherits = require('inherits') +var sha1 = require('git-sha1') +var stream = require('stream') + +var BLOCK_LENGTH = 16 * 1024 + +var BLOCK_BLANK = 0 +var BLOCK_RESERVED = 1 +var BLOCK_WRITTEN = 2 +function noop () {} + +inherits(Piece, EventEmitter) + +/** + * A torrent piece + * + * @param {number} index piece index + * @param {string} hash sha1 hash (hex) for this piece + * @param {Buffer|number} buffer backing buffer for this piece or length of piece if the backing buffer is lazy + */ +function Piece (index, hash, buffer) { + var self = this + EventEmitter.call(self) + + self.index = index + self.hash = hash + + if (typeof buffer === 'number') { + // alloc buffer lazily + self.buffer = null + self.length = buffer + } else { + // use buffer provided + self.buffer = buffer + self.length = buffer.length + } + + self._reset() +} + +Piece.prototype.readBlock = function (offset, length, cb) { + var self = this + cb = dezalgo(cb) + if (!self.buffer || !self._verifyOffset(offset)) { + return cb(new Error('invalid block offset ' + offset)) + } + cb(null, self.buffer.slice(offset, offset + length)) +} + +Piece.prototype.writeBlock = function (offset, buffer, cb) { + var self = this + cb = dezalgo(cb) + if (!self._verifyOffset(offset) || !self._verifyBlock(offset, buffer)) { + return cb(new Error('invalid block ' + offset + ':' + buffer.length)) + } + self._lazyAllocBuffer() + + var i = offset / BLOCK_LENGTH + if (self.blocks[i] === BLOCK_WRITTEN) { + return cb(null) + } + + buffer.copy(self.buffer, offset) + self.blocks[i] = BLOCK_WRITTEN + self.blocksWritten += 1 + + if (self.blocksWritten === self.blocks.length) { + self.verify() + } + + cb(null) +} + +Piece.prototype.reserveBlock = function (endGame) { + var self = this + var len = self.blocks.length + for (var i = 0; i < len; i++) { + if ((self.blocks[i] && !endGame) || self.blocks[i] === BLOCK_WRITTEN) { + continue + } + self.blocks[i] = BLOCK_RESERVED + return { + offset: i * BLOCK_LENGTH, + length: (i === len - 1) + ? self.length - (i * BLOCK_LENGTH) + : BLOCK_LENGTH + } + } + return null +} + +Piece.prototype.cancelBlock = function (offset) { + var self = this + if (!self.buffer || !self._verifyOffset(offset)) { + return false + } + + var i = offset / BLOCK_LENGTH + if (self.blocks[i] === BLOCK_RESERVED) { + self.blocks[i] = BLOCK_BLANK + } + + return true +} + +Piece.prototype._reset = function () { + var self = this + self.verified = false + self.blocks = new Buffer(Math.ceil(self.length / BLOCK_LENGTH)) + self.blocks.fill(0) + self.blocksWritten = 0 +} + +Piece.prototype.verify = function (buffer) { + var self = this + buffer = buffer || self.buffer + if (self.verified || !buffer) { + return + } + + self.verified = (sha1(buffer) === self.hash) + if (self.verified) { + self.emit('done') + } else { + self.emit('warning', new Error('piece ' + self.index + ' failed verification; ' + sha1(buffer) + ' expected ' + self.hash)) + self._reset() + } +} + +Piece.prototype._verifyOffset = function (offset) { + var self = this + if (offset % BLOCK_LENGTH === 0) { + return true + } else { + self.emit('warning', new Error('piece ' + self.index + ' invalid offset ' + offset + ' not multiple of ' + BLOCK_LENGTH + ' bytes')) + return false + } +} + +Piece.prototype._verifyBlock = function (offset, buffer) { + var self = this + if (buffer.length === BLOCK_LENGTH) { + // normal block length + return true + + } else if (buffer.length === self.length - offset + && self.length - offset < BLOCK_LENGTH) { + // last block in piece is allowed to be less than block length + return true + } else { + self.emit('warning', new Error('piece ' + self.index + ' invalid block of size ' + buffer.length + ' bytes')) + return false + } +} + +Piece.prototype._lazyAllocBuffer = function () { + var self = this + if (!self.buffer) { + self.buffer = new Buffer(self.length) + } +} + +inherits(File, EventEmitter) + +/** + * A torrent file + * + * @param {Storage} storage Storage container object + * @param {Object} file the file object from the parsed torrent + * @param {Array.<Piece>} pieces backing pieces for this file + * @param {number} pieceLength the length in bytes of a non-terminal piece + */ +function File (storage, file, pieces, pieceLength) { + var self = this + EventEmitter.call(self) + + self.storage = storage + self.name = file.name + self.path = file.path + self.length = file.length + self.offset = file.offset + self.pieces = pieces + self.pieceLength = pieceLength + + self.done = false + + self.pieces.forEach(function (piece) { + piece.on('done', function () { + self._checkDone() + }) + }) + + // if the file is zero-length, it will be done upon initialization + self._checkDone() +} + +/** + * Selects the file to be downloaded, but at a lower priority than files with streams. + * Useful if you know you need the file at a later stage. + */ +File.prototype.select = function () { + var self = this + if (self.pieces.length > 0) { + self.storage.emit('select', self.pieces[0].index, self.pieces[self.pieces.length - 1].index, false) + } +} + +/** + * Deselects the file, which means it won't be downloaded unless someone creates a stream + * for it. + */ +File.prototype.deselect = function () { + var self = this + if (self.pieces.length > 0) { + self.storage.emit('deselect', self.pieces[0].index, self.pieces[self.pieces.length - 1].index, false) + } +} + +/** + * Create a readable stream to the file. Pieces needed by the stream will be prioritized + * highly and fetched from the swarm first. + * + * @param {Object} opts + * @param {number} opts.start stream slice of file, starting from this byte (inclusive) + * @param {number} opts.end stream slice of file, ending with this byte (inclusive) + * @return {stream.Readable} + */ +File.prototype.createReadStream = function (opts) { + var self = this + debug('createReadStream') + opts = extend({ + pieceLength: self.pieceLength + }, opts) + + var stream = new FileStream(self, opts) + self.storage.emit('select', stream.startPiece, stream.endPiece, true, stream.notify.bind(stream)) + eos(stream, function () { + self.storage.emit('deselect', stream.startPiece, stream.endPiece, true) + }) + + return stream +} + +File.prototype._checkDone = function () { + var self = this + self.done = self.pieces.every(function (piece) { + return piece.verified + }) + + if (self.done) { + process.nextTick(function () { + self.emit('done') + }) + } +} + +inherits(Storage, EventEmitter) + +/** + * Storage for a torrent download. Handles the complexities of reading and writing + * to pieces and files. + * + * @param {Object} parsedTorrent + * @param {Object} opts + */ +function Storage (parsedTorrent, opts) { + var self = this + EventEmitter.call(self) + opts = opts || {} + + self.bitfield = new BitField(parsedTorrent.pieces.length) + + self.done = false + self.closed = false + self.readonly = true + + if (!opts.nobuffer) { + self.buffer = new Buffer(parsedTorrent.length) + } + + var pieceLength = parsedTorrent.pieceLength + var lastPieceLength = parsedTorrent.lastPieceLength + var numPieces = parsedTorrent.pieces.length + + self.pieces = parsedTorrent.pieces.map(function (hash, index) { + var start = index * pieceLength + var end = start + (index === numPieces - 1 ? lastPieceLength : pieceLength) + + // if we're backed by a buffer, the piece's buffer will reference the same memory. + // otherwise, the piece's buffer will be lazily created on demand + var buffer = (self.buffer ? self.buffer.slice(start, end) : end - start) + + var piece = new Piece(index, hash, buffer) + piece.on('done', self._onPieceDone.bind(self, piece)) + return piece + }) + + self.files = parsedTorrent.files.map(function (fileObj) { + var start = fileObj.offset + var end = start + fileObj.length - 1 + + var startPiece = start / pieceLength | 0 + var endPiece = end / pieceLength | 0 + var pieces = self.pieces.slice(startPiece, endPiece + 1) + + var file = new File(self, fileObj, pieces, pieceLength) + file.on('done', self._onFileDone.bind(self, file)) + return file + }) +} + +Storage.BLOCK_LENGTH = BLOCK_LENGTH + +Storage.writeToStorage = function (storage, buf, pieceLength, cb) { + var pieceIndex = 0 + var bufStream = new stream.Readable() + bufStream._read = function () {} + bufStream + .pipe(new BlockStream(pieceLength, { nopad: true })) + .on('data', function (piece) { + var index = pieceIndex + pieceIndex += 1 + + var blockIndex = 0 + var s = new BlockStream(BLOCK_LENGTH, { nopad: true }) + s.on('data', function (block) { + var offset = blockIndex * BLOCK_LENGTH + blockIndex += 1 + + storage.writeBlock(index, offset, block) + }) + s.write(piece) + s.end() + }) + .on('end', function () { + cb(null) + }) + .on('error', function (err) { + cb(err) + }) + + bufStream.push(buf) + bufStream.push(null) +} + +Object.defineProperty(Storage.prototype, 'downloaded', { + get: function () { + var self = this + return self.pieces.reduce(function (total, piece) { + return total + (piece.verified ? piece.length : piece.blocksWritten * BLOCK_LENGTH) + }, 0) + } +}) + +/** + * The number of missing pieces. Used to implement 'end game' mode. + */ +Object.defineProperty(Storage.prototype, 'numMissing', { + get: function () { + var self = this + var numMissing = self.pieces.length + for (var index = 0, len = self.pieces.length; index < len; index++) { + numMissing -= self.bitfield.get(index) + } + return numMissing + } +}) + +/** + * Reads a block from a piece. + * + * @param {number} index piece index + * @param {number} offset byte offset within piece + * @param {number} length length in bytes to read from piece + * @param {function} cb + */ +Storage.prototype.readBlock = function (index, offset, length, cb) { + var self = this + cb = dezalgo(cb) + var piece = self.pieces[index] + if (!piece) return cb(new Error('invalid piece index ' + index)) + piece.readBlock(offset, length, cb) +} + +/** + * Writes a block to a piece. + * + * @param {number} index piece index + * @param {number} offset byte offset within piece + * @param {Buffer} buffer buffer to write + * @param {function} cb + */ +Storage.prototype.writeBlock = function (index, offset, buffer, cb) { + var self = this + if (!cb) cb = noop + cb = dezalgo(cb) + + if (self.readonly) return cb(new Error('cannot write to readonly storage')) + var piece = self.pieces[index] + if (!piece) return cb(new Error('invalid piece index ' + index)) + piece.writeBlock(offset, buffer, cb) +} + +/** + * Reads a piece or a range of a piece. + * + * @param {number} index piece index + * @param {Object=} range optional range within piece + * @param {number} range.offset byte offset within piece + * @param {number} range.length length in bytes to read from piece + * @param {function} cb + * @param {boolean} force optionally overrides default check preventing reading + * from unverified piece + */ +Storage.prototype.read = function (index, range, cb, force) { + var self = this + + if (typeof range === 'function') { + force = cb + cb = range + range = null + } + cb = dezalgo(cb) + + var piece = self.pieces[index] + if (!piece) { + return cb(new Error('invalid piece index ' + index)) + } + + if (!piece.verified && !force) { + return cb(new Error('Storage.read called on incomplete piece ' + index)) + } + + var offset = 0 + var length = piece.length + + if (range) { + offset = range.offset || 0 + length = range.length || length + } + + if (piece.buffer) { + // shortcut for piece with static backing buffer + return cb(null, piece.buffer.slice(offset, offset + length)) + } + + var blocks = [] + function readNextBlock () { + if (length <= 0) return cb(null, Buffer.concat(blocks)) + + var blockOffset = offset + var blockLength = Math.min(BLOCK_LENGTH, length) + + offset += blockLength + length -= blockLength + + self.readBlock(index, blockOffset, blockLength, function (err, block) { + if (err) return cb(err) + + blocks.push(block) + readNextBlock() + }) + } + + readNextBlock() +} + +/** + * Reserves a block from the given piece. + * + * @param {number} index piece index + * @param {Boolean} endGame whether or not end game mode is enabled + * + * @returns {Object|null} reservation with offset and length or null if failed. + */ +Storage.prototype.reserveBlock = function (index, endGame) { + var self = this + var piece = self.pieces[index] + if (!piece) return null + + return piece.reserveBlock(endGame) +} + +/** + * Cancels a previous block reservation from the given piece. + * + * @param {number} index piece index + * @param {number} offset byte offset of block in piece + * + * @returns {Boolean} + */ +Storage.prototype.cancelBlock = function (index, offset) { + var self = this + var piece = self.pieces[index] + if (!piece) return false + + return piece.cancelBlock(offset) +} + +/** + * Removes and cleans up any backing store for this storage. + * @param {function=} cb + */ +Storage.prototype.remove = function (cb) { + if (cb) dezalgo(cb)(null) +} + +/** + * Closes the backing store for this storage. + * @param {function=} cb + */ +Storage.prototype.close = function (cb) { + var self = this + self.closed = true + if (cb) dezalgo(cb)(null) +} + +// +// HELPER METHODS +// + +Storage.prototype._onPieceDone = function (piece) { + var self = this + self.bitfield.set(piece.index) + debug('piece done ' + piece.index + ' (' + self.numMissing + ' still missing)') + self.emit('piece', piece) +} + +Storage.prototype._onFileDone = function (file) { + var self = this + debug('file done ' + file.name) + self.emit('file', file) + + self._checkDone() +} + +Storage.prototype._checkDone = function () { + var self = this + + if (!self.done && self.files.every(function (file) { return file.done })) { + self.done = true + self.emit('done') + } +} diff --git a/lib/torrent.js b/lib/torrent.js new file mode 100644 index 0000000..039f0e9 --- /dev/null +++ b/lib/torrent.js @@ -0,0 +1,991 @@ +module.exports = Torrent + +var addrToIPPort = require('addr-to-ip-port') +var concat = require('concat-stream') // browser exclude +var debug = require('debug')('webtorrent:torrent') +var Discovery = require('torrent-discovery') +var EventEmitter = require('events').EventEmitter +var fs = require('fs') // browser exclude +var hh = require('http-https') // browser exclude +var inherits = require('inherits') +var parallel = require('run-parallel') +var parseTorrent = require('parse-torrent') +var RarityMap = require('./rarity-map') +var reemit = require('re-emitter') +var Storage = require('./storage') +var Swarm = require('bittorrent-swarm') // `webtorrent-swarm` in browser +var ut_metadata = require('ut_metadata') +var ut_pex = require('ut_pex') // browser exclude + +var MAX_BLOCK_LENGTH = 128 * 1024 +var MAX_OUTSTANDING_REQUESTS = 5 +var PIECE_TIMEOUT = 10000 +var CHOKE_TIMEOUT = 5000 +var SPEED_THRESHOLD = 3 * Storage.BLOCK_LENGTH + +var RECHOKE_INTERVAL = 10000 // 10 seconds +var RECHOKE_OPTIMISTIC_DURATION = 2 // 30 seconds + +function noop () {} + +inherits(Torrent, EventEmitter) + +/** + * A torrent + * + * @param {string|Buffer|Object} torrentId + * @param {Object} opts + */ +function Torrent (torrentId, opts) { + var self = this + EventEmitter.call(self) + debug('new torrent') + + self.client = opts.client + + self.hotswapEnabled = ('hotswap' in opts ? opts.hotswap : true) + self.verify = opts.verify + self.storageOpts = opts.storageOpts + + self.chokeTimeout = opts.chokeTimeout || CHOKE_TIMEOUT + self.pieceTimeout = opts.pieceTimeout || PIECE_TIMEOUT + self.strategy = opts.strategy || 'sequential' + + self._rechokeNumSlots = (opts.uploads === false || opts.uploads === 0) ? 0 : (+opts.uploads || 10) + self._rechokeOptimisticWire = null + self._rechokeOptimisticTime = 0 + self._rechokeIntervalId = null + + self.ready = false + self.files = [] + self.metadata = null + self.parsedTorrent = null + self.storage = null + self.numBlockedPeers = 0 + self._amInterested = false + self._destroyed = false + self._selections = [] + self._critical = [] + self._storageImpl = opts.storage || Storage + + var parsedTorrent = parseTorrent(torrentId) + if (parsedTorrent && parsedTorrent.infoHash) { + onTorrentId(parsedTorrent) + + } else if (typeof hh.get === 'function' && /^https?:/.test(torrentId)) { + // http or https url to torrent file + hh.get(torrentId, function (res) { + res.pipe(concat(function (torrent) { + onTorrentId(torrent) + })) + }).on('error', function (err) { + self.emit('error', new Error('error downloading torrent: ' + err.message)) + }) + + } else if (typeof fs.readFile === 'function') { + // assume it's a filesystem path + fs.readFile(torrentId, function (err, torrent) { + if (err) return self.emit('error', new Error('invalid torrent id')) + onTorrentId(torrent) + }) + + } else throw new Error('invalid torrent id') + + function onTorrentId (torrentId) { + parsedTorrent = parseTorrent(torrentId) + self.infoHash = parsedTorrent.infoHash + if (parsedTorrent.name) self.name = parsedTorrent.name // preliminary name + + // create swarm + self.swarm = new Swarm(self.infoHash, self.client.peerId, { + handshake: { dht: !!self.client.dht } + }) + reemit(self.swarm, self, ['warning', 'error']) + self.swarm.on('wire', self._onWire.bind(self)) + + // update overall client stats + self.swarm.on('download', self.client.downloadSpeed.bind(self.client)) + self.swarm.on('upload', self.client.uploadSpeed.bind(self.client)) + + if (process.browser) { + // in browser, swarm does not listen + self._onSwarmListening(parsedTorrent) + } else { + // listen for peers + self.swarm.listen(self.client.torrentPort, self._onSwarmListening.bind(self, parsedTorrent)) + } + process.nextTick(function () { + self.emit('infoHash') + }) + } +} + +// torrent size (in bytes) +Object.defineProperty(Torrent.prototype, 'length', { + get: function () { + return (this.parsedTorrent && this.parsedTorrent.length) || 0 + } +}) + +// time remaining (in milliseconds) +Object.defineProperty(Torrent.prototype, 'timeRemaining', { + get: function () { + if (this.swarm.downloadSpeed() === 0) return Infinity + else return ((this.length - this.downloaded) / this.swarm.downloadSpeed()) * 1000 + } +}) + +// percentage complete, represented as a number between 0 and 1 +Object.defineProperty(Torrent.prototype, 'progress', { + get: function () { + return (this.parsedTorrent && (this.downloaded / this.parsedTorrent.length)) || 0 + } +}) + +// bytes downloaded (not necessarily verified) +Object.defineProperty(Torrent.prototype, 'downloaded', { + get: function () { + return (this.storage && this.storage.downloaded) || 0 + } +}) + +// bytes uploaded +Object.defineProperty(Torrent.prototype, 'uploaded', { + get: function () { + return this.swarm.uploaded + } +}) + +// ratio of bytes downloaded to uploaded +Object.defineProperty(Torrent.prototype, 'ratio', { + get: function () { + return (this.uploaded && (this.downloaded / this.uploaded)) || 0 + } +}) + +Torrent.prototype._onSwarmListening = function (parsed, port) { + var self = this + if (self._destroyed) return + + self.client.torrentPort = port + + // begin discovering peers via the DHT and tracker servers + self.discovery = new Discovery({ + announce: parsed.announce, + dht: self.client.dht, + tracker: self.client.tracker, + peerId: self.client.peerId, + port: port + }) + self.discovery.setTorrent(self.infoHash) + self.discovery.on('peer', self.addPeer.bind(self)) + + // expose discovery events + reemit(self.discovery, self, ['dhtAnnounce', 'warning', 'error']) + + // if full metadata was included in initial torrent id, use it + if (parsed.info) self._onMetadata(parsed) + + self.emit('listening', port) +} + +/** + * Called when the metadata is received. + */ +Torrent.prototype._onMetadata = function (metadata) { + var self = this + if (self.metadata || self._destroyed) return + debug('got metadata') + + if (metadata && metadata.infoHash) { + // `metadata` is a parsed torrent (from parse-torrent module) + self.metadata = parseTorrent.toBuffer(metadata) + self.parsedTorrent = metadata + } else { + self.metadata = metadata + try { + self.parsedTorrent = parseTorrent(self.metadata) + } catch (err) { + return self.emit('error', err) + } + } + + // update preliminary torrent name + self.name = self.parsedTorrent.name + + // update discovery module with full torrent metadata + self.discovery.setTorrent(self.parsedTorrent) + + self.rarityMap = new RarityMap(self.swarm, self.parsedTorrent.pieces.length) + + self.storage = new self._storageImpl(self.parsedTorrent, self.storageOpts) + self.storage.on('piece', self._onStoragePiece.bind(self)) + self.storage.on('file', function (file) { + self.emit('file', file) + }) + + self._reservations = self.storage.pieces.map(function () { + return [] + }) + + self.storage.on('done', function () { + if (self.discovery.tracker) + self.discovery.tracker.complete() + + debug('torrent ' + self.infoHash + ' done') + self.emit('done') + }) + + self.storage.on('select', self.select.bind(self)) + self.storage.on('deselect', self.deselect.bind(self)) + self.storage.on('critical', self.critical.bind(self)) + + self.storage.files.forEach(function (file) { + self.files.push(file) + }) + + self.swarm.wires.forEach(function (wire) { + // If we didn't have the metadata at the time ut_metadata was initialized for this + // wire, we still want to make it available to the peer in case they request it. + if (wire.ut_metadata) wire.ut_metadata.setMetadata(self.metadata) + + self._onWireWithMetadata(wire) + }) + + if (self.verify) { + process.nextTick(function () { + debug('verifying existing torrent data') + var numPieces = 0 + var numVerified = 0 + + // TODO: move storage verification to storage.js? + parallel(self.storage.pieces.map(function (piece) { + return function (cb) { + self.storage.read(piece.index, function (err, buffer) { + numPieces += 1 + self.emit('verifying', { + percentDone: 100 * numPieces / self.storage.pieces.length, + percentVerified: 100 * numVerified / self.storage.pieces.length, + }) + + if (!err && buffer) { + // TODO: this is a bit hacky; figure out a cleaner way of verifying the buffer + piece.verify(buffer) + numVerified += piece.verified + debug('piece ' + (piece.verified ? 'verified' : 'invalid') + ' ' + piece.index) + } + // continue regardless of whether piece verification failed + cb() + }, true) // forces override to allow reading from non-verified pieces + } + }), self._onStorage.bind(self)) + }) + } else { + process.nextTick(self._onStorage.bind(self)) + } + + process.nextTick(function () { + self.emit('metadata') + }) +} + +/** + * Destroy and cleanup this torrent. + */ +Torrent.prototype.destroy = function (cb) { + var self = this + debug('destroy') + self._destroyed = true + clearInterval(self._rechokeIntervalId) + + var tasks = [] + if (self.swarm) tasks.push(function (cb) { + self.swarm.destroy(cb) + }) + if (self.discovery) tasks.push(function (cb) { + self.discovery.stop(cb) + }) + if (self.storage) tasks.push(function (cb) { + self.storage.close(cb) + }) + parallel(tasks, cb) +} + +/** + * Add a peer to the swarm + * @param {string|SimplePeer} peer + */ +Torrent.prototype.addPeer = function (peer) { + var self = this + + // TODO: extract IP address from peer object and check blocklist + if (typeof peer === 'string' && + self.client.blocked && self.client.blocked.contains(addrToIPPort(peer)[0])) { + self.numBlockedPeers += 1 + self.emit('blocked-peer', peer) + } else { + self.emit('peer', peer) + self.swarm.addPeer(peer) + } +} + +/** + * Select a range of pieces to prioritize. + * + * @param {number} start start piece index (inclusive) + * @param {number} end end piece index (inclusive) + * @param {number} priority priority associated with this selection + * @param {function} notify callback when selection is updated with new data + */ +Torrent.prototype.select = function (start, end, priority, notify) { + var self = this + if (start > end || start < 0 || end >= self.storage.pieces.length) + throw new Error('invalid selection ', start, ':', end) + priority = Number(priority) || 0 + + debug('select %s-%s (priority %s)', start, end, priority) + + self._selections.push({ + from: start, + to: end, + offset: 0, + priority: priority, + notify: notify || noop + }) + + self._selections.sort(function (a, b) { + return b.priority - a.priority + }) + + self._updateSelections() +} + +/** + * Deprioritizes a range of previously selected pieces. + * + * @param {number} start start piece index (inclusive) + * @param {number} end end piece index (inclusive) + * @param {number} priority priority associated with the selection + */ +Torrent.prototype.deselect = function (start, end, priority) { + var self = this + priority = Number(priority) || 0 + debug('deselect %s-%s (priority %s)', start, end, priority) + + for (var i = 0; i < self._selections.length; ++i) { + var s = self._selections[i] + if (s.from === start && s.to === end && s.priority === priority) { + self._selections.splice(i--, 1) + break + } + } + + self._updateSelections() +} + +/** + * Marks a range of pieces as critical priority to be downloaded ASAP. + * + * @param {number} start start piece index (inclusive) + * @param {number} end end piece index (inclusive) + */ +Torrent.prototype.critical = function (start, end) { + var self = this + debug('critical %s-%s', start, end) + + for (var i = start; i <= end; ++i) { + self._critical[i] = true + } + + self._updateSelections() +} + +Torrent.prototype._onWire = function (wire) { + var self = this + + // use ut_metadata extension + wire.use(ut_metadata(self.metadata)) + + if (!self.metadata) { + wire.ut_metadata.on('metadata', function (metadata) { + debug('got metadata via ut_metadata') + self._onMetadata(metadata) + }) + wire.ut_metadata.fetch() + } + + // use ut_pex extension + if (typeof ut_pex === 'function') wire.use(ut_pex()) + + //wire.ut_pex.start() // TODO two-way communication + if (wire.ut_pex) wire.ut_pex.on('peer', function (peer) { + debug('got peer via ut_pex ' + peer) + self.addPeer(peer) + }) + + if (wire.ut_pex) wire.ut_pex.on('dropped', function (peer) { + // the remote peer believes a given peer has been dropped from the swarm. + // if we're not currently connected to it, then remove it from the swarm's queue. + if (!(peer in self.swarm._peers)) self.swarm.removePeer(peer) + }) + + // Send KEEP-ALIVE (every 60s) so peers will not disconnect the wire + wire.setKeepAlive(true) + + // If peer supports DHT, send PORT message to report DHT node listening port + if (wire.peerExtensions.dht && self.client.dht && self.client.dht.port) { + wire.port(self.client.dht.port) + } + + // When peer sends PORT, add them to the routing table + wire.on('port', function (port) { + debug('port message from ' + wire.remoteAddress) + // TODO: dht should support adding a node when you don't know the nodeId + // dht.addNode(wire.remoteAddress + ':' + port) + }) + + wire.on('timeout', function () { + debug('wire timeout from ' + wire.remoteAddress) + // TODO: this might be destroying wires too eagerly + wire.destroy() + }) + + // Timeout for piece requests to this peer + wire.setTimeout(self.pieceTimeout) + + if (self.metadata) { + self._onWireWithMetadata(wire) + } +} + +Torrent.prototype._onWireWithMetadata = function (wire) { + var self = this + var timeoutId = null + var timeoutMs = self.chokeTimeout + + function onChokeTimeout () { + if (self._destroyed || wire._destroyed) return + + if (self.swarm.numQueued > 2 * (self.swarm.numConns - self.swarm.numPeers) && wire.amInterested) { + wire.destroy() + } else { + timeoutId = setTimeout(onChokeTimeout, timeoutMs) + } + } + + var i = 0 + function updateSeedStatus () { + if (wire.peerPieces.length !== self.storage.pieces.length) return + for (; i < self.storage.pieces.length; ++i) { + if (!wire.peerPieces.get(i)) return + } + wire.isSeeder = true + wire.choke() // always choke seeders + } + + wire.on('bitfield', function () { + updateSeedStatus() + self._update() + }) + + wire.on('have', function () { + updateSeedStatus() + self._update() + }) + + wire.once('interested', function () { + wire.unchoke() + }) + + wire.on('close', function () { + clearTimeout(timeoutId) + }) + + wire.on('choke', function () { + clearTimeout(timeoutId) + timeoutId = setTimeout(onChokeTimeout, timeoutMs) + }) + + wire.on('unchoke', function () { + clearTimeout(timeoutId) + self._update() + }) + + wire.on('request', function (index, offset, length, cb) { + // Disconnect from peers that request more than 128KB, per spec + if (length > MAX_BLOCK_LENGTH) { + debug(wire.remoteAddress, 'requested invalid block size', length) + return wire.destroy() + } + + self.storage.readBlock(index, offset, length, cb) + }) + + wire.bitfield(self.storage.bitfield) // always send bitfield (required) + wire.interested() // always start out interested + + timeoutId = setTimeout(onChokeTimeout, timeoutMs) + + wire.isSeeder = false + updateSeedStatus() +} + +/** + * Called when the metadata, swarm, and underlying storage are all fully initialized. + */ +Torrent.prototype._onStorage = function () { + var self = this + debug('on storage') + + // allow writes to storage only after initial piece verification is finished + self.storage.readonly = false + + // start off selecting the entire torrent with low priority + self.select(0, self.storage.pieces.length - 1, false) + + self._rechokeIntervalId = setInterval(self._rechoke.bind(self), RECHOKE_INTERVAL) + self._rechokeIntervalId.unref && self._rechokeIntervalId.unref() + + process.nextTick(function () { + self.ready = true + self.emit('ready') + }) +} + +/** + * When a piece is fully downloaded, notify all peers with a HAVE message. + * @param {Piece} piece + */ +Torrent.prototype._onStoragePiece = function (piece) { + var self = this + debug('piece done %s', piece.index) + self._reservations[piece.index] = null + + self.swarm.wires.forEach(function (wire) { + wire.have(piece.index) + }) + + self._gcSelections() +} + +/** + * Called on selection changes. + */ +Torrent.prototype._updateSelections = function () { + var self = this + if (!self.swarm || self._destroyed) return + if (!self.metadata) return self.once('metadata', self._updateSelections.bind(self)) + + process.nextTick(self._gcSelections.bind(self)) + self._updateInterest() + self._update() +} + +/** + * Garbage collect selections with respect to the storage's current state. + */ +Torrent.prototype._gcSelections = function () { + var self = this + + for (var i = 0; i < self._selections.length; i++) { + var s = self._selections[i] + var oldOffset = s.offset + + // check for newly downloaded pieces in selection + while (self.storage.bitfield.get(s.from + s.offset) && s.from + s.offset < s.to) { + s.offset++ + } + + if (oldOffset !== s.offset) s.notify() + if (s.to !== s.from + s.offset) continue + if (!self.storage.bitfield.get(s.from + s.offset)) continue + + // remove fully downloaded selection + self._selections.splice(i--, 1) // decrement i to offset splice + s.notify() // TODO: this may notify twice in a row. is this a problem? + self._updateInterest() + } + + if (!self._selections.length) self.emit('idle') +} + +/** + * Update interested status for all peers. + */ +Torrent.prototype._updateInterest = function () { + var self = this + + var prev = self._amInterested + self._amInterested = !!self._selections.length + + self.swarm.wires.forEach(function (wire) { + // TODO: only call wire.interested if the wire has at least one piece we need + if (self._amInterested) wire.interested() + else wire.uninterested() + }) + + if (prev === self._amInterested) return + if (self._amInterested) self.emit('interested') + else self.emit('uninterested') +} + +/** + * Heartbeat to update all peers and their requests. + */ +Torrent.prototype._update = function () { + var self = this + if (self._destroyed) return + + // update wires in random order for better request distribution + randomizedForEach(self.swarm.wires, self._updateWire.bind(self)) +} + +/** + * Attempts to update a peer's requests + */ +Torrent.prototype._updateWire = function (wire) { + var self = this + + if (wire.peerChoking) return + if (!wire.downloaded) return validateWire() + + trySelectWire(false) || trySelectWire(true) + + function genPieceFilterFunc (start, end, tried, rank) { + return function (i) { + return i >= start && i <= end && !(i in tried) && wire.peerPieces.get(i) && (!rank || rank(i)) + } + } + + // TODO: Do we need both validateWire and trySelectWire? + function validateWire () { + if (wire.requests.length) return + + for (var i = self._selections.length; i--;) { + var next = self._selections[i] + + var piece + if (self.strategy === 'rarest') { + var start = next.from + next.offset + var end = next.to + var len = end - start + 1 + var tried = {} + var tries = 0 + var filter = genPieceFilterFunc(start, end, tried) + + while (tries < len) { + piece = self.rarityMap.getRarestPiece(filter) + if (piece < 0) break + if (self._request(wire, piece, false)) return + tried[piece] = true + tries += 1 + } + } else { + for (piece = next.to; piece >= next.from + next.offset; --piece) { + if (!wire.peerPieces.get(piece)) continue + if (self._request(wire, piece, false)) return + } + } + } + + // TODO: wire failed to validate as useful; should we close it? + } + + function speedRanker () { + var speed = wire.downloadSpeed() || 1 + if (speed > SPEED_THRESHOLD) return function () { return true } + + var secs = MAX_OUTSTANDING_REQUESTS * Storage.BLOCK_LENGTH / speed + var tries = 10 + var ptr = 0 + + return function (index) { + if (!tries || self.storage.bitfield.get(index)) return true + + var piece = self.storage.pieces[index] + var missing = piece.blocks.length - piece.blocksWritten + + for (; ptr < self.swarm.wires.length; ptr++) { + var otherWire = self.swarm.wires[ptr] + var otherSpeed = otherWire.downloadSpeed() + + if (otherSpeed < SPEED_THRESHOLD) continue + if (otherSpeed <= speed) continue + if (!otherWire.peerPieces.get(index)) continue + if ((missing -= otherSpeed * secs) > 0) continue + + tries-- + return false + } + + return true + } + } + + function shufflePriority (i) { + var last = i + for (var j = i; j < self._selections.length && self._selections[j].priority; j++) { + last = j + } + var tmp = self._selections[i] + self._selections[i] = self._selections[last] + self._selections[last] = tmp + } + + function trySelectWire (hotswap) { + if (wire.requests.length >= MAX_OUTSTANDING_REQUESTS) return true + var rank = speedRanker() + + for (var i = 0; i < self._selections.length; i++) { + var next = self._selections[i] + + var piece + if (self.strategy === 'rarest') { + var start = next.from + next.offset + var end = next.to + var len = end - start + 1 + var tried = {} + var tries = 0 + var filter = genPieceFilterFunc(start, end, tried, rank) + + while (tries < len) { + piece = self.rarityMap.getRarestPiece(filter) + if (piece < 0) break + + // request all non-reserved blocks in this piece + while (self._request(wire, piece, self._critical[piece] || hotswap)) {} + + if (wire.requests.length < MAX_OUTSTANDING_REQUESTS) { + tried[piece] = true + tries++ + continue + } + + if (next.priority) shufflePriority(i) + return true + } + } else { + for (piece = next.from + next.offset; piece <= next.to; piece++) { + if (!wire.peerPieces.get(piece) || !rank(piece)) continue + + // request all non-reserved blocks in piece + while (self._request(wire, piece, self._critical[piece] || hotswap)) {} + + if (wire.requests.length < MAX_OUTSTANDING_REQUESTS) continue + + if (next.priority) shufflePriority(i) + return true + } + } + } + + return false + } +} + +/** + * Called periodically to update the choked status of all peers, handling optimistic + * unchoking as described in BEP3. + */ +Torrent.prototype._rechoke = function () { + var self = this + + if (self._rechokeOptimisticTime > 0) + self._rechokeOptimisticTime -= 1 + else + self._rechokeOptimisticWire = null + + var peers = [] + + self.swarm.wires.forEach(function (wire) { + if (!wire.isSeeder && wire !== self._rechokeOptimisticWire) { + peers.push({ + wire: wire, + downloadSpeed: wire.downloadSpeed(), + uploadSpeed: wire.uploadSpeed(), + salt: Math.random(), + isChoked: true + }) + } + }) + + peers.sort(rechokeSort) + + var unchokeInterested = 0 + var i = 0 + for (; i < peers.length && unchokeInterested < self._rechokeNumSlots; ++i) { + peers[i].isChoked = false + if (peers[i].wire.peerInterested) unchokeInterested += 1 + } + + // Optimistically unchoke a peer + if (!self._rechokeOptimisticWire && i < peers.length && self._rechokeNumSlots) { + var candidates = peers.slice(i).filter(function (peer) { return peer.wire.peerInterested }) + var optimistic = candidates[randomInt(candidates.length)] + + if (optimistic) { + optimistic.isChoked = false + self._rechokeOptimisticWire = optimistic.wire + self._rechokeOptimisticTime = RECHOKE_OPTIMISTIC_DURATION + } + } + + // Unchoke best peers + peers.forEach(function (peer) { + if (peer.wire.amChoking !== peer.isChoked) { + if (peer.isChoked) peer.wire.choke() + else peer.wire.unchoke() + } + }) + + function rechokeSort (peerA, peerB) { + // Prefer higher download speed + if (peerA.downloadSpeed !== peerB.downloadSpeed) + return peerB.downloadSpeed - peerA.downloadSpeed + + // Prefer higher upload speed + if (peerA.uploadSpeed !== peerB.uploadSpeed) + return peerB.uploadSpeed - peerA.uploadSpeed + + // Prefer unchoked + if (peerA.wire.amChoking !== peerB.wire.amChoking) + return peerA.wire.amChoking ? 1 : -1 + + // Random order + return peerA.salt - peerB.salt + } +} + +/** + * Attempts to cancel a slow block request from another wire such that the + * given wire may effectively swap out the request for one of its own. + */ +Torrent.prototype._hotswap = function (wire, index) { + var self = this + if (!self.hotswapEnabled) return false + + var speed = wire.downloadSpeed() + if (speed < Storage.BLOCK_LENGTH) return false + if (!self._reservations[index]) return false + + var r = self._reservations[index] + if (!r) { + return false + } + + var minSpeed = Infinity + var minWire + + var i + for (i = 0; i < r.length; i++) { + var otherWire = r[i] + if (!otherWire || otherWire === wire) continue + + var otherSpeed = otherWire.downloadSpeed() + if (otherSpeed >= SPEED_THRESHOLD) continue + if (2 * otherSpeed > speed || otherSpeed > minSpeed) continue + + minWire = otherWire + minSpeed = otherSpeed + } + + if (!minWire) return false + + for (i = 0; i < r.length; i++) { + if (r[i] === minWire) r[i] = null + } + + for (i = 0; i < minWire.requests.length; i++) { + var req = minWire.requests[i] + if (req.piece !== index) continue + + self.storage.cancelBlock(index, req.offset) + } + + self.emit('hotswap', minWire, wire, index) + return true +} + +/** + * Attempts to request a block from the given wire. + */ +Torrent.prototype._request = function (wire, index, hotswap) { + var self = this + var numRequests = wire.requests.length + + if (self.storage.bitfield.get(index)) return false + if (numRequests >= MAX_OUTSTANDING_REQUESTS) return false + + var endGame = (wire.requests.length === 0 && self.storage.numMissing < 30) + var block = self.storage.reserveBlock(index, endGame) + + if (!block && !endGame && hotswap && self._hotswap(wire, index)) + block = self.storage.reserveBlock(index, false) + if (!block) return false + + var r = self._reservations[index] + if (!r) { + r = self._reservations[index] = [] + } + var i = r.indexOf(null) + if (i === -1) i = r.length + r[i] = wire + + function gotPiece (err, buffer) { + if (!self.ready) { + self.once('ready', function () { + gotPiece(err, buffer) + }) + return + } + + if (r[i] === wire) r[i] = null + + if (err) { + debug('error getting piece ' + index + '(offset: ' + block.offset + ' length: ' + block.length + ') from ' + wire.remoteAddress + ' ' + err.message) + self.storage.cancelBlock(index, block.offset) + process.nextTick(self._update.bind(self)) + return false + } else { + // debug('got piece ' + index + '(offset: ' + block.offset + ' length: ' + block.length + ') from ' + wire.remoteAddress) + self.storage.writeBlock(index, block.offset, buffer, function (err) { + if (err) { + debug('error writing block') + self.storage.cancelBlock(index, block.offset) + } + + process.nextTick(self._update.bind(self)) + }) + } + } + + wire.request(index, block.offset, block.length, gotPiece) + + return true +} + +/** + * Returns a random integer in [0,high) + */ +function randomInt (high) { + return Math.random() * high | 0 +} + +/** + * Iterates through the given array in a random order, calling the given + * callback for each element. + */ +function randomizedForEach (array, cb) { + var indices = array.map(function (value, index) { return index }) + + for (var i = 0, len = indices.length; i < len; ++i) { + var j = randomInt(len) + var tmp = indices[i] + indices[i] = indices[j] + indices[j] = tmp + } + + indices.forEach(function (index) { + cb(array[index], index, array) + }) +} diff --git a/package.json b/package.json index a24420b..952e4bd 100644 --- a/package.json +++ b/package.json @@ -7,24 +7,43 @@ "email": "feross@feross.org", "url": "http://feross.org/" }, - "browser": { - "./lib/fs-storage": false, - "./lib/server": false - }, "bin": { "webtorrent": "./bin/cmd.js" }, + "browser": { + "./lib/fs-storage": false, + "./lib/server": false, + "bittorrent-dht/client": false, + "bittorrent-swarm": "webtorrent-swarm", + "concat-stream": false, + "http-https": false, + "load-ip-set": false, + "ut_pex": false + }, "bugs": { "url": "https://github.com/feross/webtorrent/issues" }, "dependencies": { + "addr-to-ip-port": "^1.0.1", "airplay-js": "^0.2.1", - "bittorrent-client": "0.x", + "bitfield": "^1.0.2", + "bittorrent-dht": "^2.0.0", + "bittorrent-swarm": "0.x", + "block-stream": "0.0.7", "chromecast-js": "^0.1.2", "clivas": "^0.1.4", + "concat-stream": "^1.4.6", + "create-torrent": "^2.5.0", "debug": "^2.0.0", + "dezalgo": "^1.0.0", + "end-of-stream": "^1.0.0", "extend.js": "^0.0.1", + "git-sha1": "^0.1.2", + "hat": "0.0.3", + "http-https": "^1.0.0", "inherits": "^2.0.1", + "ip-set": "^1.0.0", + "load-ip-set": "^1.0.3", "mime": "^1.2.11", "minimist": "^1.1.0", "mkdirp": "^0.5.0", @@ -32,20 +51,31 @@ "network-address": "0.0.4", "nodebmc": "^0.0.3", "once": "^1.3.0", + "parse-torrent": "^2.1.0", "prettysize": "0.0.3", "pump": "^0.3.2", "random-access-file": "^0.3.1", "range-parser": "^1.0.2", + "re-emitter": "^1.0.0", "rimraf": "^2.2.5", "run-parallel": "^1.0.0", + "speedometer": "^0.1.2", "thunky": "^0.1.0", + "torrent-discovery": "^2.0.1", + "ut_metadata": "^2.1.0", + "ut_pex": "^1.0.1", + "webtorrent-swarm": "^0.3.0", "windows-no-runnable": "~0.0.6" }, "devDependencies": { + "bittorrent-tracker": "^2.5.0", + "brfs": "^1.2.0", "browserify": "^5.11.2", + "run-auto": "^1.0.0", "tape": "2.x", "uglify-js": "^2.4.15", - "zelda": "^2.0.0" + "zelda": "^2.0.0", + "zuul": "^1.11.0" }, "homepage": "http://webtorrent.io", "keywords": [ @@ -70,6 +100,9 @@ "build-debug": "browserify -s WebTorrent -e ./ > webtorrent.debug.js", "size": "npm run build && cat webtorrent.min.js | gzip | wc -c", "start": "./bin/cmd.js", - "test": "tape test/*.js" + "test": "tape test/*.js && zuul -- test/basic.js" + }, + "testling": { + "files": "test/basic.js" } } diff --git a/test/basic.js b/test/cmd.js index 0801452..b7c11c0 100644 --- a/test/basic.js +++ b/test/cmd.js @@ -1,20 +1,5 @@ var cp = require('child_process') var test = require('tape') -var WebTorrent = require('../') - -/** - * Extensive bittorrent functionality tests are contained within dependencies like - * `bittorrent-client`, `bitorrent-protocol`, etc. - */ - -test('Module usage (sanity check)', function (t) { - var client = new WebTorrent() - t.equal(typeof client.add, 'function', 'client.add exists') - client.destroy(function () { - t.pass('client.destroy works') - t.end() - }) -}) test('Command line: --help', function (t) { t.plan(2) diff --git a/test/download.js b/test/download.js new file mode 100644 index 0000000..34997e7 --- /dev/null +++ b/test/download.js @@ -0,0 +1,307 @@ +var auto = require('run-auto') +var BitTorrentClient = require('../') +var BlockStream = require('block-stream') +var DHT = require('bittorrent-dht/client') +var fs = require('fs') +var parseTorrent = require('parse-torrent') +var test = require('tape') +var TrackerServer = require('bittorrent-tracker').Server + +var leavesFile = __dirname + '/torrents/Leaves of Grass by Walt Whitman.epub' +var leavesTorrent = fs.readFileSync(__dirname + '/torrents/leaves.torrent') +var leavesParsed = parseTorrent(leavesTorrent) + +var BLOCK_LENGTH = 16 * 1024 +function writeToStorage (storage, file, cb) { + var pieceIndex = 0 + fs.createReadStream(file) + .pipe(new BlockStream(leavesParsed.pieceLength, { nopad: true })) + .on('data', function (piece) { + var index = pieceIndex + pieceIndex += 1 + + var blockIndex = 0 + var s = new BlockStream(BLOCK_LENGTH, { nopad: true }) + s.on('data', function (block) { + var offset = blockIndex * BLOCK_LENGTH + blockIndex += 1 + + storage.writeBlock(index, offset, block) + }) + s.write(piece) + s.end() + }) + .on('end', function () { + cb(null) + }) + .on('error', function (err) { + cb(err) + }) +} + +function downloadTrackerTest (t, serverType) { + t.plan(8) + + var trackerStartCount = 0 + + auto({ + tracker: function (cb) { + var tracker = new TrackerServer( + serverType === 'udp' ? { http: false } : { udp: false } + ) + + tracker.on('error', function (err) { + t.fail(err) + }) + + tracker.on('start', function () { + trackerStartCount += 1 + }) + + tracker.listen(function (port) { + var announceUrl = serverType === 'http' + ? 'http://127.0.0.1:' + port + '/announce' + : 'udp://127.0.0.1:' + port + + // Overwrite announce with our local tracker + leavesParsed.announce = [ announceUrl ] + leavesParsed.announceList = [[ announceUrl ]] + + cb(null, tracker) + }) + }, + + client1: ['tracker', function (cb) { + var client1 = new BitTorrentClient({ dht: false }) + client1.on('error', function (err) { t.fail(err) }) + + client1.add(leavesParsed) + + client1.on('torrent', function (torrent) { + // torrent metadata has been fetched -- sanity check it + t.equal(torrent.name, 'Leaves of Grass by Walt Whitman.epub') + + var names = [ + 'Leaves of Grass by Walt Whitman.epub' + ] + + t.deepEqual(torrent.files.map(function (file) { return file.name }), names) + + writeToStorage(torrent.storage, leavesFile, function (err) { + cb(err, client1) + }) + }) + }], + + client2: ['client1', function (cb) { + var client2 = new BitTorrentClient({ dht: false }) + client2.on('error', function (err) { t.fail(err) }) + + client2.add(leavesParsed) + + client2.on('torrent', function (torrent) { + torrent.files.forEach(function (file) { + file.createReadStream() + }) + + torrent.once('done', function () { + t.pass('client2 downloaded torrent from client1') + cb(null, client2) + }) + }) + }] + + }, function (err, r) { + t.error(err) + t.equal(trackerStartCount, 2) + + r.tracker.close(function () { + t.pass('tracker closed') + }) + r.client1.destroy(function () { + t.pass('client1 destroyed') + }) + r.client2.destroy(function () { + t.pass('client2 destroyed') + }) + }) +} + +test('Simple download using UDP tracker', function (t) { + downloadTrackerTest(t, 'udp') +}) + +test('Simple download using HTTP tracker', function (t) { + downloadTrackerTest(t, 'http') +}) + +test('Simple download using a tracker (only) via a magnet uri', function (t) { + t.plan(8) + + var trackerStartCount = 0 + + var magnetUri + auto({ + tracker: function (cb) { + var tracker = new TrackerServer('udp') + + tracker.on('error', function (err) { + t.fail(err) + }) + + tracker.on('start', function () { + trackerStartCount += 1 + }) + + tracker.listen(function (port) { + var announceUrl = 'udp://127.0.0.1:' + port + leavesParsed.announce = [ announceUrl ] + leavesParsed.announceList = [[ announceUrl ]] + magnetUri = 'magnet:?xt=urn:btih:' + leavesParsed.infoHash + '&tr=' + encodeURIComponent(announceUrl) + cb(null, tracker) + }) + }, + + client1: ['tracker', function (cb) { + var client1 = new BitTorrentClient({ dht: false }) + client1.on('error', function (err) { t.fail(err) }) + + client1.add(leavesParsed) + + client1.on('torrent', function (torrent) { + // torrent metadata has been fetched -- sanity check it + t.equal(torrent.name, 'Leaves of Grass by Walt Whitman.epub') + + var names = [ + 'Leaves of Grass by Walt Whitman.epub' + ] + + t.deepEqual(torrent.files.map(function (file) { return file.name }), names) + + writeToStorage(torrent.storage, leavesFile, function (err) { + cb(err, client1) + }) + }) + }], + + client2: ['client1', function (cb) { + var client2 = new BitTorrentClient({ dht: false }) + client2.on('error', function (err) { t.fail(err) }) + + client2.add(magnetUri) + + client2.on('torrent', function (torrent) { + torrent.files.forEach(function (file) { + file.createReadStream() + }) + + torrent.once('done', function () { + t.pass('client2 downloaded torrent from client1') + cb(null, client2) + }) + }) + }] + + }, function (err, r) { + t.error(err) + t.equal(trackerStartCount, 2) + + r.tracker.close(function () { + t.pass('tracker closed') + }) + r.client1.destroy(function () { + t.pass('client1 destroyed') + }) + r.client2.destroy(function () { + t.pass('client2 destroyed') + }) + }) +}) + +test('Simple download using DHT', function (t) { + t.plan(7) + + // no trackers + leavesParsed.announce = [] + leavesParsed.announceList = [] + + // TODO: use actual DHT server here, instead of client + var dhtServer = new DHT({ bootstrap: false }) + + dhtServer.on('error', function (err) { + t.fail(err) + }) + + auto({ + dhtPort: function (cb) { + dhtServer.listen(function (port) { + cb(null, port) + }) + }, + client1: ['dhtPort', function (cb, r) { + var client1 = new BitTorrentClient({ + trackers: false, + dht: { bootstrap: '127.0.0.1:' + r.dhtPort } + }) + client1.on('error', function (err) { t.fail(err) }) + + client1.add(leavesParsed) + + var announced, wroteStorage + function maybeDone (err) { + if ((announced && wroteStorage) || err) cb(err, client1) + } + + client1.on('torrent', function (torrent) { + // torrent metadata has been fetched -- sanity check it + t.equal(torrent.name, 'Leaves of Grass by Walt Whitman.epub') + + var names = [ 'Leaves of Grass by Walt Whitman.epub' ] + t.deepEqual(torrent.files.map(function (file) { return file.name }), names) + + torrent.on('dhtAnnounce', function () { + announced = true + maybeDone(null) + }) + + writeToStorage(torrent.storage, leavesFile, function (err) { + wroteStorage = true + maybeDone(err) + }) + }) + }], + + client2: ['client1', function (cb, r) { + var client2 = new BitTorrentClient({ + trackers: false, + dht: { bootstrap: '127.0.0.1:' + r.dhtPort } + }) + client2.on('error', function (err) { t.fail(err) }) + + client2.add(leavesParsed) + + client2.on('torrent', function (torrent) { + torrent.files.forEach(function (file) { + file.createReadStream() + }) + + torrent.once('done', function () { + t.pass('client2 downloaded torrent from client1') + cb(null, client2) + }) + }) + }], + + }, function (err, r) { + t.error(err) + r.client1.destroy(function () { + t.pass('client1 destroyed') + }) + r.client2.destroy(function () { + t.pass('client2 destroyed') + }) + dhtServer.destroy(function () { + t.pass('dht server destroyed') + }) + }) +}) diff --git a/test/metadata.js b/test/metadata.js new file mode 100644 index 0000000..5c86b79 --- /dev/null +++ b/test/metadata.js @@ -0,0 +1,47 @@ +var BitTorrentClient = require('../') +var parseTorrent = require('parse-torrent') +var test = require('tape') +var fs = require('fs') + +var leaves = fs.readFileSync(__dirname + '/torrents/leaves.torrent') +var leavesTorrent = parseTorrent(leaves) + +test('ut_metadata transfer', function (t) { + t.plan(5) + + var client1 = new BitTorrentClient({ dht: false, trackers: false }) + var client2 = new BitTorrentClient({ dht: false, trackers: false }) + + client1.on('torrent', function (torrent) { + t.pass('client1 emits torrent event') // even though it started with metadata + }) + + // client1 starts with metadata from torrent file + client1.add(leaves) + + client1.on('error', function (err) { t.fail(err) }) + client2.on('error', function (err) { t.fail(err) }) + + client1.on('torrent', function (torrent1) { + t.deepEqual(torrent1.parsedTorrent.info, leavesTorrent.info) + + // client2 starts with infohash + client2.add(leavesTorrent.infoHash) + + client2.on('listening', function (port, torrent2) { + // manually add the peer + torrent2.addPeer('127.0.0.1:' + client1.torrentPort) + + client2.on('torrent', function () { + t.deepEqual(torrent1.parsedTorrent.info, torrent2.parsedTorrent.info) + + client1.destroy(function () { + t.pass('client1 destroyed') + }) + client2.destroy(function () { + t.pass('client2 destroyed') + }) + }) + }) + }) +}) diff --git a/test/multiple.js b/test/multiple.js new file mode 100644 index 0000000..cf92266 --- /dev/null +++ b/test/multiple.js @@ -0,0 +1,60 @@ +/* +var BitTorrentClient = require('../') +var test = require('tape') +var fs = require('fs') + +var torrents = [ 'leaves', 'pride' ].map(function (name) { + return fs.readFileSync(__dirname + '/torrents/' + name + '.torrent') +}) + +// TODO: replace this with a test that can run offline +test('two simultaneous downloads with dht disabled', function (t) { + t.plan(torrents.length * 2) + + var client = new BitTorrentClient({ dht: false }) + var numDone = 0 + + client.on('error', function (err) { t.fail(err.message) }) + + torrents.forEach(function (torrent) { + client.add(torrent) + }) + + client.on('torrent', function (torrent) { + t.pass('received metadata for torrent ' + torrent.name) + + torrent.once('done', function () { + t.pass('done downloading torrent ' + torrent.name) + + if (++numDone >= torrents.length) { + client.destroy() + } + }) + }) +}) + +test('two simultaneous downloads with dht enabled', function (t) { + t.plan(torrents.length * 2) + + var client = new BitTorrentClient() + var numDone = 0 + + client.on('error', function (err) { t.fail(err.message) }) + + torrents.forEach(function (torrent) { + client.add(torrent) + }) + + client.on('torrent', function (torrent) { + t.pass('received metadata for torrent ' + torrent.name) + + torrent.once('done', function () { + t.pass('done downloading torrent ' + torrent.name) + + if (++numDone >= torrents.length) { + client.destroy() + } + }) + }) +}) +*/ diff --git a/test/package.json b/test/package.json new file mode 100644 index 0000000..3464324 --- /dev/null +++ b/test/package.json @@ -0,0 +1,7 @@ +{ + "name": "test", + "version": "0.0.0", + "browserify": { + "transform": ["brfs"] + } +} diff --git a/test/rarity-map.js b/test/rarity-map.js new file mode 100644 index 0000000..5e04ddc --- /dev/null +++ b/test/rarity-map.js @@ -0,0 +1,113 @@ +var RarityMap = require('../lib/rarity-map') +var BitField = require('bitfield') +var Swarm = require('bittorrent-swarm') +var EventEmitter = require('events').EventEmitter +var test = require('tape') +var hat = require('hat') + +var infoHash = 'd2474e86c95b19b8bcfdb92bc12c9d44667cfa36' +var peerId1 = '-WW0001-' + hat(48) + +test('Rarity map usage', function (t) { + t.plan(16) + + var swarm = new Swarm(infoHash, peerId1) + var numPieces = 4 + swarm.wires = [ new EventEmitter(), new EventEmitter() ] + swarm.wires.forEach(function (wire) { + wire.peerPieces = new BitField(numPieces) + }) + var rarityMap = new RarityMap(swarm, numPieces) + + function validateInitial () { + // note that getRarestPiece will return a random piece since they're all equal + // so repeat the test several times to reasonably ensure its correctness. + var piece = rarityMap.getRarestPiece() + t.ok(piece >= 0 && piece < numPieces) + + piece = rarityMap.getRarestPiece() + t.ok(piece >= 0 && piece < numPieces) + + piece = rarityMap.getRarestPiece() + t.ok(piece >= 0 && piece < numPieces) + + piece = rarityMap.getRarestPiece() + t.ok(piece >= 0 && piece < numPieces) + } + + // test initial / empty case + validateInitial() + + rarityMap.recalculate() + + // test initial / empty case after recalc + validateInitial() + + function setPiece (wire, index) { + wire.peerPieces.set(index) + wire.emit('have', index) + } + + setPiece(swarm.wires[0], 0) + setPiece(swarm.wires[1], 0) + + setPiece(swarm.wires[0], 1) + setPiece(swarm.wires[1], 3) + + // test rarest piece after setting pieces and handling 'have' events + var piece = rarityMap.getRarestPiece() + t.equal(piece, 2) + + rarityMap.recalculate() + + // test rarest piece after recalc to ensure its the same + piece = rarityMap.getRarestPiece() + t.equal(piece, 2) + + function addWire () { + var wire = new EventEmitter() + wire.peerPieces = new BitField(numPieces) + wire.peerPieces.set(1) + wire.peerPieces.set(2) + swarm.wires.push(wire) + swarm.emit('wire', wire) + } + + addWire() + addWire() + + // test rarest piece after adding wires + piece = rarityMap.getRarestPiece() + t.equal(piece, 3) + + rarityMap.recalculate() + + // test rarest piece after adding wires and recalc + piece = rarityMap.getRarestPiece() + t.equal(piece, 3) + + function removeWire (index) { + var wire = swarm.wires.splice(index, 1)[0] + wire.emit('close') + } + + removeWire(3) + removeWire(1) + + // test rarest piece after removing wires + piece = rarityMap.getRarestPiece() + t.equal(piece, 3) + + rarityMap.recalculate() + + // test rarest piece after removing wires and recalc + piece = rarityMap.getRarestPiece() + t.equal(piece, 3) + + // test piece filter func + piece = rarityMap.getRarestPiece(function (i) { return i <= 1 }) + t.equal(piece, 0) + + piece = rarityMap.getRarestPiece(function (i) { return i === 1 || i === 2 }) + t.equal(piece, 2) +}) diff --git a/test/storage.js b/test/storage.js new file mode 100644 index 0000000..6dfc04b --- /dev/null +++ b/test/storage.js @@ -0,0 +1,60 @@ +var Storage = require('../lib/storage') +var parseTorrent = require('parse-torrent') +var test = require('tape') +var fs = require('fs') + +var torrents = [ 'leaves', 'pride' ].map(function (name) { + var torrent = fs.readFileSync(__dirname + '/torrents/' + name + '.torrent') + + return { + name : name, + torrent : torrent, + parsedTorrent : parseTorrent(torrent) + } +}) + +torrents.forEach(function (torrent) { + test('sanity check backing storage for ' + torrent.name + ' torrent', function (t) { + var parsedTorrent = torrent.parsedTorrent + var storage = new Storage(parsedTorrent) + + t.equal(storage.files.length, parsedTorrent.files.length) + t.equal(storage.pieces.length, parsedTorrent.pieces.length) + + var length = 0, pieces = 0 + + storage.pieces.forEach(function (piece, index) { + t.notOk(piece.verified) + length += piece.length + + // ensure all blocks start out empty + for (var i = 0; i < piece.blocks.length; ++i) { + t.equal(piece.blocks[i], 0) + } + }) + + t.equal(length, parsedTorrent.length) + length = 0 + + storage.files.forEach(function (file, index) { + t.notOk(file.done) + length += file.length + pieces += file.pieces.length + + t.assert(file.length >= 0) + t.assert(file.pieces.length >= 0) + }) + + t.equal(length, parsedTorrent.length) + + if (parsedTorrent.files.length > 1) { + // if the torrent contains multiple files, the pieces may overlap file boundaries, + // so the aggregate number of file pieces will be at least the number of pieces. + t.assert(pieces >= parsedTorrent.pieces.length) + } else { + t.equal(pieces, parsedTorrent.pieces.length) + } + + t.end() + }) +}) diff --git a/test/torrents/Leaves of Grass by Walt Whitman.epub b/test/torrents/Leaves of Grass by Walt Whitman.epub Binary files differnew file mode 100644 index 0000000..66791ed --- /dev/null +++ b/test/torrents/Leaves of Grass by Walt Whitman.epub diff --git a/test/torrents/pride.torrent b/test/torrents/pride.torrent Binary files differnew file mode 100644 index 0000000..a9bf635 --- /dev/null +++ b/test/torrents/pride.torrent |