feat(io/unstable): LimitedBytesTransformStream

This commit is contained in:
Asher Gomez 2024-09-26 11:13:22 +10:00
parent 33915afbb4
commit 8e11d05065
4 changed files with 201 additions and 0 deletions

View File

@ -88,6 +88,7 @@ const ENTRY_POINTS = [
"../semver/mod.ts",
"../streams/mod.ts",
"../streams/unstable_fixed_chunk_stream.ts",
"../streams/unstable_limited_bytes_transform_stream.ts",
"../streams/unstable_to_lines.ts",
"../streams/unstable_to_bytes.ts",
"../tar/mod.ts",

View File

@ -10,6 +10,7 @@
"./early-zip-readable-streams": "./early_zip_readable_streams.ts",
"./limited-bytes-transform-stream": "./limited_bytes_transform_stream.ts",
"./limited-transform-stream": "./limited_transform_stream.ts",
"./unstable-limited-bytes-transform-stream": "./unstable_limited_bytes_transform_stream.ts",
"./merge-readable-streams": "./merge_readable_streams.ts",
"./unstable-fixed-chunk-stream": "./unstable_fixed_chunk_stream.ts",
"./text-delimiter-stream": "./text_delimiter_stream.ts",

View File

@ -0,0 +1,88 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.
/**
* A {@linkcode TransformStream} that will only read & enqueue chunks until the
* total amount of enqueued data equals `size`. Excess data will be discarded.
*
* @example `size` is equal to the total byte length of the chunks
* ```ts
* import { LimitedBytesTransformStream } from "@std/streams/unstable-limited-bytes-transform-stream";
* import { assertEquals } from "@std/assert";
*
* const stream = ReadableStream.from(["1234", "5678"]);
* const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough(
* new LimitedBytesTransformStream(8),
* ).pipeThrough(new TextDecoderStream());
*
* assertEquals(
* await Array.fromAsync(transformed),
* ["1234", "5678"],
* );
* ```
*
* @example `size` is less than the total byte length of the chunks, and at the
* boundary of the chunks
* ```ts
* import { LimitedBytesTransformStream } from "@std/streams/unstable-limited-bytes-transform-stream";
* import { assertEquals } from "@std/assert";
*
* const stream = ReadableStream.from(["1234", "5678"]);
* const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough(
* // `4` is the boundary of the chunks
* new LimitedBytesTransformStream(4),
* ).pipeThrough(new TextDecoderStream());
*
* assertEquals(
* await Array.fromAsync(transformed),
* // The first chunk was read, but the second chunk was not
* ["1234"],
* );
* ```
*
* @example `size` is less than the total byte length of the chunks, and not at
* the boundary of the chunks
* ```ts
* import { LimitedBytesTransformStream } from "@std/streams/unstable-limited-bytes-transform-stream";
* import { assertEquals } from "@std/assert";
*
* const stream = ReadableStream.from(["1234", "5678"]);
* const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough(
* // `5` is not the boundary of the chunks
* new LimitedBytesTransformStream(5),
* ).pipeThrough(new TextDecoderStream());
*
* assertEquals(
* await Array.fromAsync(transformed),
* // The second chunk was not read because it would exceed the specified size
* ["1234", "5"],
* );
* ```
*/
export class LimitedBytesTransformStream
extends TransformStream<Uint8Array, Uint8Array> {
#read = 0;
/**
* Constructs a new instance.
*
* @param size A size limit in bytes.
* @param options Options for the stream.
*/
constructor(size: number) {
super({
transform: (chunk, controller) => {
if (((this.#read + chunk.byteLength) > size)) {
if (this.#read < size) {
const remaining = size - this.#read;
controller.enqueue(chunk.slice(0, remaining));
}
controller.terminate();
} else {
this.#read += chunk.byteLength;
controller.enqueue(chunk);
}
},
});
}
}

View File

@ -0,0 +1,111 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
import { assertEquals } from "@std/assert";
import { LimitedBytesTransformStream } from "./unstable_limited_bytes_transform_stream.ts";
Deno.test("LimitedBytesTransformStream - specified size is the boundary of the chunks", async function () {
const r = ReadableStream.from([
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
new Uint8Array([7, 8, 9]),
new Uint8Array([10, 11, 12]),
new Uint8Array([13, 14, 15]),
new Uint8Array([16, 17, 18]),
]).pipeThrough(new LimitedBytesTransformStream(6));
const chunks = await Array.fromAsync(r);
assertEquals(chunks, [
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
]);
});
Deno.test("LimitedBytesTransformStream - specified size is not the boundary of the chunks", async function () {
const r = ReadableStream.from([
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
new Uint8Array([7, 8, 9]),
new Uint8Array([10, 11, 12]),
new Uint8Array([13, 14, 15]),
new Uint8Array([16, 17, 18]),
]).pipeThrough(new LimitedBytesTransformStream(7));
const chunks = await Array.fromAsync(r);
assertEquals(chunks, [
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
new Uint8Array([7]),
]);
});
Deno.test("LimitedBytesTransformStream - specified size is 0", async function () {
const r = ReadableStream.from([
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
new Uint8Array([7, 8, 9]),
new Uint8Array([10, 11, 12]),
new Uint8Array([13, 14, 15]),
new Uint8Array([16, 17, 18]),
]).pipeThrough(new LimitedBytesTransformStream(0));
const chunks = await Array.fromAsync(r);
assertEquals(chunks.length, 0);
});
Deno.test("LimitedBytesTransformStream - specified size is equal to the total size of the chunks", async function () {
const r = ReadableStream.from([
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
new Uint8Array([7, 8, 9]),
new Uint8Array([10, 11, 12]),
new Uint8Array([13, 14, 15]),
new Uint8Array([16, 17, 18]),
]).pipeThrough(new LimitedBytesTransformStream(18));
const chunks = await Array.fromAsync(r);
assertEquals(chunks, [
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
new Uint8Array([7, 8, 9]),
new Uint8Array([10, 11, 12]),
new Uint8Array([13, 14, 15]),
new Uint8Array([16, 17, 18]),
]);
});
Deno.test("LimitedBytesTransformStream - specified size is greater than the total size of the chunks", async function () {
const r = ReadableStream.from([
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
new Uint8Array([7, 8, 9]),
new Uint8Array([10, 11, 12]),
new Uint8Array([13, 14, 15]),
new Uint8Array([16, 17, 18]),
]).pipeThrough(new LimitedBytesTransformStream(19));
const chunks = await Array.fromAsync(r);
assertEquals(chunks, [
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
new Uint8Array([7, 8, 9]),
new Uint8Array([10, 11, 12]),
new Uint8Array([13, 14, 15]),
new Uint8Array([16, 17, 18]),
]);
});
Deno.test("LimitedBytesTransformStream - specified size is less than the size of the first chunk", async function () {
const r = ReadableStream.from([
new Uint8Array([1, 2, 3]),
new Uint8Array([4, 5, 6]),
new Uint8Array([7, 8, 9]),
new Uint8Array([10, 11, 12]),
new Uint8Array([13, 14, 15]),
new Uint8Array([16, 17, 18]),
]).pipeThrough(new LimitedBytesTransformStream(2));
const chunks = await Array.fromAsync(r);
assertEquals(chunks, [
new Uint8Array([1, 2]),
]);
});