diff options
author | Robert Nagy <ronagy@icloud.com> | 2019-08-11 16:29:30 +0300 |
---|---|---|
committer | Rich Trott <rtrott@gmail.com> | 2019-12-18 18:03:37 +0300 |
commit | 2b9847c63784c4adf65ed1fa8fcdee823da546ec (patch) | |
tree | 25e66c5d6353e5d625cc0f5d561b5f9d6fee2151 /lib | |
parent | e23aebc684a45f725811211f9740bcee3bcdbe26 (diff) |
fs: allow overriding fs for streams
Allow overriding open, write, and close when using createReadStream()
and createWriteStream().
PR-URL: https://github.com/nodejs/node/pull/29083
Refs: https://github.com/nodejs/node/issues/29050
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Rich Trott <rtrott@gmail.com>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/internal/fs/streams.js | 128 |
1 files changed, 91 insertions, 37 deletions
diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js index b17ebe5cb43..456b57d2af8 100644 --- a/lib/internal/fs/streams.js +++ b/lib/internal/fs/streams.js @@ -11,6 +11,7 @@ const { } = primordials; const { + ERR_INVALID_ARG_TYPE, ERR_OUT_OF_RANGE, ERR_STREAM_DESTROYED } = require('internal/errors').codes; @@ -28,6 +29,7 @@ const kIoDone = Symbol('kIoDone'); const kIsPerformingIO = Symbol('kIsPerformingIO'); const kMinPoolSpace = 128; +const kFs = Symbol('kFs'); let pool; // It can happen that we expect to read a large chunk of data, and reserve @@ -76,6 +78,23 @@ function ReadStream(path, options) { options.emitClose = false; } + this[kFs] = options.fs || fs; + + if (typeof this[kFs].open !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function', + this[kFs].open); + } + + if (typeof this[kFs].read !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.read', 'function', + this[kFs].read); + } + + if (typeof this[kFs].close !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function', + this[kFs].close); + } + Readable.call(this, options); // Path will be ignored when fd is specified, so it can be falsy @@ -136,7 +155,7 @@ function _openReadFs(stream) { return; } - fs.open(stream.path, stream.flags, stream.mode, (er, fd) => { + stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => { if (er) { if (stream.autoClose) { stream.destroy(); @@ -186,42 +205,43 @@ ReadStream.prototype._read = function(n) { // the actual read. this[kIsPerformingIO] = true; - fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { - this[kIsPerformingIO] = false; - // Tell ._destroy() that it's safe to close the fd now. - if (this.destroyed) return this.emit(kIoDone, er); - - if (er) { - if (this.autoClose) { - this.destroy(); - } - this.emit('error', er); - } else { - let b = null; - // Now that we know how much data we have actually read, re-wind the - // 'used' field if we can, and otherwise allow the remainder of our - // reservation to be used as a new pool later. - if (start + toRead === thisPool.used && thisPool === pool) { - const newUsed = thisPool.used + bytesRead - toRead; - thisPool.used = roundUpToMultipleOf8(newUsed); + this[kFs].read( + this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => { + this[kIsPerformingIO] = false; + // Tell ._destroy() that it's safe to close the fd now. + if (this.destroyed) return this.emit(kIoDone, er); + + if (er) { + if (this.autoClose) { + this.destroy(); + } + this.emit('error', er); } else { - // Round down to the next lowest multiple of 8 to ensure the new pool - // fragment start and end positions are aligned to an 8 byte boundary. - const alignedEnd = (start + toRead) & ~7; - const alignedStart = roundUpToMultipleOf8(start + bytesRead); - if (alignedEnd - alignedStart >= kMinPoolSpace) { - poolFragments.push(thisPool.slice(alignedStart, alignedEnd)); + let b = null; + // Now that we know how much data we have actually read, re-wind the + // 'used' field if we can, and otherwise allow the remainder of our + // reservation to be used as a new pool later. + if (start + toRead === thisPool.used && thisPool === pool) { + const newUsed = thisPool.used + bytesRead - toRead; + thisPool.used = roundUpToMultipleOf8(newUsed); + } else { + // Round down to the next lowest multiple of 8 to ensure the new pool + // fragment start and end positions are aligned to an 8 byte boundary. + const alignedEnd = (start + toRead) & ~7; + const alignedStart = roundUpToMultipleOf8(start + bytesRead); + if (alignedEnd - alignedStart >= kMinPoolSpace) { + poolFragments.push(thisPool.slice(alignedStart, alignedEnd)); + } } - } - if (bytesRead > 0) { - this.bytesRead += bytesRead; - b = thisPool.slice(start, start + bytesRead); - } + if (bytesRead > 0) { + this.bytesRead += bytesRead; + b = thisPool.slice(start, start + bytesRead); + } - this.push(b); - } - }); + this.push(b); + } + }); // Move the pool positions, and internal position for reading. if (this.pos !== undefined) @@ -245,7 +265,7 @@ ReadStream.prototype._destroy = function(err, cb) { }; function closeFsStream(stream, cb, err) { - fs.close(stream.fd, (er) => { + stream[kFs].close(stream.fd, (er) => { er = er || err; cb(er); stream.closed = true; @@ -279,6 +299,40 @@ function WriteStream(path, options) { options.emitClose = false; } + this[kFs] = options.fs || fs; + if (typeof this[kFs].open !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function', + this[kFs].open); + } + + if (!this[kFs].write && !this[kFs].writev) { + throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function', + this[kFs].write); + } + + if (this[kFs].write && typeof this[kFs].write !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function', + this[kFs].write); + } + + if (this[kFs].writev && typeof this[kFs].writev !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.writev', 'function', + this[kFs].writev); + } + + if (typeof this[kFs].close !== 'function') { + throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function', + this[kFs].close); + } + + // It's enough to override either, in which case only one will be used. + if (!this[kFs].write) { + this._write = null; + } + if (!this[kFs].writev) { + this._writev = null; + } + Writable.call(this, options); // Path will be ignored when fd is specified, so it can be falsy @@ -335,7 +389,7 @@ function _openWriteFs(stream) { return; } - fs.open(stream.path, stream.flags, stream.mode, (er, fd) => { + stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => { if (er) { if (stream.autoClose) { stream.destroy(); @@ -361,7 +415,7 @@ WriteStream.prototype._write = function(data, encoding, cb) { if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write')); this[kIsPerformingIO] = true; - fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { + this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => { this[kIsPerformingIO] = false; // Tell ._destroy() that it's safe to close the fd now. if (this.destroyed) { @@ -405,7 +459,7 @@ WriteStream.prototype._writev = function(data, cb) { } this[kIsPerformingIO] = true; - fs.writev(this.fd, chunks, this.pos, (er, bytes) => { + this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => { this[kIsPerformingIO] = false; // Tell ._destroy() that it's safe to close the fd now. if (this.destroyed) { |