Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/webtorrent/webtorrent.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFeross Aboukhadijeh <feross@feross.org>2014-03-03 10:48:43 +0400
committerFeross Aboukhadijeh <feross@feross.org>2014-03-03 10:48:43 +0400
commit8375be8c1ceb5cadf3f0c67eb6150e2bf9860f67 (patch)
tree64436c120b5ca10e2b9ebce9621ba0ad5a31492d /lib/torrent.js
parent67be9daf627bb7d9d068f3ef16011a7ae4df939f (diff)
add file fetching / piece verification / storage classes
Diffstat (limited to 'lib/torrent.js')
-rw-r--r--lib/torrent.js108
1 files changed, 98 insertions, 10 deletions
diff --git a/lib/torrent.js b/lib/torrent.js
index 183157a..ac7d809 100644
--- a/lib/torrent.js
+++ b/lib/torrent.js
@@ -5,9 +5,13 @@ var EventEmitter = require('events').EventEmitter
var inherits = require('inherits')
var magnet = require('magnet-uri')
var parseTorrent = require('parse-torrent')
+var Storage = require('./storage')
var Swarm = require('bittorrent-swarm')
-var METADATA_BLOCK_SIZE = 16 * 1024
+var BLOCK_LENGTH = 16 * 1024
+var MAX_BLOCK_LENGTH = 128 * 1024
+var MAX_OUTSTANDING_REQUESTS = 5
+var METADATA_BLOCK_LENGTH = 16 * 1024
var PIECE_TIMEOUT = 10000
var EXTENDED_MESSAGES = {
@@ -26,7 +30,6 @@ function Torrent (uri, opts) {
throw new Error('invalid torrent uri')
self.infoHash = info.infoHash
- self.file = null
self.name = info.name
self.peerId = opts.peerId
@@ -37,6 +40,7 @@ function Torrent (uri, opts) {
self.metadata = null
self.swarm = new Swarm(self.infoHash, self.peerId, { dht: true })
+ self.storage = null
if (self.torrentPort) {
self.swarm.listen(self.torrentPort, function (port) {
@@ -51,12 +55,6 @@ function Torrent (uri, opts) {
self.swarm.on('wire', self._onWire.bind(self))
}
-Object.defineProperty(Torrent.prototype, 'progress', {
- get: function () {
- return 0 // TODO
- }
-})
-
/**
* Add a peer to the swarm
* @param {string} addr
@@ -112,6 +110,70 @@ Torrent.prototype._onWire = function (wire) {
else if (ext === EXTENDED_MESSAGES.ut_metadata)
self._onUtMetadata(wire, buf)
})
+
+ if (self.metadata) {
+ self._onWireWithMetadata(wire)
+ }
+}
+
+Torrent.prototype._onWireWithMetadata = function (wire) {
+ var self = this
+
+ function requestPiece (index) {
+ var len = wire.requests.length
+ if (len >= MAX_OUTSTANDING_REQUESTS) return
+
+ var endGame = (len === 0 && self.storage.numMissing < 30)
+ var block = self.storage.selectBlock(index, endGame)
+ if (!block) return
+
+ console.log(wire.remoteAddress, 'requestPiece', index, 'offset', block.offset, 'length', block.length)
+ wire.request(index, block.offset, block.length, function (err, bufffer) {
+ if (err)
+ return self.storage.deselectBlock(index, block.offset)
+
+ self.storage.writeBlock(index, block.offset, bufffer)
+ requestPieces()
+ });
+ }
+
+ function requestPieces () {
+ for (var index = 0, len = wire.peerPieces.length; index < len; index++) {
+ if (wire.peerPieces[index] && self.storage.pieces[index]) {
+ // if peer has this piece AND it's a valid piece, then request blocks
+ requestPiece(index)
+ }
+ }
+ }
+
+ wire.on('have', function (index) {
+ if (wire.peerChoking || !self.storage.pieces[index])
+ return
+ requestPiece(index)
+ });
+
+ wire.on('unchoke', requestPieces)
+
+ wire.once('interested', function () {
+ wire.unchoke()
+ })
+
+ wire.on('request', function (index, offset, length, cb) {
+ // Disconnect from peers that request more than 128KB, per spec
+ if (length > MAX_BLOCK_LENGTH) {
+ console.error(wire.remoteAddress, 'requested invalid block size', length)
+ return wire.destroy()
+ }
+
+ process.nextTick(function () {
+ var block = self.storage.readBlock(index, offset, length)
+ if (!block) return cb(new Error('requested block not available'))
+ cb(null, block)
+ })
+ })
+
+ wire.bitfield(self.storage.bitfield) // always send bitfield (required)
+ wire.interested() // always start out interested
}
Torrent.prototype._onExtendedHandshake = function (wire, buf) {
@@ -129,7 +191,7 @@ Torrent.prototype._onExtendedHandshake = function (wire, buf) {
// If torrent is missing metadata and peer supports ut_metadata extension,
// then request all metadata pieces
if (!self.metadata && dict.metadata_size && dict.m && dict.m.ut_metadata) {
- var numPieces = Math.ceil(dict.metadata_size / METADATA_BLOCK_SIZE)
+ var numPieces = Math.ceil(dict.metadata_size / METADATA_BLOCK_LENGTH)
wire.metadata = new Buffer(dict.metadata_size)
console.log('metadata size: ' + dict.metadata_size)
@@ -173,7 +235,7 @@ Torrent.prototype._onUtMetadata = function (wire, buf) {
// ut_metadata data (in response to our request)
// example: {'msg_type': 1, 'piece': 0, 'total_size': 3425}
case 1:
- data.copy(wire.metadata, dict.piece * METADATA_BLOCK_SIZE)
+ data.copy(wire.metadata, dict.piece * METADATA_BLOCK_LENGTH)
self.metadataRaw = wire.metadata
self.metadata = bncode.decode(wire.metadata)
@@ -204,6 +266,32 @@ Torrent.prototype._onMetadata = function () {
}
console.log(self.parsedTorrent)
self.name = self.parsedTorrent.name
+ console.log('before storage instantiation')
+ self.storage = new Storage(self.parsedTorrent)
+ console.log('after storage instantiation')
+ self.storage.on('piece', self._onStoragePiece.bind(self))
+ self.storage.on('file', function (file) {
+ console.log('FILE', file.name)
+ })
+ self.storage.on('done', function () {
+ console.log('done with torrent!')
+ })
+
+ self.swarm.wires.forEach(function (wire) {
+ self._onWireWithMetadata(wire)
+ })
+}
+
+/**
+ * When a piece is fully downloaded, notify all peers with a HAVE message.
+ * @param {Piece} piece
+ */
+Torrent.prototype._onStoragePiece = function (piece) {
+ var self = this
+ console.log('PIECE', piece.index)
+ self.swarm.wires.forEach(function (wire) {
+ wire.have(piece.index)
+ })
}
//