github.com/nodejs/node.git

author     Anna Henningsen <anna@addaleax.net>        2019-07-03 13:38:05 +0300
committer  Beth Griggs <Bethany.Griggs@uk.ibm.com>    2019-10-17 23:37:20 +0300
commit     e8c90bf4d1312b35f066001a183844356bf79e33
tree       b37496f91510773b17edde3879223c4c343cb585
parent     ddb5152e9b8a914fd62b85e19455d5885fe30faf
zlib: do not coalesce multiple `.flush()` calls
This is an approach to address the issue linked below.

Previously, when `.write()` and `.flush()` calls to a zlib stream were
interleaved synchronously (i.e. without waiting for these operations to
finish), multiple flush calls would have been coalesced into a single
flushing operation. This patch changes behaviour so that each `.flush()`
call corresponds to one flushing operation on the underlying zlib
resource, and the order of operations is as if the `.flush()` call were
a `.write()` call.

One test had to be removed because it specifically tested the previous
behaviour.

As a drive-by fix, this also makes sure that all flush callbacks are
called. Previously, that was not the case.

Fixes: https://github.com/nodejs/node/issues/28478
PR-URL: https://github.com/nodejs/node/pull/28520
Reviewed-By: Rich Trott <rtrott@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
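To make the new behaviour concrete, here is a minimal usage sketch (an editorial illustration, not part of the commit; the log messages are placeholders): after this change each `.flush()` only covers the data written before it, and every flush callback is invoked.

'use strict';
const zlib = require('zlib');

const gzip = zlib.createGzip();

// Writes and flushes are issued synchronously, without waiting for their
// callbacks. Each flush now acts like a write queued at this position in
// the sequence, so it only flushes the data that precedes it.
gzip.write('first');
gzip.flush(zlib.constants.Z_SYNC_FLUSH, () => {
  // Guaranteed to run; the output emitted so far decompresses to 'first'
  // on its own.
  console.log('flush #1 done');
});
gzip.write('second');
gzip.flush(zlib.constants.Z_SYNC_FLUSH, () => {
  console.log('flush #2 done');
});
gzip.end();

Before this patch, the two flushes could have been coalesced into a single flushing operation, and with certain interleavings some flush callbacks were never called.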
-rw-r--r--  lib/zlib.js                                               35
-rw-r--r--  test/parallel/test-zlib-flush-multiple-scheduled.js       39
-rw-r--r--  test/parallel/test-zlib-flush-write-sync-interleaved.js   57
-rw-r--r--  test/parallel/test-zlib-write-after-flush.js               1
4 files changed, 79 insertions, 53 deletions
diff --git a/lib/zlib.js b/lib/zlib.js
index 1b2b64f09cb..6cb3cd99dc1 100644
--- a/lib/zlib.js
+++ b/lib/zlib.js
@@ -45,6 +45,8 @@ const {
} = require('buffer');
const { owner_symbol } = require('internal/async_hooks').symbols;
+const kFlushFlag = Symbol('kFlushFlag');
+
const constants = process.binding('constants').zlib;
const {
// Zlib flush levels
@@ -259,7 +261,6 @@ function ZlibBase(opts, mode, handle, { flush, finishFlush, fullFlush }) {
this._chunkSize = chunkSize;
this._defaultFlushFlag = flush;
this._finishFlushFlag = finishFlush;
- this._nextFlush = -1;
this._defaultFullFlushFlag = fullFlush;
this.once('end', this.close);
this._info = opts && opts.info;
@@ -304,13 +305,16 @@ ZlibBase.prototype._flush = function(callback) {
// If a flush is scheduled while another flush is still pending, a way to figure
// out which one is the "stronger" flush is needed.
+// This is currently only used to figure out which flush flag to use for the
+// last chunk.
// Roughly, the following holds:
// Z_NO_FLUSH (< Z_TREES) < Z_BLOCK < Z_PARTIAL_FLUSH <
// Z_SYNC_FLUSH < Z_FULL_FLUSH < Z_FINISH
const flushiness = [];
let i = 0;
-for (const flushFlag of [Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH,
- Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH]) {
+const kFlushFlagList = [Z_NO_FLUSH, Z_BLOCK, Z_PARTIAL_FLUSH,
+ Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH];
+for (const flushFlag of kFlushFlagList) {
flushiness[flushFlag] = i++;
}
@@ -318,7 +322,18 @@ function maxFlush(a, b) {
return flushiness[a] > flushiness[b] ? a : b;
}
-const flushBuffer = Buffer.alloc(0);
+// Set up a list of 'special' buffers that can be written using .write()
+// from the .flush() code as a way of introducing flushing operations into the
+// write sequence.
+const kFlushBuffers = [];
+{
+ const dummyArrayBuffer = new ArrayBuffer();
+ for (const flushFlag of kFlushFlagList) {
+ kFlushBuffers[flushFlag] = Buffer.from(dummyArrayBuffer);
+ kFlushBuffers[flushFlag][kFlushFlag] = flushFlag;
+ }
+}
+
ZlibBase.prototype.flush = function(kind, callback) {
var ws = this._writableState;
@@ -333,13 +348,8 @@ ZlibBase.prototype.flush = function(kind, callback) {
} else if (ws.ending) {
if (callback)
this.once('end', callback);
- } else if (this._nextFlush !== -1) {
- // This means that there is a flush currently in the write queue.
- // We currently coalesce this flush into the pending one.
- this._nextFlush = maxFlush(this._nextFlush, kind);
} else {
- this._nextFlush = kind;
- this.write(flushBuffer, '', callback);
+ this.write(kFlushBuffers[kind], '', callback);
}
};
@@ -357,9 +367,8 @@ ZlibBase.prototype._transform = function(chunk, encoding, cb) {
var flushFlag = this._defaultFlushFlag;
// We use a 'fake' zero-length chunk to carry information about flushes from
// the public API to the actual stream implementation.
- if (chunk === flushBuffer) {
- flushFlag = this._nextFlush;
- this._nextFlush = -1;
+ if (typeof chunk[kFlushFlag] === 'number') {
+ flushFlag = chunk[kFlushFlag];
}
// For the last chunk, also apply `_finishFlushFlag`.
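(Editorial aside, not part of the diff.) The mechanism introduced above can be pictured in isolation: a zero-length buffer tagged with a symbol travels through the ordinary write queue, and the transform stage reads the flush flag off the chunk itself rather than off shared instance state, which is what keeps flushes ordered relative to writes. A stand-alone sketch with a plain Transform stream follows; the names kFlushFlag and MarkerExample are local to the sketch.

'use strict';
const { Transform } = require('stream');

const kFlushFlag = Symbol('kFlushFlag');

class MarkerExample extends Transform {
  // Queue a "flush" by writing a zero-length marker buffer carrying the
  // flag, so that it is processed strictly in write order.
  flush(flag, callback) {
    const marker = Buffer.alloc(0);
    marker[kFlushFlag] = flag;
    this.write(marker, callback);
  }

  _transform(chunk, encoding, cb) {
    if (typeof chunk[kFlushFlag] === 'number') {
      // Flush marker: handled exactly where it was queued.
      this.push(`<flush:${chunk[kFlushFlag]}>`);
      return cb();
    }
    this.push(chunk);
    cb();
  }
}

const ex = new MarkerExample();
ex.on('data', (d) => process.stdout.write(`${d} `));
ex.write('a');
ex.flush(1);
ex.write('b');
ex.end();
// Expected output order: a <flush:1> b

The real implementation pre-allocates one marker buffer per flush flag (the kFlushBuffers list above) instead of allocating a new one on every call.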
diff --git a/test/parallel/test-zlib-flush-multiple-scheduled.js b/test/parallel/test-zlib-flush-multiple-scheduled.js
deleted file mode 100644
index 0b752557e44..00000000000
--- a/test/parallel/test-zlib-flush-multiple-scheduled.js
+++ /dev/null
@@ -1,39 +0,0 @@
-'use strict';
-
-const common = require('../common');
-const assert = require('assert');
-const zlib = require('zlib');
-
-const {
- Z_PARTIAL_FLUSH, Z_SYNC_FLUSH, Z_FULL_FLUSH, Z_FINISH
-} = zlib.constants;
-
-async function getOutput(...sequenceOfFlushes) {
- const zipper = zlib.createGzip({ highWaterMark: 16384 });
-
- zipper.write('A'.repeat(17000));
- for (const flush of sequenceOfFlushes) {
- zipper.flush(flush);
- }
-
- const data = [];
-
- return new Promise((resolve) => {
- zipper.on('data', common.mustCall((d) => {
- data.push(d);
- if (data.length === 2) resolve(Buffer.concat(data));
- }, 2));
- });
-}
-
-(async function() {
- assert.deepStrictEqual(await getOutput(Z_SYNC_FLUSH),
- await getOutput(Z_SYNC_FLUSH, Z_PARTIAL_FLUSH));
- assert.deepStrictEqual(await getOutput(Z_SYNC_FLUSH),
- await getOutput(Z_PARTIAL_FLUSH, Z_SYNC_FLUSH));
-
- assert.deepStrictEqual(await getOutput(Z_FINISH),
- await getOutput(Z_FULL_FLUSH, Z_FINISH));
- assert.deepStrictEqual(await getOutput(Z_FINISH),
- await getOutput(Z_SYNC_FLUSH, Z_FINISH));
-})();
diff --git a/test/parallel/test-zlib-flush-write-sync-interleaved.js b/test/parallel/test-zlib-flush-write-sync-interleaved.js
new file mode 100644
index 00000000000..9fed592a34b
--- /dev/null
+++ b/test/parallel/test-zlib-flush-write-sync-interleaved.js
@@ -0,0 +1,57 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+const { createGzip, createGunzip, Z_PARTIAL_FLUSH } = require('zlib');
+
+// Verify that .flush() behaves like .write() in terms of ordering, e.g. in
+// a sequence like .write() + .flush() + .write() + .flush() each .flush() call
+// only affects the data written before it.
+// Refs: https://github.com/nodejs/node/issues/28478
+
+const compress = createGzip();
+const decompress = createGunzip();
+decompress.setEncoding('utf8');
+
+const events = [];
+const compressedChunks = [];
+
+for (const chunk of ['abc', 'def', 'ghi']) {
+ compress.write(chunk, common.mustCall(() => events.push({ written: chunk })));
+ compress.flush(Z_PARTIAL_FLUSH, common.mustCall(() => {
+ events.push('flushed');
+ const chunk = compress.read();
+ if (chunk !== null)
+ compressedChunks.push(chunk);
+ }));
+}
+
+compress.end(common.mustCall(() => {
+ events.push('compress end');
+ writeToDecompress();
+}));
+
+function writeToDecompress() {
+ // Write the compressed chunks to a decompressor, one by one, in order to
+ // verify that the flushes actually worked.
+ const chunk = compressedChunks.shift();
+ if (chunk === undefined) return decompress.end();
+ decompress.write(chunk, common.mustCall(() => {
+ events.push({ read: decompress.read() });
+ writeToDecompress();
+ }));
+}
+
+process.on('exit', () => {
+ assert.deepStrictEqual(events, [
+ { written: 'abc' },
+ 'flushed',
+ { written: 'def' },
+ 'flushed',
+ { written: 'ghi' },
+ 'flushed',
+ 'compress end',
+ { read: 'abc' },
+ { read: 'def' },
+ { read: 'ghi' }
+ ]);
+});
diff --git a/test/parallel/test-zlib-write-after-flush.js b/test/parallel/test-zlib-write-after-flush.js
index 2fcae2a2139..6edcae2e2f1 100644
--- a/test/parallel/test-zlib-write-after-flush.js
+++ b/test/parallel/test-zlib-write-after-flush.js
@@ -39,7 +39,6 @@ for (const [ createCompress, createDecompress ] of [
gunz.on('data', (c) => output += c);
gunz.on('end', common.mustCall(() => {
assert.strictEqual(output, input);
- assert.strictEqual(gzip._nextFlush, -1);
}));
// Make sure that flush/write doesn't trigger an assert failure