feat(streams): TextLineStream (#1978)

This commit is contained in:
Leo Kettmeir 2022-03-02 11:42:06 +01:00 committed by GitHub
parent 0aa2406e20
commit e0f74688e7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 113 additions and 2 deletions

View File

@ -6,7 +6,9 @@ import { BytesList } from "../bytes/bytes_list.ts";
const CR = "\r".charCodeAt(0);
const LF = "\n".charCodeAt(0);
/** Transform a stream into a stream where each chunk is divided by a newline,
/** @deprecated Use TextLineStream instead, as it can handle empty lines.
*
* Transform a stream into a stream where each chunk is divided by a newline,
* be it `\n` or `\r\n`.
*
* ```ts
@ -73,6 +75,75 @@ export class LineStream extends TransformStream<Uint8Array, Uint8Array> {
}
}
/** Transform a stream into a stream where each chunk is divided by a newline,
* be it `\n` or `\r\n`.
*
* ```ts
* import { TextLineStream } from "./delimiter.ts";
* const res = await fetch("https://example.com");
* const lines = res.body!
* .pipeThrough(new TextDecoderStream())
* .pipeThrough(new TextLineStream());
* ```
*/
export class TextLineStream extends TransformStream<string, string> {
#buf = "";
#prevHadCR = false;
constructor() {
super({
transform: (chunk, controller) => {
this.#handle(chunk, controller);
},
flush: (controller) => {
controller.enqueue(this.#getBuf(false));
},
});
}
#handle(
chunk: string,
controller: TransformStreamDefaultController<string>,
) {
const lfIndex = chunk.indexOf("\n");
if (this.#prevHadCR) {
this.#prevHadCR = false;
if (lfIndex === 0) {
controller.enqueue(this.#getBuf(true));
this.#handle(chunk.slice(1), controller);
return;
}
}
if (lfIndex === -1) {
if (chunk.at(-1) === "\r") {
this.#prevHadCR = true;
}
this.#buf += chunk;
} else {
let crOrLfIndex = lfIndex;
if (chunk[lfIndex - 1] === "\r") {
crOrLfIndex--;
}
this.#buf += chunk.slice(0, crOrLfIndex);
controller.enqueue(this.#getBuf(false));
this.#handle(chunk.slice(lfIndex + 1), controller);
}
}
#getBuf(prevHadCR: boolean): string {
const buf = this.#buf;
this.#buf = "";
if (prevHadCR) {
return buf.slice(0, -1);
} else {
return buf;
}
}
}
/** Transform a stream into a stream where each chunk is divided by a given delimiter.
*
* ```ts

View File

@ -1,6 +1,6 @@
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
import { DelimiterStream, LineStream } from "./delimiter.ts";
import { DelimiterStream, LineStream, TextLineStream } from "./delimiter.ts";
import { assert, assertEquals } from "../testing/asserts.ts";
Deno.test("[streams] LineStream", async () => {
@ -35,6 +35,46 @@ Deno.test("[streams] LineStream", async () => {
assert(f.done);
});
Deno.test("[streams] TextLineStream", async () => {
const textStream = new ReadableStream({
start(controller) {
controller.enqueue("qwertzu");
controller.enqueue("iopasd\r\nmnbvc");
controller.enqueue("xylk\njhgfds\napoiuzt");
controller.enqueue("qwr09eiqwrjiowqr\r");
controller.enqueue("\nrewq0987\n\n654321");
controller.enqueue("\nrewq0987\r\n\r\n654321");
controller.close();
},
});
const lines = textStream.pipeThrough(new TextLineStream());
const reader = lines.getReader();
const a = await reader.read();
assertEquals(a.value, "qwertzuiopasd");
const b = await reader.read();
assertEquals(b.value, "mnbvcxylk");
const c = await reader.read();
assertEquals(c.value, "jhgfds");
const d = await reader.read();
assertEquals(d.value, "apoiuztqwr09eiqwrjiowqr");
const e = await reader.read();
assertEquals(e.value, "rewq0987");
const f = await reader.read();
assertEquals(f.value, "");
const g = await reader.read();
assertEquals(g.value, "654321");
const h = await reader.read();
assertEquals(h.value, "rewq0987");
const i = await reader.read();
assertEquals(i.value, "");
const j = await reader.read();
assertEquals(j.value, "654321");
const k = await reader.read();
assert(k.done);
});
Deno.test("[streams] DelimiterStream", async () => {
const textStream = new ReadableStream({
start(controller) {