std/streams/text_line_stream.ts

143 lines
4.0 KiB
TypeScript

// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
// This module is browser compatible.
/** Options for {@linkcode TextLineStream}. */
export interface TextLineStreamOptions {
/**
* Allow splitting by `\r`.
*
* @default {false}
*/
allowCR?: boolean;
}
/**
* Transform a stream into a stream where each chunk is divided by a newline,
* be it `\n` or `\r\n`. `\r` can be enabled via the `allowCR` option.
*
* If you want to split by a custom delimiter, consider using {@linkcode TextDelimiterStream}.
*
* @example JSON Lines
* ```ts
* import { TextLineStream } from "@std/streams/text-line-stream";
* import { toTransformStream } from "@std/streams/to-transform-stream";
* import { assertEquals } from "@std/assert";
*
* const stream = ReadableStream.from([
* '{"name": "Alice", "age": ',
* '30}\n{"name": "Bob", "age"',
* ": 25}\n",
* ]);
*
* type Person = { name: string; age: number };
*
* // Split the stream by newline and parse each line as a JSON object
* const jsonStream = stream.pipeThrough(new TextLineStream())
* .pipeThrough(toTransformStream(async function* (src) {
* for await (const chunk of src) {
* if (chunk.trim().length === 0) {
* continue;
* }
* yield JSON.parse(chunk) as Person;
* }
* }));
*
* assertEquals(
* await Array.fromAsync(jsonStream),
* [{ "name": "Alice", "age": 30 }, { "name": "Bob", "age": 25 }],
* );
* ```
*
* @example Allow splitting by `\r`
*
* ```ts
* import { TextLineStream } from "@std/streams/text-line-stream";
* import { assertEquals } from "@std/assert";
*
* const stream = ReadableStream.from([
* "CR\rLF",
* "\nCRLF\r\ndone",
* ]).pipeThrough(new TextLineStream({ allowCR: true }));
*
* const lines = await Array.fromAsync(stream);
*
* assertEquals(lines, ["CR", "LF", "CRLF", "done"]);
* ```
*/
export class TextLineStream extends TransformStream<string, string> {
#currentLine = "";
/**
* Constructs a new instance.
*
* @param options Options for the stream.
*
* @example No parameters
* ```ts
* import { TextLineStream } from "@std/streams/text-line-stream";
* import { assertEquals } from "@std/assert";
*
* const stream = ReadableStream.from([
* "Hello,\n",
* "world!\n",
* ]).pipeThrough(new TextLineStream());
*
* const lines = await Array.fromAsync(stream);
*
* assertEquals(lines, ["Hello,", "world!"]);
* ```
*
* @example Allow splitting by `\r`
*
* ```ts
* import { TextLineStream } from "@std/streams/text-line-stream";
* import { assertEquals } from "@std/assert";
*
* const stream = ReadableStream.from([
* "CR\rLF",
* "\nCRLF\r\ndone",
* ]).pipeThrough(new TextLineStream({ allowCR: true }));
*
* const lines = await Array.fromAsync(stream);
*
* assertEquals(lines, ["CR", "LF", "CRLF", "done"]);
* ```
*/
constructor(options: TextLineStreamOptions = { allowCR: false }) {
super({
transform: (chars, controller) => {
chars = this.#currentLine + chars;
while (true) {
const lfIndex = chars.indexOf("\n");
const crIndex = options.allowCR ? chars.indexOf("\r") : -1;
if (
crIndex !== -1 && crIndex !== (chars.length - 1) &&
(lfIndex === -1 || (lfIndex - 1) > crIndex)
) {
controller.enqueue(chars.slice(0, crIndex));
chars = chars.slice(crIndex + 1);
continue;
}
if (lfIndex === -1) break;
const endIndex = chars[lfIndex - 1] === "\r" ? lfIndex - 1 : lfIndex;
controller.enqueue(chars.slice(0, endIndex));
chars = chars.slice(lfIndex + 1);
}
this.#currentLine = chars;
},
flush: (controller) => {
if (this.#currentLine === "") return;
const currentLine = options.allowCR && this.#currentLine.endsWith("\r")
? this.#currentLine.slice(0, -1)
: this.#currentLine;
controller.enqueue(currentLine);
},
});
}
}