diff options
author | Feross Aboukhadijeh <feross@feross.org> | 2015-05-16 05:27:27 +0300 |
---|---|---|
committer | Feross Aboukhadijeh <feross@feross.org> | 2015-05-16 05:27:27 +0300 |
commit | f559337a461bb949a0190948ab288f12743ab6ed (patch) | |
tree | 426f19eca61b6542949ac603b7bf294d23a733a4 /lib | |
parent | 9d6beb941fe56830a280d552e13f0a79d9a2e95e (diff) |
better stream cleanup in storage.load
Diffstat (limited to 'lib')
-rw-r--r-- | lib/storage.js | 77 |
1 files changed, 56 insertions, 21 deletions
diff --git a/lib/storage.js b/lib/storage.js index 7971651..30fe9a9 100644 --- a/lib/storage.js +++ b/lib/storage.js @@ -1,7 +1,7 @@ module.exports = Storage var BitField = require('bitfield') -var BlockStream = require('block-stream') +var BlockStream = require('block-stream2') var debug = require('debug')('webtorrent:storage') var dezalgo = require('dezalgo') var eos = require('end-of-stream') @@ -394,28 +394,63 @@ Storage.prototype.load = function (streams, cb) { if (!Array.isArray(streams)) streams = [ streams ] cb = once(cb || function () {}) - self.once('done', function () { + var pieceIndex = 0 + var multistream = new MultiStream(streams) + var blockstream = new BlockStream(self.pieceLength, { zeroPadding: false }) + + multistream.on('error', onError) + + self.once('done', onDone) + + multistream + .pipe(blockstream) + .on('data', onData) + .on('error', onError) + + function onData (piece) { + var index = pieceIndex + pieceIndex += 1 + + var blockIndex = 0 + var s = new BlockStream(BLOCK_LENGTH, { zeroPadding: false }) + + s.on('data', onBlockData) + s.on('end', onBlockEnd) + + function onBlockData (block) { + var offset = blockIndex * BLOCK_LENGTH + blockIndex += 1 + self.writeBlock(index, offset, block) + } + + function onBlockEnd () { + blockCleanup() + } + + function blockCleanup () { + s.removeListener('data', onBlockData) + s.removeListener('end', onBlockEnd) + } + + s.end(piece) + } + + function onError (err) { + cleanup() + cb(err) + } + + function onDone () { + cleanup() cb(null) - }) + } - var pieceIndex = 0 - ;new MultiStream(streams) - .pipe(new BlockStream(self.pieceLength, { nopad: true })) - .on('data', function (piece) { - var index = pieceIndex - pieceIndex += 1 - - var blockIndex = 0 - var s = new BlockStream(BLOCK_LENGTH, { nopad: true }) - s.on('data', function (block) { - var offset = blockIndex * BLOCK_LENGTH - blockIndex += 1 - - self.writeBlock(index, offset, block) - }) - s.end(piece) - }) - .on('error', cb) + function cleanup () { + multistream.removeListener('error', onError) + blockstream.removeListener('data', onData) + blockstream.removeListener('error', onError) + self.removeListener('done', onDone) + } } Object.defineProperty(Storage.prototype, 'downloaded', { |