stream: bump default highWaterMark

This should give a performance boost accross the board.

Given that the old limit is a decod old and memory capacity has
doubled many times since I think it is appropriate to slightly bump
the default limit.

PR-URL: https://github.com/nodejs/node/pull/52037
Refs: https://github.com/nodejs/node/pull/46608
Refs: https://github.com/nodejs/node/pull/50120
Reviewed-By: Rafael Gonzaga <rafael.nunu@hotmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Yagiz Nizipli <yagiz.nizipli@sentry.io>
Reviewed-By: Chengzhong Wu <legendecas@gmail.com>
Reviewed-By: Moshe Atlow <moshe@atlow.co.il>
Reviewed-By: Mohammed Keyvanzadeh <mohammadkeyvanzade94@gmail.com>
Reviewed-By: Trivikram Kamat <trivikr.dev@gmail.com>
Reviewed-By: Ruben Bridgewater <ruben@bridgewater.de>
This commit is contained in:
Robert Nagy 2024-03-13 20:02:14 +01:00 committed by GitHub
parent 57d2e4881c
commit 1abff07392
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 34 additions and 19 deletions

View File

@ -6,7 +6,7 @@ const net = require('net');
const PORT = common.PORT; const PORT = common.PORT;
const bench = common.createBenchmark(main, { const bench = common.createBenchmark(main, {
len: [64, 102400, 1024 * 1024 * 16], len: [64, 102400, 1024 * 64 * 16],
type: ['utf', 'asc', 'buf'], type: ['utf', 'asc', 'buf'],
dur: [5], dur: [5],
}, { }, {

View File

@ -6,7 +6,7 @@ const net = require('net');
const PORT = common.PORT; const PORT = common.PORT;
const bench = common.createBenchmark(main, { const bench = common.createBenchmark(main, {
len: [2, 64, 102400, 1024 * 1024 * 16], len: [2, 64, 102400, 1024 * 64 * 16],
type: ['utf', 'asc', 'buf'], type: ['utf', 'asc', 'buf'],
dur: [5], dur: [5],
}, { }, {

View File

@ -5,7 +5,7 @@ const common = require('../common.js');
const PORT = common.PORT; const PORT = common.PORT;
const bench = common.createBenchmark(main, { const bench = common.createBenchmark(main, {
sendchunklen: [256, 32 * 1024, 128 * 1024, 16 * 1024 * 1024], sendchunklen: [256, 32 * 1024, 128 * 1024, 16 * 64 * 1024],
type: ['utf', 'asc', 'buf'], type: ['utf', 'asc', 'buf'],
recvbuflen: [0, 64 * 1024, 1024 * 1024], recvbuflen: [0, 64 * 1024, 1024 * 1024],
recvbufgenfn: ['true', 'false'], recvbufgenfn: ['true', 'false'],

View File

@ -5,7 +5,7 @@ const common = require('../common.js');
const { PassThrough } = require('stream'); const { PassThrough } = require('stream');
const bench = common.createBenchmark(main, { const bench = common.createBenchmark(main, {
len: [64, 102400, 1024 * 1024 * 16], len: [64, 102400, 1024 * 64 * 16],
type: ['utf', 'asc', 'buf'], type: ['utf', 'asc', 'buf'],
dur: [5], dur: [5],
}, { }, {

View File

@ -9,7 +9,7 @@ const util = require('util');
// run the function with those settings. // run the function with those settings.
// if not, then queue up a bunch of child processes. // if not, then queue up a bunch of child processes.
const bench = common.createBenchmark(main, { const bench = common.createBenchmark(main, {
len: [102400, 1024 * 1024 * 16], len: [102400, 1024 * 64 * 16],
type: ['utf', 'asc', 'buf'], type: ['utf', 'asc', 'buf'],
dur: [5], dur: [5],
}, { }, {

View File

@ -9,7 +9,7 @@ const util = require('util');
// run the function with those settings. // run the function with those settings.
// if not, then queue up a bunch of child processes. // if not, then queue up a bunch of child processes.
const bench = common.createBenchmark(main, { const bench = common.createBenchmark(main, {
len: [102400, 1024 * 1024 * 16], len: [102400, 1024 * 64 * 16],
type: ['utf', 'asc', 'buf'], type: ['utf', 'asc', 'buf'],
dur: [5], dur: [5],
}, { }, {

View File

@ -9,7 +9,7 @@ const util = require('util');
// run the function with those settings. // run the function with those settings.
// If not, then queue up a bunch of child processes. // If not, then queue up a bunch of child processes.
const bench = common.createBenchmark(main, { const bench = common.createBenchmark(main, {
len: [102400, 1024 * 1024 * 16], len: [102400, 1024 * 64 * 16],
type: ['utf', 'asc', 'buf'], type: ['utf', 'asc', 'buf'],
dur: [5], dur: [5],
}, { }, {

View File

@ -3479,6 +3479,9 @@ method.
<!-- YAML <!-- YAML
changes: changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/52037
description: bump default highWaterMark.
- version: v15.5.0 - version: v15.5.0
pr-url: https://github.com/nodejs/node/pull/36431 pr-url: https://github.com/nodejs/node/pull/36431
description: support passing in an AbortSignal. description: support passing in an AbortSignal.
@ -3500,7 +3503,7 @@ changes:
* `options` {Object} * `options` {Object}
* `highWaterMark` {number} Buffer level when * `highWaterMark` {number} Buffer level when
[`stream.write()`][stream-write] starts returning `false`. **Default:** [`stream.write()`][stream-write] starts returning `false`. **Default:**
`16384` (16 KiB), or `16` for `objectMode` streams. `65536` (64 KiB), or `16` for `objectMode` streams.
* `decodeStrings` {boolean} Whether to encode `string`s passed to * `decodeStrings` {boolean} Whether to encode `string`s passed to
[`stream.write()`][stream-write] to `Buffer`s (with the encoding [`stream.write()`][stream-write] to `Buffer`s (with the encoding
specified in the [`stream.write()`][stream-write] call) before passing specified in the [`stream.write()`][stream-write] call) before passing
@ -3856,6 +3859,9 @@ constructor and implement the [`readable._read()`][] method.
<!-- YAML <!-- YAML
changes: changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/52037
description: bump default highWaterMark.
- version: v15.5.0 - version: v15.5.0
pr-url: https://github.com/nodejs/node/pull/36431 pr-url: https://github.com/nodejs/node/pull/36431
description: support passing in an AbortSignal. description: support passing in an AbortSignal.
@ -3873,7 +3879,7 @@ changes:
* `options` {Object} * `options` {Object}
* `highWaterMark` {number} The maximum [number of bytes][hwm-gotcha] to store * `highWaterMark` {number} The maximum [number of bytes][hwm-gotcha] to store
in the internal buffer before ceasing to read from the underlying resource. in the internal buffer before ceasing to read from the underlying resource.
**Default:** `16384` (16 KiB), or `16` for `objectMode` streams. **Default:** `65536` (64 KiB), or `16` for `objectMode` streams.
* `encoding` {string} If specified, then buffers will be decoded to * `encoding` {string} If specified, then buffers will be decoded to
strings using the specified encoding. **Default:** `null`. strings using the specified encoding. **Default:** `null`.
* `objectMode` {boolean} Whether this stream should behave * `objectMode` {boolean} Whether this stream should behave

View File

@ -8,7 +8,8 @@ const { validateInteger } = require('internal/validators');
const { ERR_INVALID_ARG_VALUE } = require('internal/errors').codes; const { ERR_INVALID_ARG_VALUE } = require('internal/errors').codes;
let defaultHighWaterMarkBytes = 16 * 1024; // TODO (fix): For some reason Windows CI fails with bigger hwm.
let defaultHighWaterMarkBytes = process.platform === 'win32' ? 16 * 1024 : 64 * 1024;
let defaultHighWaterMarkObjectMode = 16; let defaultHighWaterMarkObjectMode = 16;
function highWaterMarkFrom(options, isDuplex, duplexKey) { function highWaterMarkFrom(options, isDuplex, duplexKey) {

View File

@ -34,7 +34,7 @@ const server = http.createServer((req, res) => {
}, common.mustCall((res) => { }, common.mustCall((res) => {
res.resume(); res.resume();
post.write(Buffer.alloc(16 * 1024).fill('X')); post.write(Buffer.alloc(64 * 1024).fill('X'));
onPause = () => { onPause = () => {
post.end('something'); post.end('something');
}; };

View File

@ -62,5 +62,5 @@ const httpsServer = https.createServer({
port: this.address().port, port: this.address().port,
rejectUnauthorized: false, rejectUnauthorized: false,
highWaterMark: undefined, highWaterMark: undefined,
}, loadCallback(16 * 1024)).on('error', common.mustNotCall()).end(); }, loadCallback(process.platform === 'win32' ? 16 * 1024 : 64 * 1024)).on('error', common.mustNotCall()).end();
})); }));

View File

@ -7,6 +7,7 @@ const stream = require('stream');
let loops = 5; let loops = 5;
const src = new stream.Readable({ const src = new stream.Readable({
highWaterMark: 16 * 1024,
read() { read() {
if (loops--) if (loops--)
this.push(Buffer.alloc(20000)); this.push(Buffer.alloc(20000));
@ -14,6 +15,7 @@ const src = new stream.Readable({
}); });
const dst = new stream.Transform({ const dst = new stream.Transform({
highWaterMark: 16 * 1024,
transform(chunk, output, fn) { transform(chunk, output, fn) {
this.push(null); this.push(null);
fn(); fn();

View File

@ -4,6 +4,7 @@ const stream = require('stream');
const assert = require('assert'); const assert = require('assert');
const writable = new stream.Writable({ const writable = new stream.Writable({
highWaterMark: 16 * 1024,
write: common.mustCall(function(chunk, encoding, cb) { write: common.mustCall(function(chunk, encoding, cb) {
assert.strictEqual( assert.strictEqual(
readable._readableState.awaitDrainWriters, readable._readableState.awaitDrainWriters,
@ -26,6 +27,7 @@ const writable = new stream.Writable({
// A readable stream which produces two buffers. // A readable stream which produces two buffers.
const bufs = [Buffer.alloc(32 * 1024), Buffer.alloc(33 * 1024)]; // above hwm const bufs = [Buffer.alloc(32 * 1024), Buffer.alloc(33 * 1024)]; // above hwm
const readable = new stream.Readable({ const readable = new stream.Readable({
highWaterMark: 16 * 1024,
read: function() { read: function() {
while (bufs.length > 0) { while (bufs.length > 0) {
this.push(bufs.shift()); this.push(bufs.shift());

View File

@ -7,6 +7,7 @@ const { Readable } = require('stream');
const buf = Buffer.alloc(8192); const buf = Buffer.alloc(8192);
const readable = new Readable({ const readable = new Readable({
highWaterMark: 16 * 1024,
read: common.mustCall(function() { read: common.mustCall(function() {
this.push(buf); this.push(buf);
}, 31) }, 31)

View File

@ -2,9 +2,9 @@
require('../common'); require('../common');
const assert = require('assert'); const assert = require('assert');
const { Transform, Readable, Writable } = require('stream'); const { Transform, Readable, Writable, getDefaultHighWaterMark } = require('stream');
const DEFAULT = 16 * 1024; const DEFAULT = getDefaultHighWaterMark();
function testTransform(expectedReadableHwm, expectedWritableHwm, options) { function testTransform(expectedReadableHwm, expectedWritableHwm, options) {
const t = new Transform(options); const t = new Transform(options);

View File

@ -25,12 +25,14 @@ const assert = require('assert');
const Transform = require('stream').Transform; const Transform = require('stream').Transform;
const parser = new Transform({ readableObjectMode: true }); const parser = new Transform({
readableObjectMode: true
});
assert(parser._readableState.objectMode); assert(parser._readableState.objectMode);
assert(!parser._writableState.objectMode); assert(!parser._writableState.objectMode);
assert.strictEqual(parser.readableHighWaterMark, 16); assert.strictEqual(parser.readableHighWaterMark, 16);
assert.strictEqual(parser.writableHighWaterMark, 16 * 1024); assert.strictEqual(parser.writableHighWaterMark, process.platform === 'win32' ? 16 * 1024 : 64 * 1024);
assert.strictEqual(parser.readableHighWaterMark, assert.strictEqual(parser.readableHighWaterMark,
parser._readableState.highWaterMark); parser._readableState.highWaterMark);
assert.strictEqual(parser.writableHighWaterMark, assert.strictEqual(parser.writableHighWaterMark,
@ -57,7 +59,7 @@ const serializer = new Transform({ writableObjectMode: true });
assert(!serializer._readableState.objectMode); assert(!serializer._readableState.objectMode);
assert(serializer._writableState.objectMode); assert(serializer._writableState.objectMode);
assert.strictEqual(serializer.readableHighWaterMark, 16 * 1024); assert.strictEqual(serializer.readableHighWaterMark, process.platform === 'win32' ? 16 * 1024 : 64 * 1024);
assert.strictEqual(serializer.writableHighWaterMark, 16); assert.strictEqual(serializer.writableHighWaterMark, 16);
assert.strictEqual(parser.readableHighWaterMark, assert.strictEqual(parser.readableHighWaterMark,
parser._readableState.highWaterMark); parser._readableState.highWaterMark);

View File

@ -37,7 +37,7 @@ server.listen(0, common.mustCall(() => {
rejectUnauthorized: false, rejectUnauthorized: false,
highWaterMark: undefined, highWaterMark: undefined,
}, common.mustCall(() => { }, common.mustCall(() => {
assert.strictEqual(defaultHighBob.readableHighWaterMark, 16 * 1024); assert.strictEqual(defaultHighBob.readableHighWaterMark, process.platform === 'win32' ? 16 * 1024 : 64 * 1024);
defaultHighBob.end(); defaultHighBob.end();
})); }));

View File

@ -3,6 +3,7 @@
const common = require('../common'); const common = require('../common');
const { createBrotliDecompress } = require('node:zlib'); const { createBrotliDecompress } = require('node:zlib');
const strictEqual = require('node:assert').strictEqual; const strictEqual = require('node:assert').strictEqual;
const { getDefaultHighWaterMark } = require('stream');
// This tiny HEX string is a 16GB file. // This tiny HEX string is a 16GB file.
// This test verifies that the stream actually stops. // This test verifies that the stream actually stops.
@ -18,5 +19,5 @@ decoder.end(buf);
// to process the data and the buffer is not empty. // to process the data and the buffer is not empty.
setTimeout(common.mustCall(() => { setTimeout(common.mustCall(() => {
// There is only one chunk in the buffer // There is only one chunk in the buffer
strictEqual(decoder._readableState.buffer.length, 1); strictEqual(decoder._readableState.buffer.length, getDefaultHighWaterMark() / (16 * 1024));
}), common.platformTimeout(500)); }), common.platformTimeout(500));