// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.

import { concat } from "@std/bytes/concat";
import { createLPS } from "./_common.ts";

/** Disposition of the delimiter for {@linkcode DelimiterStreamOptions}. */
export type DelimiterDisposition =
  /** Include delimiter in the found chunk. */
  | "suffix"
  /** Include delimiter in the subsequent chunk. */
  | "prefix"
  /** Discard the delimiter. */
  | "discard" // delimiter discarded
;
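
/*
 * Illustration only (not part of the module's API): a minimal sketch of how
 * each disposition partitions the same input, assuming the behavior documented
 * above and in {@linkcode DelimiterStream}. The helper name `split` is made up
 * for this example.
 *
 * ```ts
 * import { DelimiterStream } from "@std/streams/delimiter-stream";
 * import { assertEquals } from "@std/assert";
 *
 * const split = (disposition: "suffix" | "prefix" | "discard") =>
 *   Array.fromAsync(
 *     ReadableStream.from(["a;b;c"])
 *       .pipeThrough(new TextEncoderStream())
 *       .pipeThrough(
 *         new DelimiterStream(new TextEncoder().encode(";"), { disposition }),
 *       )
 *       .pipeThrough(new TextDecoderStream()),
 *   );
 *
 * assertEquals(await split("discard"), ["a", "b", "c"]);
 * assertEquals(await split("suffix"), ["a;", "b;", "c"]);
 * assertEquals(await split("prefix"), ["a", ";b", ";c"]);
 * ```
 */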

/** Options for {@linkcode DelimiterStream}. */
export interface DelimiterStreamOptions {
  /**
   * Disposition of the delimiter.
   *
   * @default {"discard"}
   */
  disposition?: DelimiterDisposition;
}

/**
 * Divide a stream into chunks delimited by a given byte sequence.
 *
 * If you are working with a stream of `string`, consider using {@linkcode TextDelimiterStream}.
 *
 * @example
 * Divide a CSV stream by commas, discarding the commas:
 * ```ts
 * import { DelimiterStream } from "@std/streams/delimiter-stream";
 * import { assertEquals } from "@std/assert";
 *
 * const inputStream = ReadableStream.from(["foo,bar", ",baz"]);
 *
 * const transformed = inputStream.pipeThrough(new TextEncoderStream())
 *   .pipeThrough(new DelimiterStream(new TextEncoder().encode(",")))
 *   .pipeThrough(new TextDecoderStream());
 *
 * assertEquals(await Array.fromAsync(transformed), ["foo", "bar", "baz"]);
 * ```
 *
 * @example
 * Divide a stream after semicolons, keeping the semicolons in the output:
 * ```ts
 * import { DelimiterStream } from "@std/streams/delimiter-stream";
 * import { assertEquals } from "@std/assert";
 *
 * const inputStream = ReadableStream.from(["foo;", "bar;baz", ";"]);
 *
 * const transformed = inputStream.pipeThrough(new TextEncoderStream())
 *   .pipeThrough(
 *     new DelimiterStream(new TextEncoder().encode(";"), {
 *       disposition: "suffix",
 *     }),
 *   ).pipeThrough(new TextDecoderStream());
 *
 * assertEquals(await Array.fromAsync(transformed), ["foo;", "bar;", "baz;"]);
 * ```
 */
export class DelimiterStream extends TransformStream<Uint8Array, Uint8Array> {
  #bufs: Uint8Array[] = [];
  #delimiter: Uint8Array;
  #matchIndex = 0;
  #delimLPS: Uint8Array | null;
  #disp: DelimiterDisposition;

  /**
   * Constructs a new instance.
   *
   * @param delimiter A delimiter to split the stream by.
   * @param options Options for the delimiter stream.
   */
  constructor(
    delimiter: Uint8Array,
    options: DelimiterStreamOptions = {},
  ) {
    super({
      transform: (chunk, controller) =>
        delimiter.length === 1
          ? this.#handleChar(chunk, controller)
          : this.#handle(chunk, controller),
      flush: (controller) => this.#flush(controller),
    });

    this.#delimiter = delimiter;
    this.#delimLPS = delimiter.length > 1 ? createLPS(delimiter) : null;
    this.#disp = options.disposition ?? "discard";
  }

  #handle(
    chunk: Uint8Array,
    controller: TransformStreamDefaultController<Uint8Array>,
  ) {
    const bufs = this.#bufs;
    const length = chunk.byteLength;
    const disposition = this.#disp;
    const delimiter = this.#delimiter;
    const delimLen = delimiter.length;
    const lps = this.#delimLPS as Uint8Array;
    let chunkStart = 0;
    let matchIndex = this.#matchIndex;
    let inspectIndex = 0;
    while (inspectIndex < length) {
      if (chunk[inspectIndex] === delimiter[matchIndex]) {
        // Next byte matched our next delimiter byte
        inspectIndex++;
        matchIndex++;
        if (matchIndex === delimLen) {
          // Full match
          matchIndex = 0;
          const delimiterStartIndex = inspectIndex - delimLen;
          const delimitedChunkEnd = disposition === "suffix"
            ? inspectIndex
            : delimiterStartIndex;
          if (delimitedChunkEnd <= 0 && bufs.length === 0) {
            // Our chunk started with a delimiter and no previous chunks exist:
            // Enqueue an empty chunk.
            controller.enqueue(new Uint8Array());
            chunkStart = disposition === "prefix" ? 0 : inspectIndex;
          } else if (delimitedChunkEnd > 0 && bufs.length === 0) {
            // No previous chunks, slice from current chunk.
            controller.enqueue(chunk.subarray(chunkStart, delimitedChunkEnd));
            // Our chunk may have more than one delimiter; we must remember where
            // the next delimited chunk begins.
            chunkStart = disposition === "prefix"
              ? delimiterStartIndex
              : inspectIndex;
          } else if (delimitedChunkEnd === 0 && bufs.length > 0) {
            // Our chunk started with a delimiter, previous chunks are passed as
            // they are (with concatenation).
            if (bufs.length === 1) {
              // Concat not needed when a single buffer is passed.
              controller.enqueue(bufs[0]!);
            } else {
              controller.enqueue(concat(bufs));
            }
            // Drop all previous chunks.
            bufs.length = 0;
            if (disposition !== "prefix") {
              // suffix or discard: The next chunk starts where our inspection finished.
              // We should only ever end up here with a discard disposition as
              // for a suffix disposition this branch would mean that the previous
              // chunk ended with a full match but was not enqueued.
              chunkStart = inspectIndex;
            } else {
              chunkStart = 0;
            }
          } else if (delimitedChunkEnd < 0 && bufs.length > 0) {
            // Our chunk started by finishing a partial delimiter match.
            const lastIndex = bufs.length - 1;
            const last = bufs[lastIndex]!;
            const lastSliceIndex = last.byteLength + delimitedChunkEnd;
            const lastSliced = last.subarray(0, lastSliceIndex);
            if (lastIndex === 0) {
              controller.enqueue(lastSliced);
            } else {
              bufs[lastIndex] = lastSliced;
              controller.enqueue(concat(bufs));
            }
            bufs.length = 0;
            if (disposition === "prefix") {
              // Must keep last bytes of last chunk.
              bufs.push(last.subarray(lastSliceIndex));
              chunkStart = 0;
            } else {
              chunkStart = inspectIndex;
            }
          } else if (delimitedChunkEnd > 0 && bufs.length > 0) {
            // Previous chunks and current chunk together form a delimited chunk.
            const chunkSliced = chunk.subarray(chunkStart, delimitedChunkEnd);
            const result = concat([...bufs, chunkSliced]);
            bufs.length = 0;
            controller.enqueue(result);
            chunkStart = disposition === "prefix"
              ? delimitedChunkEnd
              : inspectIndex;
          } else {
            throw new Error("unreachable");
          }
        }
      } else if (matchIndex === 0) {
        // No match ongoing, keep going through the buffer.
        inspectIndex++;
      } else {
        // Ongoing match: Degrade to the previous possible match.
        // eg. If we're looking for 'AAB' and had matched 'AA' previously
        // but now got a new 'A', then we'll drop down to having matched
        // just 'A'. The while loop will turn around again and we'll rematch
        // to 'AA' and proceed onwards to try and match on 'B' again.
        matchIndex = lps[matchIndex - 1]!;
      }
    }
    // Save match index.
    this.#matchIndex = matchIndex;
    if (chunkStart === 0) {
      bufs.push(chunk);
    } else if (chunkStart < length) {
      // If we matched partially somewhere in the middle of our chunk
      // then the remnants should be pushed into buffers.
      bufs.push(chunk.subarray(chunkStart));
    }
  }

  /**
   * Optimized handler for a char delimited stream:
   *
   * For char delimited streams we do not need to keep track of
   * the match index, removing the need for a fair bit of work.
   */
  #handleChar(
    chunk: Uint8Array,
    controller: TransformStreamDefaultController<Uint8Array>,
  ) {
    const bufs = this.#bufs;
    const length = chunk.byteLength;
    const disposition = this.#disp;
    const delimiter = this.#delimiter[0];
    let chunkStart = 0;
    let inspectIndex = 0;
    while (inspectIndex < length) {
      if (chunk[inspectIndex] === delimiter) {
        // Next byte matched our next delimiter
        inspectIndex++;
        /**
         * Always non-negative
         */
        const delimitedChunkEnd = disposition === "suffix"
          ? inspectIndex
          : inspectIndex - 1;
        if (delimitedChunkEnd === 0 && bufs.length === 0) {
          // Our chunk started with a delimiter and no previous chunks exist:
          // Enqueue an empty chunk.
          controller.enqueue(new Uint8Array());
          chunkStart = disposition === "prefix" ? 0 : 1;
        } else if (delimitedChunkEnd > 0 && bufs.length === 0) {
          // No previous chunks, slice from current chunk.
          controller.enqueue(chunk.subarray(chunkStart, delimitedChunkEnd));
          // Our chunk may have more than one delimiter; we must remember where
          // the next delimited chunk begins.
          chunkStart = disposition === "prefix"
            ? inspectIndex - 1
            : inspectIndex;
        } else if (delimitedChunkEnd === 0 && bufs.length > 0) {
          // Our chunk started with a delimiter, previous chunks are passed as
          // they are (with concatenation).
          if (bufs.length === 1) {
            // Concat not needed when a single buffer is passed.
            controller.enqueue(bufs[0]!);
          } else {
            controller.enqueue(concat(bufs));
          }
          // Drop all previous chunks.
          bufs.length = 0;
          if (disposition !== "prefix") {
            // suffix or discard: The next chunk starts where our inspection finished.
            // We should only ever end up here with a discard disposition as
            // for a suffix disposition this branch would mean that the previous
            // chunk ended with a full match but was not enqueued.
            chunkStart = inspectIndex;
          }
        } else if (delimitedChunkEnd > 0 && bufs.length > 0) {
          // Previous chunks and current chunk together form a delimited chunk.
          const chunkSliced = chunk.subarray(chunkStart, delimitedChunkEnd);
          const result = concat([...bufs, chunkSliced]);
          bufs.length = 0;
          chunkStart = disposition === "prefix"
            ? delimitedChunkEnd
            : inspectIndex;
          controller.enqueue(result);
        } else {
          throw new Error("unreachable");
        }
      } else {
        inspectIndex++;
      }
    }
    if (chunkStart === 0) {
      bufs.push(chunk);
    } else if (chunkStart < length) {
      // If we matched partially somewhere in the middle of our chunk
      // then the remnants should be pushed into buffers.
      bufs.push(chunk.subarray(chunkStart));
    }
  }

  #flush(controller: TransformStreamDefaultController<Uint8Array>) {
    const bufs = this.#bufs;
    const length = bufs.length;
    if (length === 0) {
      controller.enqueue(new Uint8Array());
    } else if (length === 1) {
      controller.enqueue(bufs[0]!);
    } else {
      controller.enqueue(concat(bufs));
    }
  }
}
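
/*
 * Implementation note: for multi-byte delimiters, #handle performs a KMP-style
 * scan and, when a partial match breaks, falls back via `lps[matchIndex - 1]`.
 * The LPS table ("longest proper prefix which is also a suffix") comes from
 * createLPS in ./_common.ts. A textbook construction, shown here only as an
 * assumed sketch of what that helper computes, looks roughly like:
 *
 * ```ts
 * function lpsSketch(pattern: Uint8Array): Uint8Array {
 *   const lps = new Uint8Array(pattern.length);
 *   let prefixLen = 0; // length of the previous longest prefix-suffix
 *   let i = 1;
 *   while (i < pattern.length) {
 *     if (pattern[i] === pattern[prefixLen]) {
 *       prefixLen++;
 *       lps[i] = prefixLen;
 *       i++;
 *     } else if (prefixLen === 0) {
 *       lps[i] = 0;
 *       i++;
 *     } else {
 *       prefixLen = lps[prefixLen - 1]!;
 *     }
 *   }
 *   return lps;
 * }
 * ```
 *
 * For the delimiter "AAB" this gives lps = [0, 1, 0], which is what lets the
 * 'AAB' comment in #handle degrade a broken 'AA' match back to a single 'A'.
 */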