fix(streams): prevent earlyZipReadableStreams() from possibly using excessive memory (#5082)

* refactor(streams): `earlyZipReadableStreams`

* add(streams): test for cancelling stream

* chore(streams): fmt

* nit(streams): Make one line into two

Co-authored-by: Asher Gomez <ashersaupingomez@gmail.com>

* improve(streams): test to assert all streams were actually cancelled

* adjust(streams): reason for cancelling streams

* tweak cancel reason

* tweak

---------

Co-authored-by: Asher Gomez <ashersaupingomez@gmail.com>
This commit is contained in:
Doctor 2024-06-20 15:36:58 +10:00 committed by GitHub
parent b75d42a329
commit 5bcbb8f7f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 37 additions and 17 deletions

View File

@ -84,26 +84,25 @@
export function earlyZipReadableStreams<T>(
...streams: ReadableStream<T>[]
): ReadableStream<T> {
const readers = streams.map((s) => s.getReader());
const readers = streams.map((stream) => stream.getReader());
return new ReadableStream<T>({
async start(controller) {
try {
loop:
while (true) {
for (const reader of readers) {
const { value, done } = await reader.read();
if (!done) {
controller.enqueue(value!);
} else {
await Promise.all(readers.map((reader) => reader.cancel()));
break loop;
}
}
}
async pull(controller) {
for (let i = 0; i < readers.length; ++i) {
const { done, value } = await readers[i]!.read();
if (done) {
await Promise.all(
readers.map((reader) =>
reader.cancel(`Stream at index ${i} ended`)
),
);
controller.close();
} catch (e) {
controller.error(e);
return;
}
controller.enqueue(value);
}
},
async cancel(reason) {
await Promise.all(readers.map((reader) => reader.cancel(reason)));
},
});
}

View File

@ -61,6 +61,27 @@ Deno.test("earlyZipReadableStreams() can zip three streams", async () => {
]);
});
Deno.test("earlyZipReadableStreams() forwards cancel()", async () => {
const num = 10;
let cancelled = 0;
const streams = new Array(num).fill(false).map(() =>
new ReadableStream(
{
pull(controller) {
controller.enqueue("chunk");
},
cancel(reason) {
cancelled++;
assertEquals(reason, "I was cancelled!");
},
},
)
);
await earlyZipReadableStreams(...streams).cancel("I was cancelled!");
assertEquals(cancelled, num);
});
Deno.test("earlyZipReadableStreams() controller error", async () => {
const errorMsg = "Test error";
const stream = new ReadableStream({