std/streams/to_transform_stream_test.ts

219 lines
6.1 KiB
TypeScript
Raw Normal View History

2023-01-03 10:47:44 +00:00
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
import { assertEquals, assertRejects } from "../assert/mod.ts";
import { toTransformStream } from "./to_transform_stream.ts";
import { readableStreamFromIterable } from "./readable_stream_from_iterable.ts";
Deno.test({
name: "[streams] toTransformStream()",
async fn() {
const readable = readableStreamFromIterable([0, 1, 2])
.pipeThrough(toTransformStream(async function* (src) {
for await (const i of src) {
yield i * 100;
}
}));
const res = [];
for await (const i of readable) {
res.push(i);
}
assertEquals(res, [0, 100, 200]);
},
});
Deno.test({
name: "[streams] toTransformStream() Pass iterable instead of asyncIterable",
async fn() {
const readable = readableStreamFromIterable([0, 1, 2])
.pipeThrough(toTransformStream(function* (_src) {
yield 0;
yield 100;
yield 200;
}));
const res = [];
for await (const i of readable) {
res.push(i);
}
assertEquals(res, [0, 100, 200]);
},
});
Deno.test({
name: "[streams] toTransformStream() Propagate the error from readable 1",
async fn(t) {
// When data is pipelined in the order of readable1 → generator → readable2,
// Propagate the error that occurred in readable1 to generator and readable2.
const expectedError = new Error("Error from readable1");
await t.step({
name: "to readable 2",
async fn() {
// Propagate the error that occurred in readable1 to readable2.
let actualError = null;
const readable1 = new ReadableStream({
start(controller) {
controller.error(expectedError); // error from readable1
},
});
const readable2 = readable1.pipeThrough(
toTransformStream(async function* (src) {
for await (const i of src) {
yield i;
}
}),
);
try {
await readable2.getReader().read();
} catch (error) {
actualError = error; // catch error in readable2
}
assertEquals(actualError, expectedError);
},
});
await t.step({
name: "to generator",
async fn() {
// Propagate the error that occurred in readable1 to generator.
let actualError = null;
const readable1 = new ReadableStream({
start(controller) {
controller.error(expectedError); // error from readable1
},
});
const readable2 = readable1.pipeThrough(
toTransformStream(async function* (src) {
try {
await src.getReader().read();
} catch (error) {
actualError = error; // catch error in generator
}
yield 0;
}),
);
await readable2.getReader().read();
assertEquals(actualError, expectedError);
},
});
},
});
Deno.test({
name: "[streams] toTransformStream() Propagate the error from generator",
async fn(t) {
// When data is pipelined in the order of readable1 → generator → readable2,
// Propagate the error that occurred in generator to readable2 and readable1.
const expectedError = new Error("Error from generator");
let actualError1: unknown = null;
let actualError2: unknown = null;
const readable1 = new ReadableStream({
cancel(reason) {
actualError1 = reason; // catch error in readable1
},
});
const readable2 = readable1.pipeThrough(
// deno-lint-ignore require-yield
toTransformStream(function* () {
throw expectedError; // error from generator
}),
);
try {
await readable2.getReader().read();
} catch (error) {
actualError2 = error; // catch error in readable2
}
await t.step({
name: "to readable 1",
fn() {
assertEquals(actualError1, expectedError);
},
});
await t.step({
name: "to readable 2",
fn() {
assertEquals(actualError2, expectedError);
},
});
},
});
Deno.test({
name: "[streams] toTransformStream() Propagate cancellation from readable 2",
async fn(t) {
// When data is pipelined in the order of readable1 → generator → readable2,
// Propagate the cancellation that occurred in readable2 to readable1 and generator.
const expectedError = new Error("Error from readable2");
await t.step({
name: "to readable 1",
async fn() {
let actualError = null;
const readable1 = new ReadableStream({
cancel(reason) {
actualError = reason; // catch error in readable1
},
});
const readable2 = readable1.pipeThrough(
toTransformStream(function* () {
yield 0;
}),
);
await readable2.cancel(expectedError); // cancellation from readable2
assertEquals(actualError, expectedError);
},
});
await t.step({
name: "to readable 2",
async fn() {
let actualError = null;
const readable1 = new ReadableStream();
const readable2 = readable1.pipeThrough(
toTransformStream(function* () {
try {
yield 0;
} catch (error) {
actualError = error; // catch error in generator
}
}),
);
const reader = readable2.getReader();
await reader.read();
await reader.cancel(expectedError); // cancellation from readable2
assertEquals(actualError, expectedError);
},
});
},
});
Deno.test({
name:
"[streams] toTransformStream() Cancel streams with the correct error message",
async fn() {
const src = readableStreamFromIterable([0, 1, 2]);
// deno-lint-ignore require-yield
const transform = toTransformStream(function* (src) {
src.getReader(); // lock the source stream to cause error at cancel
throw new Error("foo");
});
await assertRejects(
async () => {
for await (const _ of src.pipeThrough(transform));
},
Error,
"foo",
);
},
});