From 77345498a2c9863dce006f13782f2ee18445b718 Mon Sep 17 00:00:00 2001 From: Doctor <44320105+BlackAsLight@users.noreply.github.com> Date: Wed, 7 Aug 2024 18:34:54 +1000 Subject: [PATCH] feat(streams/unstable): `FixedChunkStream` (#4995) * feat(streams): new ResizeStreams(size) * chore(streams): Renamed ResizeStream to FixedChunkStream * Update streams/fixed_chunk_stream_test.ts * update * tweaks * tweak test * fix(streams): chunks getting lost if size was equally divisible by `size` * work * work --------- Co-authored-by: Yoshiya Hinosawa Co-authored-by: Asher Gomez --- streams/deno.json | 1 + streams/fixed_chunk_stream.ts | 64 ++++++++++++++++++++++++++++++ streams/fixed_chunk_stream_test.ts | 18 +++++++++ streams/mod.ts | 1 + 4 files changed, 84 insertions(+) create mode 100644 streams/fixed_chunk_stream.ts create mode 100644 streams/fixed_chunk_stream_test.ts diff --git a/streams/deno.json b/streams/deno.json index 01ca3346c..dd1479816 100644 --- a/streams/deno.json +++ b/streams/deno.json @@ -11,6 +11,7 @@ "./limited-bytes-transform-stream": "./limited_bytes_transform_stream.ts", "./limited-transform-stream": "./limited_transform_stream.ts", "./merge-readable-streams": "./merge_readable_streams.ts", + "./fixed-chunk-stream": "./fixed_chunk_stream.ts", "./text-delimiter-stream": "./text_delimiter_stream.ts", "./text-line-stream": "./text_line_stream.ts", "./to-array-buffer": "./to_array_buffer.ts", diff --git a/streams/fixed_chunk_stream.ts b/streams/fixed_chunk_stream.ts new file mode 100644 index 000000000..6431a1fcf --- /dev/null +++ b/streams/fixed_chunk_stream.ts @@ -0,0 +1,64 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +/** + * A transform stream that resize {@linkcode Uint8Array} chunks into perfectly + * `size` chunks with the exception of the last chunk. + * + * > [!WARNING] + * > **UNSTABLE**: New API, yet to be vetted. + * + * @experimental + * + * @example Usage + * ```ts + * import { FixedChunkStream } from "@std/streams/fixed-chunk-stream"; + * import { assertEquals } from "@std/assert/equals"; + * + * const readable = ReadableStream.from(function* () { + * let count = 0 + * for (let i = 0; i < 100; ++i) { + * const array = new Uint8Array(Math.floor(Math.random() * 1000)); + * count += array.length; + * yield array; + * } + * yield new Uint8Array(512 - count % 512) + * }()) + * .pipeThrough(new FixedChunkStream(512)) + * .pipeTo(new WritableStream({ + * write(chunk, _controller) { + * assertEquals(chunk.length, 512) + * } + * })) + * ``` + */ +export class FixedChunkStream extends TransformStream { + /** + * Constructs a new instance. + * + * @param size The size of the chunks to be resized to. + */ + constructor(size: number) { + let push: Uint8Array | undefined; + super({ + transform(chunk, controller) { + if (push !== undefined) { + const concat = new Uint8Array(push.length + chunk.length); + concat.set(push); + concat.set(chunk, push.length); + chunk = concat; + } + + for (let i = size; i <= chunk.length; i += size) { + controller.enqueue(chunk.slice(i - size, i)); + } + const remainder = -chunk.length % size; + push = remainder ? chunk.slice(remainder) : undefined; + }, + flush(controller) { + if (push?.length) { + controller.enqueue(push); + } + }, + }); + } +} diff --git a/streams/fixed_chunk_stream_test.ts b/streams/fixed_chunk_stream_test.ts new file mode 100644 index 000000000..ba68b9c40 --- /dev/null +++ b/streams/fixed_chunk_stream_test.ts @@ -0,0 +1,18 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +import { assert, assertLessOrEqual } from "@std/assert"; +import { FixedChunkStream } from "./fixed_chunk_stream.ts"; + +Deno.test("FixedChunkStream", async () => { + const size = 512; + + const readable = ReadableStream.from(function* () { + for (let i = 0; i < 100; ++i) { + yield new Uint8Array(Math.random() * 1000); + } + }()).pipeThrough(new FixedChunkStream(size)); + const result = await Array.fromAsync(readable); + + assert(result.slice(0, -1).every((chunk) => chunk.length === size)); + assertLessOrEqual(result.at(-1)!.length, size); +}); diff --git a/streams/mod.ts b/streams/mod.ts index c64410b3d..f618f924e 100644 --- a/streams/mod.ts +++ b/streams/mod.ts @@ -26,6 +26,7 @@ export * from "./early_zip_readable_streams.ts"; export * from "./limited_bytes_transform_stream.ts"; export * from "./limited_transform_stream.ts"; export * from "./merge_readable_streams.ts"; +export * from "./fixed_chunk_stream.ts"; export * from "./text_delimiter_stream.ts"; export * from "./text_line_stream.ts"; export * from "./to_array_buffer.ts";