diff --git a/streams/concat_readable_streams_test.ts b/streams/concat_readable_streams_test.ts index 50123ddab..7bacda986 100644 --- a/streams/concat_readable_streams_test.ts +++ b/streams/concat_readable_streams_test.ts @@ -93,3 +93,24 @@ Deno.test("concatStreams() handles errors", async () => { ], ); }); + +Deno.test("concatReadableStreams cancels all streams when concatenated stream is cancelled", async () => { + const reasons: string[] = []; + const createMockStream = () => + new ReadableStream({ + start(controller) { + controller.enqueue("data"); + }, + cancel(error) { + reasons.push(error); + }, + }); + + const stream1 = createMockStream(); + const stream2 = createMockStream(); + const concatenatedStream = concatReadableStreams(stream1, stream2); + + await concatenatedStream.cancel("Test cancel"); + + assertEquals(reasons, ["Test cancel", "Test cancel"]); +}); diff --git a/streams/early_zip_readable_streams_test.ts b/streams/early_zip_readable_streams_test.ts index b6d4529ff..315a17600 100644 --- a/streams/early_zip_readable_streams_test.ts +++ b/streams/early_zip_readable_streams_test.ts @@ -1,7 +1,7 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. import { earlyZipReadableStreams } from "./early_zip_readable_streams.ts"; -import { assertEquals } from "@std/assert"; +import { assertEquals, assertRejects } from "@std/assert"; Deno.test("earlyZipReadableStreams() handles short first", async () => { const textStream = ReadableStream.from(["1", "2", "3"]); @@ -60,3 +60,24 @@ Deno.test("earlyZipReadableStreams() can zip three streams", async () => { "3", ]); }); + +Deno.test("earlyZipReadableStreams() controller error", async () => { + const errorMsg = "Test error"; + const stream = new ReadableStream({ + start(controller) { + controller.enqueue("This will succeed"); + }, + pull() { + throw new Error(errorMsg); + }, + }); + + const zippedStream = earlyZipReadableStreams(stream); + const reader = zippedStream.getReader(); + + assertEquals(await reader.read(), { + value: "This will succeed", + done: false, + }); + await assertRejects(async () => await reader.read(), Error, errorMsg); +}); diff --git a/streams/text_delimiter_stream.ts b/streams/text_delimiter_stream.ts index a78cd31fb..84dc1ad73 100644 --- a/streams/text_delimiter_stream.ts +++ b/streams/text_delimiter_stream.ts @@ -86,7 +86,7 @@ export class TextDelimiterStream extends TransformStream { */ constructor( delimiter: string, - options: DelimiterStreamOptions = { disposition: "discard" }, + options?: DelimiterStreamOptions, ) { super({ transform: (chunk, controller) => { @@ -99,7 +99,7 @@ export class TextDelimiterStream extends TransformStream { this.#delimiter = delimiter; this.#delimLPS = createLPS(new TextEncoder().encode(delimiter)); - this.#disp = options.disposition ?? "discard"; + this.#disp = options?.disposition ?? "discard"; } #handle( diff --git a/streams/zip_readable_streams_test.ts b/streams/zip_readable_streams_test.ts index 31ba218c1..8aa0773ed 100644 --- a/streams/zip_readable_streams_test.ts +++ b/streams/zip_readable_streams_test.ts @@ -1,6 +1,6 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. -import { assertEquals } from "@std/assert"; +import { assertEquals, assertRejects } from "@std/assert"; import { zipReadableStreams } from "./zip_readable_streams.ts"; Deno.test("zipReadableStreams()", async () => { @@ -29,3 +29,25 @@ Deno.test("zipReadableStreams()", async () => { "qwertzuiopasq123d", ]); }); + +Deno.test("zipReadableStreams handles errors by closing the stream with an error", async () => { + const errorStream = new ReadableStream({ + start(controller) { + controller.enqueue("Initial data"); + }, + pull() { + throw new Error("Test error during read"); + }, + }); + const normalStream = ReadableStream.from(["Normal data"]); + const zippedStream = zipReadableStreams(errorStream, normalStream); + const reader = zippedStream.getReader(); + + assertEquals(await reader.read(), { value: "Initial data", done: false }); + assertEquals(await reader.read(), { value: "Normal data", done: false }); + await assertRejects( + async () => await reader.read(), + Error, + "Test error during read", + ); +});