From 5bcbb8f7f1d59494c6df32738d9d0a1aea49d6b8 Mon Sep 17 00:00:00 2001 From: Doctor <44320105+BlackAsLight@users.noreply.github.com> Date: Thu, 20 Jun 2024 15:36:58 +1000 Subject: [PATCH] fix(streams): prevent `earlyZipReadableStreams()` from possibly using excessive memory (#5082) * refactor(streams): `earlyZipReadableStreams` * add(streams): test for cancelling stream * chore(streams): fmt * nit(streams): Make one line into two Co-authored-by: Asher Gomez * improve(streams): test to assert all streams were actually cancelled * adjust(streams): reason for cancelling streams * tweak cancel reason * tweak --------- Co-authored-by: Asher Gomez --- streams/early_zip_readable_streams.ts | 33 +++++++++++----------- streams/early_zip_readable_streams_test.ts | 21 ++++++++++++++ 2 files changed, 37 insertions(+), 17 deletions(-) diff --git a/streams/early_zip_readable_streams.ts b/streams/early_zip_readable_streams.ts index 800392dab..cb857244d 100644 --- a/streams/early_zip_readable_streams.ts +++ b/streams/early_zip_readable_streams.ts @@ -84,26 +84,25 @@ export function earlyZipReadableStreams( ...streams: ReadableStream[] ): ReadableStream { - const readers = streams.map((s) => s.getReader()); + const readers = streams.map((stream) => stream.getReader()); return new ReadableStream({ - async start(controller) { - try { - loop: - while (true) { - for (const reader of readers) { - const { value, done } = await reader.read(); - if (!done) { - controller.enqueue(value!); - } else { - await Promise.all(readers.map((reader) => reader.cancel())); - break loop; - } - } + async pull(controller) { + for (let i = 0; i < readers.length; ++i) { + const { done, value } = await readers[i]!.read(); + if (done) { + await Promise.all( + readers.map((reader) => + reader.cancel(`Stream at index ${i} ended`) + ), + ); + controller.close(); + return; } - controller.close(); - } catch (e) { - controller.error(e); + controller.enqueue(value); } }, + async cancel(reason) { + await Promise.all(readers.map((reader) => reader.cancel(reason))); + }, }); } diff --git a/streams/early_zip_readable_streams_test.ts b/streams/early_zip_readable_streams_test.ts index 315a17600..0efdf55a1 100644 --- a/streams/early_zip_readable_streams_test.ts +++ b/streams/early_zip_readable_streams_test.ts @@ -61,6 +61,27 @@ Deno.test("earlyZipReadableStreams() can zip three streams", async () => { ]); }); +Deno.test("earlyZipReadableStreams() forwards cancel()", async () => { + const num = 10; + let cancelled = 0; + const streams = new Array(num).fill(false).map(() => + new ReadableStream( + { + pull(controller) { + controller.enqueue("chunk"); + }, + cancel(reason) { + cancelled++; + assertEquals(reason, "I was cancelled!"); + }, + }, + ) + ); + + await earlyZipReadableStreams(...streams).cancel("I was cancelled!"); + assertEquals(cancelled, num); +}); + Deno.test("earlyZipReadableStreams() controller error", async () => { const errorMsg = "Test error"; const stream = new ReadableStream({