node/test/parallel/test-readline-async-iterators-backpressure.js
Thiago Oliveira Santos 0f3e531096
lib: performance improvement on readline async iterator
Using a direct approach to create the readline async iterator
allowed an iteration over 20 to 58% faster.

**BREAKING CHANGE**: With that change, the async iteterator
obtained from the readline interface doesn't have the
property "stream" any longer. This happened because it's no
longer created through a Readable, instead, the async
iterator is created directly from the events of the readline
interface instance, so, if anyone is using that property,
this change will break their code.
Also, the Readable added a backpressure control that is
fairly compensated by the use of FixedQueue + monitoring
its size. This control wasn't really precise with readline
before, though, because it only pauses the reading of the
original stream, but the lines generated from the last
message received from it was still emitted. For example:
if the readable was paused at 1000 messages but the last one
received generated 10k lines, but no further messages were
emitted again until the queue was lower than the readable
highWaterMark. A similar  behavior still happens with the
new implementation, but the highWaterMark used is fixed: 1024,
and the original stream is resumed again only after the queue
is cleared.

Before making that change, I created a package implementing
the same concept used here to validate it. You can find it
[here](https://github.com/Farenheith/faster-readline-iterator)
if this helps anyhow.

PR-URL: https://github.com/nodejs/node/pull/41276
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
2022-10-24 12:49:16 +00:00

59 lines
1.7 KiB
JavaScript

'use strict';
const common = require('../common');
const assert = require('assert');
const { Readable } = require('stream');
const readline = require('readline');
const CONTENT = 'content';
const LINES_PER_PUSH = 2051;
const REPETITIONS = 3;
(async () => {
const readable = new Readable({ read() {} });
let salt = 0;
for (let i = 0; i < REPETITIONS; i++) {
readable.push(`${CONTENT}\n`.repeat(LINES_PER_PUSH + i));
salt += i;
}
const TOTAL_LINES = LINES_PER_PUSH * REPETITIONS + salt;
const rli = readline.createInterface({
input: readable,
crlfDelay: Infinity
});
const it = rli[Symbol.asyncIterator]();
const watermarkData = it[Symbol.for('nodejs.watermarkData')];
const highWaterMark = watermarkData.high;
// For this test to work, we have to queue up more than the number of
// highWaterMark items in rli. Make sure that is the case.
assert(TOTAL_LINES > highWaterMark, `TOTAL_LINES (${TOTAL_LINES}) isn't greater than highWaterMark (${highWaterMark})`);
let iterations = 0;
let readableEnded = false;
let notPaused = 0;
for await (const line of it) {
assert.strictEqual(readableEnded, false);
assert.strictEqual(line, CONTENT);
assert.ok(watermarkData.size <= TOTAL_LINES);
assert.strictEqual(readable.isPaused(), watermarkData.size >= 1);
if (!readable.isPaused()) {
notPaused++;
}
iterations += 1;
// We have to end the input stream asynchronously for back pressure to work.
// Only end when we have reached the final line.
if (iterations === TOTAL_LINES) {
readable.push(null);
readableEnded = true;
}
}
assert.strictEqual(iterations, TOTAL_LINES);
assert.strictEqual(notPaused, REPETITIONS);
})().then(common.mustCall());