Welcome to mirror list, hosted at ThFree Co, Russian Federation.

github.com/nodejs/node.git - Unnamed repository; edit this file 'description' to name the repository.
summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Nagy <ronagy@icloud.com>2020-03-15 17:20:46 +0300
committerRobert Nagy <ronagy@icloud.com>2020-05-27 11:24:33 +0300
commit54b36e401d2b72d95e5f1dbbc787f6beed639347 (patch)
tree7007e41615b6e3fe09543a4f09f46231fbb10e26 /lib/internal/fs
parentfb8cc72e738f2854302bf270b2f3997bc273b9a6 (diff)
fs: reimplement read and write streams using stream.construct
Refs: #23133 PR-URL: https://github.com/nodejs/node/pull/29656 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net> Reviewed-By: Denys Otrishko <shishugi@gmail.com>
Diffstat (limited to 'lib/internal/fs')
-rw-r--r--lib/internal/fs/streams.js230
1 files changed, 97 insertions, 133 deletions
diff --git a/lib/internal/fs/streams.js b/lib/internal/fs/streams.js
index 9ce21721faa..9e6050139dc 100644
--- a/lib/internal/fs/streams.js
+++ b/lib/internal/fs/streams.js
@@ -10,11 +10,11 @@ const {
const {
ERR_INVALID_ARG_TYPE,
- ERR_OUT_OF_RANGE,
- ERR_STREAM_DESTROYED
+ ERR_OUT_OF_RANGE
} = require('internal/errors').codes;
const { deprecate } = require('internal/util');
const { validateInteger } = require('internal/validators');
+const { errorOrDestroy } = require('internal/streams/destroy');
const fs = require('fs');
const { Buffer } = require('buffer');
const {
@@ -49,6 +49,57 @@ function roundUpToMultipleOf8(n) {
return (n + 7) & ~7; // Align to 8 byte boundary.
}
+function _construct(callback) {
+ const stream = this;
+ if (typeof stream.fd === 'number') {
+ callback();
+ return;
+ }
+
+ if (stream.open !== openWriteFs && stream.open !== openReadFs) {
+ // Backwards compat for monkey patching open().
+ const orgEmit = stream.emit;
+ stream.emit = function(...args) {
+ if (args[0] === 'open') {
+ this.emit = orgEmit;
+ callback();
+ orgEmit.apply(this, args);
+ } else if (args[0] === 'error') {
+ this.emit = orgEmit;
+ callback(args[1]);
+ } else {
+ orgEmit.apply(this, args);
+ }
+ };
+ stream.open();
+ } else {
+ stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
+ if (er) {
+ callback(er);
+ } else {
+ stream.fd = fd;
+ callback();
+ stream.emit('open', stream.fd);
+ stream.emit('ready');
+ }
+ });
+ }
+}
+
+function close(stream, err, cb) {
+ if (!stream.fd) {
+ // TODO(ronag)
+ // stream.closed = true;
+ cb(err);
+ } else {
+ stream[kFs].close(stream.fd, (er) => {
+ stream.closed = true;
+ cb(er || err);
+ });
+ stream.fd = null;
+ }
+}
+
function ReadStream(path, options) {
if (!(this instanceof ReadStream))
return new ReadStream(path, options);
@@ -79,7 +130,8 @@ function ReadStream(path, options) {
this[kFs].close);
}
- Readable.call(this, options);
+ options.autoDestroy = options.autoClose === undefined ?
+ true : options.autoClose;
// Path will be ignored when fd is specified, so it can be falsy
this.path = toPathIfFileURL(path);
@@ -89,7 +141,6 @@ function ReadStream(path, options) {
this.start = options.start;
this.end = options.end;
- this.autoClose = options.autoClose === undefined ? true : options.autoClose;
this.pos = undefined;
this.bytesRead = 0;
this.closed = false;
@@ -115,56 +166,28 @@ function ReadStream(path, options) {
}
}
- if (typeof this.fd !== 'number')
- _openReadFs(this);
-
- this.on('end', function() {
- if (this.autoClose) {
- this.destroy();
- }
- });
+ Readable.call(this, options);
}
ObjectSetPrototypeOf(ReadStream.prototype, Readable.prototype);
ObjectSetPrototypeOf(ReadStream, Readable);
+ObjectDefineProperty(ReadStream.prototype, 'autoClose', {
+ get() {
+ return this._readableState.autoDestroy;
+ },
+ set(val) {
+ this._readableState.autoDestroy = val;
+ }
+});
+
const openReadFs = deprecate(function() {
- _openReadFs(this);
+ // Noop.
}, 'ReadStream.prototype.open() is deprecated', 'DEP0135');
ReadStream.prototype.open = openReadFs;
-function _openReadFs(stream) {
- // Backwards compat for overriden open.
- if (stream.open !== openReadFs) {
- stream.open();
- return;
- }
-
- stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
- if (er) {
- if (stream.autoClose) {
- stream.destroy();
- }
- stream.emit('error', er);
- return;
- }
-
- stream.fd = fd;
- stream.emit('open', fd);
- stream.emit('ready');
- // Start the flow of data.
- stream.read();
- });
-}
+ReadStream.prototype._construct = _construct;
ReadStream.prototype._read = function(n) {
- if (typeof this.fd !== 'number') {
- return this.once('open', function() {
- this._read(n);
- });
- }
-
- if (this.destroyed) return;
-
if (!pool || pool.length - pool.used < kMinPoolSpace) {
// Discard the old pool.
allocNewPool(this.readableHighWaterMark);
@@ -189,17 +212,14 @@ ReadStream.prototype._read = function(n) {
// the actual read.
this[kIsPerformingIO] = true;
- this[kFs].read(
- this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
+ this[kFs]
+ .read(this.fd, pool, pool.used, toRead, this.pos, (er, bytesRead) => {
this[kIsPerformingIO] = false;
// Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) return this.emit(kIoDone, er);
if (er) {
- if (this.autoClose) {
- this.destroy();
- }
- this.emit('error', er);
+ errorOrDestroy(this, er);
} else {
let b = null;
// Now that we know how much data we have actually read, re-wind the
@@ -235,28 +255,13 @@ ReadStream.prototype._read = function(n) {
};
ReadStream.prototype._destroy = function(err, cb) {
- if (typeof this.fd !== 'number') {
- this.once('open', closeFsStream.bind(null, this, cb, err));
- return;
- }
-
if (this[kIsPerformingIO]) {
- this.once(kIoDone, (er) => closeFsStream(this, cb, err || er));
- return;
+ this.once(kIoDone, (er) => close(this, err || er, cb));
+ } else {
+ close(this, err, cb);
}
-
- closeFsStream(this, cb, err);
};
-function closeFsStream(stream, cb, err) {
- stream[kFs].close(stream.fd, (er) => {
- stream.closed = true;
- cb(er || err);
- });
-
- stream.fd = null;
-}
-
ReadStream.prototype.close = function(cb) {
if (typeof cb === 'function') finished(this, cb);
this.destroy();
@@ -276,11 +281,6 @@ function WriteStream(path, options) {
// Only buffers are supported.
options.decodeStrings = true;
- if (options.autoDestroy === undefined) {
- options.autoDestroy = options.autoClose === undefined ?
- true : (options.autoClose || false);
- }
-
this[kFs] = options.fs || fs;
if (typeof this[kFs].open !== 'function') {
throw new ERR_INVALID_ARG_TYPE('options.fs.open', 'function',
@@ -315,7 +315,8 @@ function WriteStream(path, options) {
this._writev = null;
}
- Writable.call(this, options);
+ options.autoDestroy = options.autoClose === undefined ?
+ true : options.autoClose;
// Path will be ignored when fd is specified, so it can be falsy
this.path = toPathIfFileURL(path);
@@ -324,7 +325,6 @@ function WriteStream(path, options) {
this.mode = options.mode === undefined ? 0o666 : options.mode;
this.start = options.start;
- this.autoClose = options.autoDestroy;
this.pos = undefined;
this.bytesWritten = 0;
this.closed = false;
@@ -336,67 +336,36 @@ function WriteStream(path, options) {
this.pos = this.start;
}
+ Writable.call(this, options);
+
if (options.encoding)
this.setDefaultEncoding(options.encoding);
-
- if (typeof this.fd !== 'number')
- _openWriteFs(this);
}
ObjectSetPrototypeOf(WriteStream.prototype, Writable.prototype);
ObjectSetPrototypeOf(WriteStream, Writable);
-WriteStream.prototype._final = function(callback) {
- if (typeof this.fd !== 'number') {
- return this.once('open', function() {
- this._final(callback);
- });
+ObjectDefineProperty(WriteStream.prototype, 'autoClose', {
+ get() {
+ return this._writableState.autoDestroy;
+ },
+ set(val) {
+ this._writableState.autoDestroy = val;
}
-
- callback();
-};
+});
const openWriteFs = deprecate(function() {
- _openWriteFs(this);
+ // Noop.
}, 'WriteStream.prototype.open() is deprecated', 'DEP0135');
WriteStream.prototype.open = openWriteFs;
-function _openWriteFs(stream) {
- // Backwards compat for overriden open.
- if (stream.open !== openWriteFs) {
- stream.open();
- return;
- }
-
- stream[kFs].open(stream.path, stream.flags, stream.mode, (er, fd) => {
- if (er) {
- if (stream.autoClose) {
- stream.destroy();
- }
- stream.emit('error', er);
- return;
- }
-
- stream.fd = fd;
- stream.emit('open', fd);
- stream.emit('ready');
- });
-}
-
+WriteStream.prototype._construct = _construct;
WriteStream.prototype._write = function(data, encoding, cb) {
- if (typeof this.fd !== 'number') {
- return this.once('open', function() {
- this._write(data, encoding, cb);
- });
- }
-
- if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
-
this[kIsPerformingIO] = true;
this[kFs].write(this.fd, data, 0, data.length, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
- // Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
+ // Tell ._destroy() that it's safe to close the fd now.
cb(er);
return this.emit(kIoDone, er);
}
@@ -404,6 +373,7 @@ WriteStream.prototype._write = function(data, encoding, cb) {
if (er) {
return cb(er);
}
+
this.bytesWritten += bytes;
cb();
});
@@ -412,16 +382,7 @@ WriteStream.prototype._write = function(data, encoding, cb) {
this.pos += data.length;
};
-
WriteStream.prototype._writev = function(data, cb) {
- if (typeof this.fd !== 'number') {
- return this.once('open', function() {
- this._writev(data, cb);
- });
- }
-
- if (this.destroyed) return cb(new ERR_STREAM_DESTROYED('write'));
-
const len = data.length;
const chunks = new Array(len);
let size = 0;
@@ -436,18 +397,16 @@ WriteStream.prototype._writev = function(data, cb) {
this[kIsPerformingIO] = true;
this[kFs].writev(this.fd, chunks, this.pos, (er, bytes) => {
this[kIsPerformingIO] = false;
- // Tell ._destroy() that it's safe to close the fd now.
if (this.destroyed) {
+ // Tell ._destroy() that it's safe to close the fd now.
cb(er);
return this.emit(kIoDone, er);
}
if (er) {
- if (this.autoClose) {
- this.destroy(er);
- }
return cb(er);
}
+
this.bytesWritten += bytes;
cb();
});
@@ -456,8 +415,13 @@ WriteStream.prototype._writev = function(data, cb) {
this.pos += size;
};
-
-WriteStream.prototype._destroy = ReadStream.prototype._destroy;
+WriteStream.prototype._destroy = function(err, cb) {
+ if (this[kIsPerformingIO]) {
+ this.once(kIoDone, (er) => close(this, err || er, cb));
+ } else {
+ close(this, err, cb);
+ }
+};
WriteStream.prototype.close = function(cb) {
if (cb) {
if (this.closed) {