fix(streams/delimiter): avoid recursion in TextLineStream (#2318)

This commit is contained in:
ud2 2022-06-29 16:09:42 +08:00 committed by GitHub
parent b9fce041d4
commit 883fe0f401
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 51 additions and 72 deletions

View File

@ -92,86 +92,50 @@ interface TextLineStreamOptions {
* ```
*/
export class TextLineStream extends TransformStream<string, string> {
readonly #allowCR: boolean;
#buf = "";
#prevHadCR = false;
#allowCR: boolean;
constructor(options?: TextLineStreamOptions) {
super({
transform: (chunk, controller) => {
this.#handle(chunk, controller);
},
flush: (controller) => {
controller.enqueue(this.#getBuf(this.#prevHadCR));
if (this.#prevHadCR) {
controller.enqueue("");
}
},
transform: (chunk, controller) => this.#handle(chunk, controller),
flush: (controller) => this.#handle("\r\n", controller),
});
this.#allowCR = options?.allowCR ?? false;
}
#handle(
chunk: string,
controller: TransformStreamDefaultController<string>,
) {
const lfIndex = chunk.indexOf("\n");
const crIndex = chunk.indexOf("\r");
#handle(chunk: string, controller: TransformStreamDefaultController<string>) {
chunk = this.#buf + chunk;
if (this.#prevHadCR) {
this.#prevHadCR = false;
controller.enqueue(this.#getBuf(true));
if (lfIndex === 0) {
this.#handle(chunk.slice(1), controller);
return;
for (;;) {
const lfIndex = chunk.indexOf("\n");
if (this.#allowCR) {
const crIndex = chunk.indexOf("\r");
if (
crIndex !== -1 && crIndex !== (chunk.length - 1) &&
(lfIndex === -1 || (lfIndex - 1) > crIndex)
) {
controller.enqueue(chunk.slice(0, crIndex));
chunk = chunk.slice(crIndex + 1);
continue;
}
}
if (lfIndex !== -1) {
let crOrLfIndex = lfIndex;
if (chunk[lfIndex - 1] === "\r") {
crOrLfIndex--;
}
controller.enqueue(chunk.slice(0, crOrLfIndex));
chunk = chunk.slice(lfIndex + 1);
continue;
}
break;
}
if (lfIndex === -1 && crIndex === -1) { // neither \n nor \r
this.#buf += chunk;
} else if (lfIndex === -1 && crIndex !== -1) { // not \n but \r
if (crIndex === (chunk.length - 1)) { // \r is last character
this.#buf += chunk;
this.#prevHadCR = true;
} else if (this.#allowCR) {
this.#mergeHandle(chunk, crIndex, crIndex, controller);
} else {
this.#buf += chunk.slice(0, crIndex + 1);
this.#handle(chunk.slice(crIndex + 1), controller);
}
} else if (lfIndex !== -1 && crIndex === -1) { // \n but not \r
this.#mergeHandle(chunk, lfIndex, lfIndex, controller);
} else { // \n and \r
if ((lfIndex - 1) === crIndex) { // \r\n
this.#mergeHandle(chunk, crIndex, lfIndex, controller);
} else if (crIndex < lfIndex && this.#allowCR) { // \r first
this.#mergeHandle(chunk, crIndex, crIndex, controller);
} else { // \n first
this.#mergeHandle(chunk, lfIndex, lfIndex, controller);
}
}
}
#mergeHandle(
chunk: string,
prevChunkEndIndex: number,
newChunkStartIndex: number,
controller: TransformStreamDefaultController<string>,
) {
this.#buf += chunk.slice(0, prevChunkEndIndex);
controller.enqueue(this.#getBuf(false));
this.#handle(chunk.slice(newChunkStartIndex + 1), controller);
}
#getBuf(prevHadCR: boolean): string {
const buf = this.#buf;
this.#buf = "";
if (prevHadCR) {
return buf.slice(0, -1);
} else {
return buf;
}
this.#buf = chunk;
}
}

View File

@ -60,15 +60,13 @@ Deno.test("[streams] TextLineStream", async () => {
assertEquals(lines, [
"qwertzuiopasd",
"mnbvcxylk\rjhgfds",
"apoiuzt",
"qwr\r09ei\rqwrjiowqr",
"apoiuzt\rqwr\r09ei\rqwrjiowqr",
"rewq0987",
"",
"654321",
"rewq0987",
"",
"654321",
"",
"654321\r",
]);
const textStream2 = new ReadableStream({
@ -145,6 +143,23 @@ Deno.test("[streams] TextLineStream - allowCR", async () => {
]);
});
Deno.test("[streams] TextLineStream - large chunks", async () => {
const textStream = new ReadableStream({
start(controller) {
controller.enqueue("\n".repeat(10000));
controller.enqueue("\n".repeat(10000));
controller.close();
},
});
let lines = 0;
for await (const chunk of textStream.pipeThrough(new TextLineStream())) {
assertEquals(chunk, "");
lines++;
}
assertEquals(lines, 20001);
});
Deno.test("[streams] DelimiterStream", async () => {
const textStream = new ReadableStream({
start(controller) {