From 39c2a4c07611004f008c2d687e102874cb023928 Mon Sep 17 00:00:00 2001 From: Doctor <44320105+BlackAsLight@users.noreply.github.com> Date: Tue, 21 May 2024 08:22:01 +1000 Subject: [PATCH] feat(streams): `concatReadableStreams()` (#4747) * feat(streams): `new ConcatStreams()` * refactor(streams): new ConcatStream to be a ReadableStream instead - Converted ConcatStream from a TransformStream into a ReadableStream, also now with proper cleaning up if the `.cancel()` method is called. * adjust(streams): ConcatStreams class into function * Adjust(streams): based off comments * adjust(streams): Remove redundant locking * adjust(streams): based off comments * tweaks * fix * tweak * add Leo as co-author Co-authored-by: crowlKats --------- Co-authored-by: Asher Gomez Co-authored-by: crowlKats --- streams/concat_readable_streams.ts | 50 +++++++++++++ streams/concat_readable_streams_test.ts | 95 +++++++++++++++++++++++++ streams/deno.json | 1 + streams/mod.ts | 1 + 4 files changed, 147 insertions(+) create mode 100644 streams/concat_readable_streams.ts create mode 100644 streams/concat_readable_streams_test.ts diff --git a/streams/concat_readable_streams.ts b/streams/concat_readable_streams.ts new file mode 100644 index 000000000..a49d4befe --- /dev/null +++ b/streams/concat_readable_streams.ts @@ -0,0 +1,50 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +/** + * Concatenates multiple `ReadableStream`s into a single ordered + * `ReadableStream`. + * + * Cancelling the resulting stream will cancel all the input streams. + * + * @typeParam T Type of the chunks in the streams. + * + * @param streams An iterable of `ReadableStream`s. + * + * @example Usage + * ```ts + * import { concatReadableStreams } from "@std/streams/concat-readable-streams"; + * import { assertEquals } from "@std/assert/assert-equals"; + * + * const stream1 = ReadableStream.from([1, 2, 3]); + * const stream2 = ReadableStream.from([4, 5, 6]); + * const stream3 = ReadableStream.from([7, 8, 9]); + * + * assertEquals( + * await Array.fromAsync(concatReadableStreams(stream1, stream2, stream3)), + * [1, 2, 3, 4, 5, 6, 7, 8, 9], + * ); + * ``` + */ +export function concatReadableStreams( + ...streams: ReadableStream[] +): ReadableStream { + let i = 0; + return new ReadableStream({ + async pull(controller) { + const reader = streams[i]!.getReader(); + const { done, value } = await reader.read(); + if (done) { + if (streams.length === ++i) { + return controller.close(); + } + return await this.pull!(controller); + } + controller.enqueue(value); + reader.releaseLock(); + }, + async cancel(reason) { + const promises = streams.map((stream) => stream.cancel(reason)); + await Promise.allSettled(promises); + }, + }); +} diff --git a/streams/concat_readable_streams_test.ts b/streams/concat_readable_streams_test.ts new file mode 100644 index 000000000..50123ddab --- /dev/null +++ b/streams/concat_readable_streams_test.ts @@ -0,0 +1,95 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +import { assertEquals, assertRejects } from "../assert/mod.ts"; +import { concatReadableStreams } from "./concat_readable_streams.ts"; + +Deno.test("concatStreams()", async () => { + const readable1 = ReadableStream.from([1, 2, 3]); + const readable2 = ReadableStream.from([4, 5, 6]); + const readable3 = ReadableStream.from([7, 8, 9]); + + assertEquals( + await Array.fromAsync( + concatReadableStreams(readable1, readable2, readable3), + ), + [ + 1, + 2, + 3, + 4, + 5, + 6, + 7, + 8, + 9, + ], + ); +}); + +Deno.test("concatStreams() with empty streams", async () => { + const readable1 = ReadableStream.from([]); + const readable2 = ReadableStream.from([]); + const readable3 = ReadableStream.from([]); + + assertEquals( + await Array.fromAsync( + concatReadableStreams(readable1, readable2, readable3), + ), + [], + ); +}); + +Deno.test("concatStreams() with one empty stream", async () => { + const readable1 = ReadableStream.from([1, 2, 3]); + const readable2 = ReadableStream.from([]); + const readable3 = ReadableStream.from([7, 8, 9]); + + assertEquals( + await Array.fromAsync( + concatReadableStreams(readable1, readable2, readable3), + ), + [ + 1, + 2, + 3, + 7, + 8, + 9, + ], + ); +}); + +Deno.test("concatStreams() handles errors", async () => { + const readable1 = ReadableStream.from([1, 2, 3]); + const readable2 = ReadableStream.from(async function* () { + yield 4; + yield 5; + yield 6; + throw new TypeError("I am an error!"); + }()); + const readable3 = ReadableStream.from([7, 8, 9]); + + const results: number[] = []; + await assertRejects( + async () => { + for await ( + const value of concatReadableStreams(readable1, readable2, readable3) + ) { + results.push(value); + } + }, + TypeError, + "I am an error!", + ); + assertEquals( + results, + [ + 1, + 2, + 3, + 4, + 5, + 6, + ], + ); +}); diff --git a/streams/deno.json b/streams/deno.json index d137ce64d..b48b6a5d7 100644 --- a/streams/deno.json +++ b/streams/deno.json @@ -5,6 +5,7 @@ ".": "./mod.ts", "./buffer": "./buffer.ts", "./byte-slice-stream": "./byte_slice_stream.ts", + "./concat-readable-streams": "./concat_readable_streams.ts", "./delimiter-stream": "./delimiter_stream.ts", "./early-zip-readable-streams": "./early_zip_readable_streams.ts", "./iterate-reader": "./iterate_reader.ts", diff --git a/streams/mod.ts b/streams/mod.ts index 05010eedb..cfd801e25 100644 --- a/streams/mod.ts +++ b/streams/mod.ts @@ -10,6 +10,7 @@ export * from "./buffer.ts"; export * from "./byte_slice_stream.ts"; +export * from "./concat_readable_streams.ts"; export * from "./delimiter_stream.ts"; export * from "./early_zip_readable_streams.ts"; export * from "./iterate_reader.ts";