Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorRobert Nagy <ronagy@icloud.com>2019-08-11 16:29:30 +0300
committerRich Trott <rtrott@gmail.com>2019-12-18 18:03:37 +0300
commit2b9847c63784c4adf65ed1fa8fcdee823da546ec (patch)
tree25e66c5d6353e5d625cc0f5d561b5f9d6fee2151 /lib
parente23aebc684a45f725811211f9740bcee3bcdbe26 (diff)
fs: allow overriding fs for streams
Allow overriding open, write, and close when using createReadStream() and createWriteStream(). PR-URL: https://github.com/nodejs/node/pull/29083 Refs: https://github.com/nodejs/node/issues/29050 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Rich Trott <rtrott@gmail.com>
Diffstat (limited to 'lib')
-rw-r--r--lib/internal/fs/streams.js128
1 files changed, 91 insertions, 37 deletions
diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js
index b17ebe5cb43..456b57d2af8 100644
--- a/lib/internal/fs/streams.js
+++ b/lib/internal/fs/streams.js
@@ -11,6 +11,7 @@ const {
} = primordials;
const {
+ ERR_INVALID_ARG_TYPE,
ERR_OUT_OF_RANGE,
ERR_STREAM_DESTROYED
} = require('internal/errors').codes;
@@ -28,6 +29,7 @@ const kIoDone = Symbol('kIoDone');
const kIsPerformingIO = Symbol('kIsPerformingIO');
const kMinPoolSpace = 128;
+const kFs = Symbol('kFs');
let pool;
// It can happen that we expect to read a large chunk of data, and reserve
@@ -76,6 +78,23 @@ function ReadStream(path, options) {
options.emitClose = false;
}
+ this[kFs] = options.fs || fs;
+
+ if (typeof this[kFs].open !== 'function') {
+ throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
+ this[kFs].open);
+ }
+
+ if (typeof this[kFs].read !== 'function') {
+ throw new ERR_INVALID_ARG_TYPE('options.fs.read', 'function',
+ this[kFs].read);
+ }
+
+ if (typeof this[kFs].close !== 'function') {
+ throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
+ this[kFs].close);
+ }
+
Readable.call(this, options);
// Path will be ignored when fd is specified, so it can be falsy
@@ -136,7 +155,7 @@ function _openReadFs(stream) {
return;
}
- fs.open(stream.path, stream.flags, stream.mode, (er, fd) => {
+ stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
if (er) {
if (stream.autoClose) {
stream.destroy();
@@ -186,42 +205,43 @@ ReadStream.prototype._read = function(n) {
// the actual read.
this[kIsPerformingIO] = true;
- fs.read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
- this[kIsPerformingIO] = false;
- // Tell ._destroy() that it's safe to close the fd now.
- if (this.destroyed) return this.emit(kIoDone, er);
-
- if (er) {
- if (this.autoClose) {
- this.destroy();
- }
- this.emit('error', er);
- } else {
- let b = null;
- // Now that we know how much data we have actually read, re-wind the
- // 'used' field if we can, and otherwise allow the remainder of our
- // reservation to be used as a new pool later.
- if (start + toRead === thisPool.used && thisPool === pool) {
- const newUsed = thisPool.used + bytesRead - toRead;
- thisPool.used = roundUpToMultipleOf8(newUsed);
+ this[kFs].read(
+ this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
+ this[kIsPerformingIO] = false;
+ // Tell ._destroy() that it's safe to close the fd now.
+ if (this.destroyed) return this.emit(kIoDone, er);
+
+ if (er) {
+ if (this.autoClose) {
+ this.destroy();
+ }
+ this.emit('error', er);
} else {
- // Round down to the next lowest multiple of 8 to ensure the new pool
- // fragment start and end positions are aligned to an 8 byte boundary.
- const alignedEnd = (start + toRead) & ~7;
- const alignedStart = roundUpToMultipleOf8(start + bytesRead);
- if (alignedEnd - alignedStart >= kMinPoolSpace) {
- poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
+ let b = null;
+ // Now that we know how much data we have actually read, re-wind the
+ // 'used' field if we can, and otherwise allow the remainder of our
+ // reservation to be used as a new pool later.
+ if (start + toRead === thisPool.used && thisPool === pool) {
+ const newUsed = thisPool.used + bytesRead - toRead;
+ thisPool.used = roundUpToMultipleOf8(newUsed);
+ } else {
+ // Round down to the next lowest multiple of 8 to ensure the new pool
+ // fragment start and end positions are aligned to an 8 byte boundary.
+ const alignedEnd = (start + toRead) & ~7;
+ const alignedStart = roundUpToMultipleOf8(start + bytesRead);
+ if (alignedEnd - alignedStart >= kMinPoolSpace) {
+ poolFragments.push(thisPool.slice(alignedStart, alignedEnd));
+ }
}
- }
- if (bytesRead > 0) {
- this.bytesRead += bytesRead;
- b = thisPool.slice(start, start + bytesRead);
- }
+ if (bytesRead > 0) {
+ this.bytesRead += bytesRead;
+ b = thisPool.slice(start, start + bytesRead);
+ }
- this.push(b);
- }
- });
+ this.push(b);
+ }
+ });
// Move the pool positions, and internal position for reading.
if (this.pos !== undefined)
@@ -245,7 +265,7 @@ ReadStream.prototype._destroy = function(err, cb) {
};
function closeFsStream(stream, cb, err) {
- fs.close(stream.fd, (er) => {
+ stream[kFs].close(stream.fd, (er) => {
er = er || err;
cb(er);
stream.closed = true;
@@ -279,6 +299,40 @@ function WriteStream(path, options) {
options.emitClose = false;
}
+ this[kFs] = options.fs || fs;
+ if (typeof this[kFs].open !== 'function') {
+ throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
+ this[kFs].open);
+ }
+
+ if (!this[kFs].write && !this[kFs].writev) {
+ throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
+ this[kFs].write);
+ }
+
+ if (this[kFs].write && typeof this[kFs].write !== 'function') {
+ throw new ERR_INVALID_ARG_TYPE('options.fs.write', 'function',
+ this[kFs].write);
+ }
+
+ if (this[kFs].writev && typeof this[kFs].writev !== 'function') {
+ throw new ERR_INVALID_ARG_TYPE('options.fs.writev', 'function',
+ this[kFs].writev);
+ }
+
+ if (typeof this[kFs].close !== 'function') {
+ throw new ERR_INVALID_ARG_TYPE('options.fs.close', 'function',
+ this[kFs].close);
+ }
+
+ // It's enough to override either, in which case only one will be used.
+ if (!this[kFs].write) {
+ this._write = null;
+ }
+ if (!this[kFs].writev) {
+ this._writev = null;
+ }
+
Writable.call(this, options);
// Path will be ignored when fd is specified, so it can be falsy
@@ -335,7 +389,7 @@ function _openWriteFs(stream) {
return;
}
- fs.open(stream.path, stream.flags, stream.mode, (er, fd) => {
+ stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
if (er) {
if (stream.autoClose) {
stream.destroy();
@@ -361,7 +415,7 @@ WriteStream.prototype._write = function(data, encoding, cb) {
if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
this[kIsPerformingIO] = true;
- fs.write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
+ this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
@@ -405,7 +459,7 @@ WriteStream.prototype._writev = function(data, cb) {
}
this[kIsPerformingIO] = true;
- fs.writev(this.fd, chunks, this.pos, (er, bytes) => {
+ this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {