mirror of
https://github.com/denoland/std.git
synced 2024-11-22 04:59:05 +00:00
feat(streams/unstable): FixedChunkStream
(#4995)
* feat(streams): new ResizeStreams(size) * chore(streams): Renamed ResizeStream to FixedChunkStream * Update streams/fixed_chunk_stream_test.ts * update * tweaks * tweak test * fix(streams): chunks getting lost if size was equally divisible by `size` * work * work --------- Co-authored-by: Yoshiya Hinosawa <stibium121@gmail.com> Co-authored-by: Asher Gomez <ashersaupingomez@gmail.com>
This commit is contained in:
parent
5e2d065e08
commit
77345498a2
@ -11,6 +11,7 @@
|
|||||||
"./limited-bytes-transform-stream": "./limited_bytes_transform_stream.ts",
|
"./limited-bytes-transform-stream": "./limited_bytes_transform_stream.ts",
|
||||||
"./limited-transform-stream": "./limited_transform_stream.ts",
|
"./limited-transform-stream": "./limited_transform_stream.ts",
|
||||||
"./merge-readable-streams": "./merge_readable_streams.ts",
|
"./merge-readable-streams": "./merge_readable_streams.ts",
|
||||||
|
"./fixed-chunk-stream": "./fixed_chunk_stream.ts",
|
||||||
"./text-delimiter-stream": "./text_delimiter_stream.ts",
|
"./text-delimiter-stream": "./text_delimiter_stream.ts",
|
||||||
"./text-line-stream": "./text_line_stream.ts",
|
"./text-line-stream": "./text_line_stream.ts",
|
||||||
"./to-array-buffer": "./to_array_buffer.ts",
|
"./to-array-buffer": "./to_array_buffer.ts",
|
||||||
|
64
streams/fixed_chunk_stream.ts
Normal file
64
streams/fixed_chunk_stream.ts
Normal file
@ -0,0 +1,64 @@
|
|||||||
|
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A transform stream that resize {@linkcode Uint8Array} chunks into perfectly
|
||||||
|
* `size` chunks with the exception of the last chunk.
|
||||||
|
*
|
||||||
|
* > [!WARNING]
|
||||||
|
* > **UNSTABLE**: New API, yet to be vetted.
|
||||||
|
*
|
||||||
|
* @experimental
|
||||||
|
*
|
||||||
|
* @example Usage
|
||||||
|
* ```ts
|
||||||
|
* import { FixedChunkStream } from "@std/streams/fixed-chunk-stream";
|
||||||
|
* import { assertEquals } from "@std/assert/equals";
|
||||||
|
*
|
||||||
|
* const readable = ReadableStream.from(function* () {
|
||||||
|
* let count = 0
|
||||||
|
* for (let i = 0; i < 100; ++i) {
|
||||||
|
* const array = new Uint8Array(Math.floor(Math.random() * 1000));
|
||||||
|
* count += array.length;
|
||||||
|
* yield array;
|
||||||
|
* }
|
||||||
|
* yield new Uint8Array(512 - count % 512)
|
||||||
|
* }())
|
||||||
|
* .pipeThrough(new FixedChunkStream(512))
|
||||||
|
* .pipeTo(new WritableStream({
|
||||||
|
* write(chunk, _controller) {
|
||||||
|
* assertEquals(chunk.length, 512)
|
||||||
|
* }
|
||||||
|
* }))
|
||||||
|
* ```
|
||||||
|
*/
|
||||||
|
export class FixedChunkStream extends TransformStream<Uint8Array, Uint8Array> {
|
||||||
|
/**
|
||||||
|
* Constructs a new instance.
|
||||||
|
*
|
||||||
|
* @param size The size of the chunks to be resized to.
|
||||||
|
*/
|
||||||
|
constructor(size: number) {
|
||||||
|
let push: Uint8Array | undefined;
|
||||||
|
super({
|
||||||
|
transform(chunk, controller) {
|
||||||
|
if (push !== undefined) {
|
||||||
|
const concat = new Uint8Array(push.length + chunk.length);
|
||||||
|
concat.set(push);
|
||||||
|
concat.set(chunk, push.length);
|
||||||
|
chunk = concat;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (let i = size; i <= chunk.length; i += size) {
|
||||||
|
controller.enqueue(chunk.slice(i - size, i));
|
||||||
|
}
|
||||||
|
const remainder = -chunk.length % size;
|
||||||
|
push = remainder ? chunk.slice(remainder) : undefined;
|
||||||
|
},
|
||||||
|
flush(controller) {
|
||||||
|
if (push?.length) {
|
||||||
|
controller.enqueue(push);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
18
streams/fixed_chunk_stream_test.ts
Normal file
18
streams/fixed_chunk_stream_test.ts
Normal file
@ -0,0 +1,18 @@
|
|||||||
|
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
|
||||||
|
|
||||||
|
import { assert, assertLessOrEqual } from "@std/assert";
|
||||||
|
import { FixedChunkStream } from "./fixed_chunk_stream.ts";
|
||||||
|
|
||||||
|
Deno.test("FixedChunkStream", async () => {
|
||||||
|
const size = 512;
|
||||||
|
|
||||||
|
const readable = ReadableStream.from(function* () {
|
||||||
|
for (let i = 0; i < 100; ++i) {
|
||||||
|
yield new Uint8Array(Math.random() * 1000);
|
||||||
|
}
|
||||||
|
}()).pipeThrough(new FixedChunkStream(size));
|
||||||
|
const result = await Array.fromAsync(readable);
|
||||||
|
|
||||||
|
assert(result.slice(0, -1).every((chunk) => chunk.length === size));
|
||||||
|
assertLessOrEqual(result.at(-1)!.length, size);
|
||||||
|
});
|
@ -26,6 +26,7 @@ export * from "./early_zip_readable_streams.ts";
|
|||||||
export * from "./limited_bytes_transform_stream.ts";
|
export * from "./limited_bytes_transform_stream.ts";
|
||||||
export * from "./limited_transform_stream.ts";
|
export * from "./limited_transform_stream.ts";
|
||||||
export * from "./merge_readable_streams.ts";
|
export * from "./merge_readable_streams.ts";
|
||||||
|
export * from "./fixed_chunk_stream.ts";
|
||||||
export * from "./text_delimiter_stream.ts";
|
export * from "./text_delimiter_stream.ts";
|
||||||
export * from "./text_line_stream.ts";
|
export * from "./text_line_stream.ts";
|
||||||
export * from "./to_array_buffer.ts";
|
export * from "./to_array_buffer.ts";
|
||||||
|
Loading…
Reference in New Issue
Block a user