mirror of
https://github.com/denoland/std.git
synced 2024-11-21 12:40:03 +00:00
feat(streams): concatReadableStreams()
(#4747)
* feat(streams): `new ConcatStreams()` * refactor(streams): new ConcatStream to be a ReadableStream instead - Converted ConcatStream from a TransformStream into a ReadableStream, also now with proper cleaning up if the `.cancel()` method is called. * adjust(streams): ConcatStreams class into function * Adjust(streams): based off comments * adjust(streams): Remove redundant locking * adjust(streams): based off comments * tweaks * fix * tweak * add Leo as co-author Co-authored-by: crowlKats <crowlkats@toaxl.com> --------- Co-authored-by: Asher Gomez <ashersaupingomez@gmail.com> Co-authored-by: crowlKats <crowlkats@toaxl.com>
This commit is contained in:
parent
52116b2dd6
commit
39c2a4c076
50
streams/concat_readable_streams.ts
Normal file
50
streams/concat_readable_streams.ts
Normal file
@ -0,0 +1,50 @@
|
||||
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
/**
|
||||
* Concatenates multiple `ReadableStream`s into a single ordered
|
||||
* `ReadableStream`.
|
||||
*
|
||||
* Cancelling the resulting stream will cancel all the input streams.
|
||||
*
|
||||
* @typeParam T Type of the chunks in the streams.
|
||||
*
|
||||
* @param streams An iterable of `ReadableStream`s.
|
||||
*
|
||||
* @example Usage
|
||||
* ```ts
|
||||
* import { concatReadableStreams } from "@std/streams/concat-readable-streams";
|
||||
* import { assertEquals } from "@std/assert/assert-equals";
|
||||
*
|
||||
* const stream1 = ReadableStream.from([1, 2, 3]);
|
||||
* const stream2 = ReadableStream.from([4, 5, 6]);
|
||||
* const stream3 = ReadableStream.from([7, 8, 9]);
|
||||
*
|
||||
* assertEquals(
|
||||
* await Array.fromAsync(concatReadableStreams(stream1, stream2, stream3)),
|
||||
* [1, 2, 3, 4, 5, 6, 7, 8, 9],
|
||||
* );
|
||||
* ```
|
||||
*/
|
||||
export function concatReadableStreams<T>(
|
||||
...streams: ReadableStream<T>[]
|
||||
): ReadableStream<T> {
|
||||
let i = 0;
|
||||
return new ReadableStream<T>({
|
||||
async pull(controller) {
|
||||
const reader = streams[i]!.getReader();
|
||||
const { done, value } = await reader.read();
|
||||
if (done) {
|
||||
if (streams.length === ++i) {
|
||||
return controller.close();
|
||||
}
|
||||
return await this.pull!(controller);
|
||||
}
|
||||
controller.enqueue(value);
|
||||
reader.releaseLock();
|
||||
},
|
||||
async cancel(reason) {
|
||||
const promises = streams.map((stream) => stream.cancel(reason));
|
||||
await Promise.allSettled(promises);
|
||||
},
|
||||
});
|
||||
}
|
95
streams/concat_readable_streams_test.ts
Normal file
95
streams/concat_readable_streams_test.ts
Normal file
@ -0,0 +1,95 @@
|
||||
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
|
||||
|
||||
import { assertEquals, assertRejects } from "../assert/mod.ts";
|
||||
import { concatReadableStreams } from "./concat_readable_streams.ts";
|
||||
|
||||
Deno.test("concatStreams()", async () => {
|
||||
const readable1 = ReadableStream.from([1, 2, 3]);
|
||||
const readable2 = ReadableStream.from([4, 5, 6]);
|
||||
const readable3 = ReadableStream.from([7, 8, 9]);
|
||||
|
||||
assertEquals(
|
||||
await Array.fromAsync(
|
||||
concatReadableStreams(readable1, readable2, readable3),
|
||||
),
|
||||
[
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
6,
|
||||
7,
|
||||
8,
|
||||
9,
|
||||
],
|
||||
);
|
||||
});
|
||||
|
||||
Deno.test("concatStreams() with empty streams", async () => {
|
||||
const readable1 = ReadableStream.from([]);
|
||||
const readable2 = ReadableStream.from([]);
|
||||
const readable3 = ReadableStream.from([]);
|
||||
|
||||
assertEquals(
|
||||
await Array.fromAsync(
|
||||
concatReadableStreams(readable1, readable2, readable3),
|
||||
),
|
||||
[],
|
||||
);
|
||||
});
|
||||
|
||||
Deno.test("concatStreams() with one empty stream", async () => {
|
||||
const readable1 = ReadableStream.from([1, 2, 3]);
|
||||
const readable2 = ReadableStream.from([]);
|
||||
const readable3 = ReadableStream.from([7, 8, 9]);
|
||||
|
||||
assertEquals(
|
||||
await Array.fromAsync(
|
||||
concatReadableStreams(readable1, readable2, readable3),
|
||||
),
|
||||
[
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
7,
|
||||
8,
|
||||
9,
|
||||
],
|
||||
);
|
||||
});
|
||||
|
||||
Deno.test("concatStreams() handles errors", async () => {
|
||||
const readable1 = ReadableStream.from([1, 2, 3]);
|
||||
const readable2 = ReadableStream.from(async function* () {
|
||||
yield 4;
|
||||
yield 5;
|
||||
yield 6;
|
||||
throw new TypeError("I am an error!");
|
||||
}());
|
||||
const readable3 = ReadableStream.from([7, 8, 9]);
|
||||
|
||||
const results: number[] = [];
|
||||
await assertRejects(
|
||||
async () => {
|
||||
for await (
|
||||
const value of concatReadableStreams(readable1, readable2, readable3)
|
||||
) {
|
||||
results.push(value);
|
||||
}
|
||||
},
|
||||
TypeError,
|
||||
"I am an error!",
|
||||
);
|
||||
assertEquals(
|
||||
results,
|
||||
[
|
||||
1,
|
||||
2,
|
||||
3,
|
||||
4,
|
||||
5,
|
||||
6,
|
||||
],
|
||||
);
|
||||
});
|
@ -5,6 +5,7 @@
|
||||
".": "./mod.ts",
|
||||
"./buffer": "./buffer.ts",
|
||||
"./byte-slice-stream": "./byte_slice_stream.ts",
|
||||
"./concat-readable-streams": "./concat_readable_streams.ts",
|
||||
"./delimiter-stream": "./delimiter_stream.ts",
|
||||
"./early-zip-readable-streams": "./early_zip_readable_streams.ts",
|
||||
"./iterate-reader": "./iterate_reader.ts",
|
||||
|
@ -10,6 +10,7 @@
|
||||
|
||||
export * from "./buffer.ts";
|
||||
export * from "./byte_slice_stream.ts";
|
||||
export * from "./concat_readable_streams.ts";
|
||||
export * from "./delimiter_stream.ts";
|
||||
export * from "./early_zip_readable_streams.ts";
|
||||
export * from "./iterate_reader.ts";
|
||||
|
Loading…
Reference in New Issue
Block a user