Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/webtorrent/webtorrent.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorFeross Aboukhadijeh <feross@feross.org>2015-05-16 05:27:27 +0300
committerFeross Aboukhadijeh <feross@feross.org>2015-05-16 05:27:27 +0300
commitf559337a461bb949a0190948ab288f12743ab6ed (patch)
tree426f19eca61b6542949ac603b7bf294d23a733a4 /lib
parent9d6beb941fe56830a280d552e13f0a79d9a2e95e (diff)
better stream cleanup in storage.load
Diffstat (limited to 'lib')
-rw-r--r--lib/storage.js77
1 files changed, 56 insertions, 21 deletions
diff --git a/lib/storage.js b/lib/storage.js
index 7971651..30fe9a9 100644
--- a/lib/storage.js
+++ b/lib/storage.js
@@ -1,7 +1,7 @@
module.exports = Storage
var BitField = require('bitfield')
-var BlockStream = require('block-stream')
+var BlockStream = require('block-stream2')
var debug = require('debug')('webtorrent:storage')
var dezalgo = require('dezalgo')
var eos = require('end-of-stream')
@@ -394,28 +394,63 @@ Storage.prototype.load = function (streams, cb) {
if (!Array.isArray(streams)) streams = [ streams ]
cb = once(cb || function () {})
- self.once('done', function () {
+ var pieceIndex = 0
+ var multistream = new MultiStream(streams)
+ var blockstream = new BlockStream(self.pieceLength, { zeroPadding: false })
+
+ multistream.on('error', onError)
+
+ self.once('done', onDone)
+
+ multistream
+ .pipe(blockstream)
+ .on('data', onData)
+ .on('error', onError)
+
+ function onData (piece) {
+ var index = pieceIndex
+ pieceIndex += 1
+
+ var blockIndex = 0
+ var s = new BlockStream(BLOCK_LENGTH, { zeroPadding: false })
+
+ s.on('data', onBlockData)
+ s.on('end', onBlockEnd)
+
+ function onBlockData (block) {
+ var offset = blockIndex * BLOCK_LENGTH
+ blockIndex += 1
+ self.writeBlock(index, offset, block)
+ }
+
+ function onBlockEnd () {
+ blockCleanup()
+ }
+
+ function blockCleanup () {
+ s.removeListener('data', onBlockData)
+ s.removeListener('end', onBlockEnd)
+ }
+
+ s.end(piece)
+ }
+
+ function onError (err) {
+ cleanup()
+ cb(err)
+ }
+
+ function onDone () {
+ cleanup()
cb(null)
- })
+ }
- var pieceIndex = 0
- ;new MultiStream(streams)
- .pipe(new BlockStream(self.pieceLength, { nopad: true }))
- .on('data', function (piece) {
- var index = pieceIndex
- pieceIndex += 1
-
- var blockIndex = 0
- var s = new BlockStream(BLOCK_LENGTH, { nopad: true })
- s.on('data', function (block) {
- var offset = blockIndex * BLOCK_LENGTH
- blockIndex += 1
-
- self.writeBlock(index, offset, block)
- })
- s.end(piece)
- })
- .on('error', cb)
+ function cleanup () {
+ multistream.removeListener('error', onError)
+ blockstream.removeListener('data', onData)
+ blockstream.removeListener('error', onError)
+ self.removeListener('done', onDone)
+ }
}
Object.defineProperty(Storage.prototype, 'downloaded', {