mirror of
https://github.com/nodejs/node.git
synced 2024-11-21 10:59:27 +00:00
b0f423390f
This is done so we don't wait for the first items to finish before starting new ones. Fixes: https://github.com/nodejs/node/issues/46132 Co-authored-by: Robert Nagy <ronagy@icloud.com> PR-URL: https://github.com/nodejs/node/pull/49249 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com>
361 lines
9.7 KiB
JavaScript
361 lines
9.7 KiB
JavaScript
'use strict';
|
|
|
|
const common = require('../common');
|
|
const {
|
|
Readable,
|
|
} = require('stream');
|
|
const assert = require('assert');
|
|
const { once } = require('events');
|
|
const { setTimeout } = require('timers/promises');
|
|
|
|
/**
 * Builds `n` `[promise, release]` pairs whose promises can only settle in
 * index order.
 *
 * - For index 0, `release` is the promise's own `resolve` function.
 * - For index `i > 0`, `release()` chains onto the promise at index `i - 1`,
 *   so promise `i` is resolved only after promise `i - 1` has fulfilled, no
 *   matter in which order the `release` functions are called.
 *
 * @param {number} n Number of pairs to create; `0` yields an empty array.
 * @returns {Array<[Promise, Function]>} The pairs, in index order.
 */
function createDependentPromises(n) {
  const promiseAndResolveArray = [];

  for (let i = 0; i < n; i++) {
    let res;
    // The executor runs synchronously, so `res` is assigned before the push.
    const promise = new Promise((resolve) => {
      if (i === 0) {
        res = resolve;
        return;
      }
      // Looked up lazily at call time: the previous pair is already stored.
      res = () => promiseAndResolveArray[i - 1][0].then(resolve);
    });

    promiseAndResolveArray.push([promise, res]);
  }

  return promiseAndResolveArray;
}
|
|
|
|
{
  // Map works on synchronous streams with a synchronous mapper.
  // Every input value is doubled and the mapped output is collected with
  // toArray() so it can be compared as a whole.
  const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
  (async () => {
    assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
  })().then(common.mustCall());
}
|
|
|
|
{
  // Map works on synchronous streams with an asynchronous mapper.
  // The mapper awaits an already-resolved promise before returning, so each
  // result is only available asynchronously.
  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
    await Promise.resolve();
    return x + x;
  });
  (async () => {
    assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
  })().then(common.mustCall());
}
|
|
|
|
{
  // Map works on asynchronous streams with an asynchronous mapper.
  // The output of the first (async) map is fed into a second (sync) map, so
  // every value is doubled twice.
  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
    return x + x;
  }).map((x) => x + x);
  (async () => {
    assert.deepStrictEqual(await stream.toArray(), [4, 8, 12, 16, 20]);
  })().then(common.mustCall());
}
|
|
|
|
{
  // Map works on an infinite stream.
  // The source generator never ends; the consumer below stops it by breaking
  // out of the `for await` loop.
  // NOTE(review): the `5` passed to common.mustCall() is presumably the
  // expected number of mapper invocations - confirm against ../common.
  const stream = Readable.from(async function* () {
    while (true) yield 1;
  }()).map(common.mustCall(async (x) => {
    return x + x;
  }, 5));
  (async () => {
    let i = 1;
    for await (const item of stream) {
      assert.strictEqual(item, 2);
      // `i` starts at 1, so the loop is left after the fourth item.
      if (++i === 5) break;
    }
  })().then(common.mustCall());
}
|
|
|
|
{
  // Map works on non-objectMode streams.
  // The source is created without `objectMode` and pushes two one-byte
  // chunks followed by EOF.
  const stream = new Readable({
    read() {
      this.push(Uint8Array.from([1]));
      this.push(Uint8Array.from([2]));
      this.push(null);
    }
  }).map(async ([x]) => {
    // `[x]` picks the first byte of the chunk handed to the mapper.
    return x + x;
  }).map((x) => x + x);
  // Expected output, consumed front-to-back as items arrive.
  const result = [4, 8];
  (async () => {
    for await (const item of stream) {
      assert.strictEqual(item, result.shift());
    }
  })().then(common.mustCall());
}
|
|
|
|
{
  // Does not care about data events.
  // A spurious 'data' event is emitted on the mapped stream; the items seen
  // by the consumer must still be exactly the mapped source chunks.
  const source = new Readable({
    read() {
      this.push(Uint8Array.from([1]));
      this.push(Uint8Array.from([2]));
      this.push(null);
    }
  });
  // `stream` is declared below; that is fine because this callback only runs
  // on a later turn of the event loop, after the declaration was evaluated.
  setImmediate(() => stream.emit('data', Uint8Array.from([1])));
  const stream = source.map(async ([x]) => {
    return x + x;
  }).map((x) => x + x);
  // Expected output, consumed front-to-back as items arrive.
  const result = [4, 8];
  (async () => {
    for await (const item of stream) {
      assert.strictEqual(item, result.shift());
    }
  })().then(common.mustCall());
}
|
|
|
|
{
  // Emitting an error during `map`.
  // When the mapper sees 3 it emits 'error' on the mapped stream itself
  // (the `stream` binding this expression is assigned to).
  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
    if (x === 3) {
      stream.emit('error', new Error('boom'));
    }
    return x + x;
  });
  // The error must surface as a rejection of the downstream toArray().
  assert.rejects(
    stream.map((x) => x + x).toArray(),
    /boom/,
  ).then(common.mustCall());
}
|
|
|
|
{
  // Throwing an error during `map` (sync).
  // The synchronous mapper throws on the third item.
  const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => {
    if (x === 3) {
      throw new Error('boom');
    }
    return x + x;
  });
  // The error must surface as a rejection of the downstream toArray().
  assert.rejects(
    stream.map((x) => x + x).toArray(),
    /boom/,
  ).then(common.mustCall());
}
|
|
|
|
|
|
{
  // Throwing an error during `map` (async).
  // The async mapper rejects on the third item.
  const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
    if (x === 3) {
      throw new Error('boom');
    }
    return x + x;
  });
  // The rejection must surface as a rejection of the downstream toArray().
  assert.rejects(
    stream.map((x) => x + x).toArray(),
    /boom/,
  ).then(common.mustCall());
}
|
|
|
|
{
  // Concurrency + AbortSignal.
  // Each mapper call blocks until the signal it receives fires 'abort' and
  // then rethrows the abort reason, so no item is ever produced.
  // NOTE(review): the `2` passed to common.mustCall() is presumably the
  // expected number of mapper invocations (matching `concurrency: 2`) -
  // confirm against ../common.
  const ac = new AbortController();
  const range = Readable.from([1, 2, 3, 4, 5]);
  const stream = range.map(common.mustCall(async (_, { signal }) => {
    await once(signal, 'abort');
    throw signal.reason;
  }, 2), { signal: ac.signal, concurrency: 2, highWaterMark: 0 });
  // pump
  assert.rejects(async () => {
    for await (const item of stream) {
      assert.fail('should not reach here, got ' + item);
    }
  }, {
    name: 'AbortError',
  }).then(common.mustCall());

  // Abort on a later turn of the event loop, after iteration has started.
  setImmediate(() => {
    ac.abort();
  });
}
|
|
|
|
{
  // Concurrency result order.
  // Item 1 sleeps longer (9ms) than item 2 (8ms), so the second mapper call
  // finishes first; the output must still follow the input order.
  const stream = Readable.from([1, 2]).map(async (item, { signal }) => {
    // timers/promises setTimeout takes (delay, value, options): the signal
    // has to go in the third argument, otherwise it is only used as the
    // resolved value and the timer is never tied to the signal.
    await setTimeout(10 - item, undefined, { signal });
    return item;
  }, { concurrency: 2 });

  (async () => {
    const expected = [1, 2];
    for await (const item of stream) {
      assert.strictEqual(item, expected.shift());
    }
  })().then(common.mustCall());
}
|
|
|
|
|
|
{
  // highWaterMark with small concurrency.
  // Items arrive as [2, 0, 1, 3], but promise `i` from
  // createDependentPromises() only resolves after promise `i - 1`, so the
  // mapper calls can only finish in index order 0, 1, 2, 3. With
  // `concurrency: 2`, map() therefore has to start later items while item 2
  // is still pending.
  const finishOrder = [];

  const promises = createDependentPromises(4);

  const raw = Readable.from([2, 0, 1, 3]);
  const stream = raw.map(async (item) => {
    const [promise, resolve] = promises[item];
    // Release this item's promise; it settles once its predecessor has.
    resolve();

    await promise;
    finishOrder.push(item);
    return item;
  }, { concurrency: 2 });

  (async () => {
    await stream.toArray();

    assert.deepStrictEqual(finishOrder, [0, 1, 2, 3]);
  })().then(common.mustCall(), common.mustNotCall());
}
|
|
|
|
{
  // highWaterMark with a lot of items and large concurrency.
  // Promise `i` from createDependentPromises() only resolves after promise
  // `i - 1`, so the mapper calls can only finish in index order. The table
  // below lists the items expected to be in flight at each step and the
  // index that is able to finish next.
  const finishOrder = [];

  const promises = createDependentPromises(20);

  const input = [10, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 11, 12, 13, 18, 15, 16, 17, 14, 19];
  const raw = Readable.from(input);
  // Should be
  // 10, 1, 0, 3, 4, 2 | next: 0
  // 10, 1, 3, 4, 2, 5 | next: 1
  // 10, 3, 4, 2, 5, 7 | next: 2
  // 10, 3, 4, 5, 7, 8 | next: 3
  // 10, 4, 5, 7, 8, 9 | next: 4
  // 10, 5, 7, 8, 9, 6 | next: 5
  // 10, 7, 8, 9, 6, 11 | next: 6
  // 10, 7, 8, 9, 11, 12 | next: 7
  // 10, 8, 9, 11, 12, 13 | next: 8
  // 10, 9, 11, 12, 13, 18 | next: 9
  // 10, 11, 12, 13, 18, 15 | next: 10
  // 11, 12, 13, 18, 15, 16 | next: 11
  // 12, 13, 18, 15, 16, 17 | next: 12
  // 13, 18, 15, 16, 17, 14 | next: 13
  // 18, 15, 16, 17, 14, 19 | next: 14
  // 18, 15, 16, 17, 19 | next: 15
  // 18, 16, 17, 19 | next: 16
  // 18, 17, 19 | next: 17
  // 18, 19 | next: 18
  // 19 | next: 19
  //

  const stream = raw.map(async (item) => {
    const [promise, resolve] = promises[item];
    // Release this item's promise; it settles once its predecessor has.
    resolve();

    await promise;
    finishOrder.push(item);
    return item;
  }, { concurrency: 6 });

  (async () => {
    const outputOrder = await stream.toArray();

    // Output keeps the input order even though completion order differs.
    assert.deepStrictEqual(outputOrder, input);
    assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
  })().then(common.mustCall(), common.mustNotCall());
}
|
|
|
|
{
  // Custom highWaterMark with a lot of items and large concurrency.
  // Same setup as a plain concurrency run, but with an explicit
  // `highWaterMark: 7`. Promise `i` from createDependentPromises() only
  // resolves after promise `i - 1`, so the mapper calls can only finish in
  // index order. The table below lists the expected in-flight items, the
  // index able to finish next, and the finished-but-unconsumed buffer.
  const finishOrder = [];

  const promises = createDependentPromises(20);

  const input = [11, 1, 0, 3, 4, 2, 5, 7, 8, 9, 6, 10, 12, 13, 18, 15, 16, 17, 14, 19];
  const raw = Readable.from(input);
  // Should be
  // 11, 1, 0, 3, 4    | next: 0, buffer: []
  // 11, 1, 3, 4, 2    | next: 1, buffer: [0]
  // 11, 3, 4, 2, 5    | next: 2, buffer: [0, 1]
  // 11, 3, 4, 5, 7    | next: 3, buffer: [0, 1, 2]
  // 11, 4, 5, 7, 8    | next: 4, buffer: [0, 1, 2, 3]
  // 11, 5, 7, 8, 9    | next: 5, buffer: [0, 1, 2, 3, 4]
  // 11, 7, 8, 9, 6    | next: 6, buffer: [0, 1, 2, 3, 4, 5]
  // 11, 7, 8, 9, 10   | next: 7, buffer: [0, 1, 2, 3, 4, 5, 6] -- buffer full
  // 11, 8, 9, 10, 12  | next: 8, buffer: [0, 1, 2, 3, 4, 5, 6]
  // 11, 9, 10, 12, 13 | next: 9, buffer: [0, 1, 2, 3, 4, 5, 6]
  // 11, 10, 12, 13, 18 | next: 10, buffer: [0, 1, 2, 3, 4, 5, 6]
  // 11, 12, 13, 18, 15 | next: 11, buffer: [0, 1, 2, 3, 4, 5, 6]
  // 12, 13, 18, 15, 16 | next: 12, buffer: [] -- all items flushed as 11 is consumed and all the items wait for it
  // 13, 18, 15, 16, 17 | next: 13, buffer: []
  // 18, 15, 16, 17, 14 | next: 14, buffer: []
  // 18, 15, 16, 17, 19 | next: 15, buffer: [14]
  // 18, 16, 17, 19 | next: 16, buffer: [14, 15]
  // 18, 17, 19 | next: 17, buffer: [14, 15, 16]
  // 18, 19 | next: 18, buffer: [14, 15, 16, 17]
  // 19 | next: 19, buffer: [] -- all items flushed
  //

  const stream = raw.map(async (item) => {
    const [promise, resolve] = promises[item];
    // Release this item's promise; it settles once its predecessor has.
    resolve();

    await promise;
    finishOrder.push(item);
    return item;
  }, { concurrency: 5, highWaterMark: 7 });

  (async () => {
    const outputOrder = await stream.toArray();

    // Output keeps the input order even though completion order differs.
    assert.deepStrictEqual(outputOrder, input);
    assert.deepStrictEqual(finishOrder, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19]);
  })().then(common.mustCall(), common.mustNotCall());
}
|
|
|
|
{
  // When there is a delay between the first and the next item, it should not
  // wait for a filled queue before yielding to the user.
  // Items 1 and 2 block on promises that are only released by the second
  // map() stage, i.e. once the first item has been yielded downstream. If
  // the first stage held item 0 back, toArray() below would never settle.
  const promises = createDependentPromises(3);

  const raw = Readable.from([0, 1, 2]);

  const stream = raw
    .map(async (item) => {
      if (item !== 0) {
        await promises[item][0];
      }

      return item;
    }, { concurrency: 2 })
    .map((item) => {
      // Release every promise; only the release function of each pair is
      // needed here.
      for (const [, resolve] of promises) {
        resolve();
      }

      return item;
    });

  (async () => {
    await stream.toArray();
  })().then(common.mustCall(), common.mustNotCall());
}
|
|
|
|
{
  // Error cases: invalid arguments must throw synchronously from map().
  // A non-function mapper is rejected.
  assert.throws(() => Readable.from([1]).map(1), /ERR_INVALID_ARG_TYPE/);
  // `concurrency` that is not a number, or is negative, is rejected.
  assert.throws(() => Readable.from([1]).map((x) => x, {
    concurrency: 'Foo'
  }), /ERR_OUT_OF_RANGE/);
  assert.throws(() => Readable.from([1]).map((x) => x, {
    concurrency: -1
  }), /ERR_OUT_OF_RANGE/);
  // Non-object options and a non-AbortSignal `signal` are rejected.
  assert.throws(() => Readable.from([1]).map((x) => x, 1), /ERR_INVALID_ARG_TYPE/);
  assert.throws(() => Readable.from([1]).map((x) => x, { signal: true }), /ERR_INVALID_ARG_TYPE/);
}
|
|
{
  // Test result is a Readable: the stream returned by map() must report
  // itself as readable.
  const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x);
  assert.strictEqual(stream.readable, true);
}
|