std/cbor/_text_decoded_stream.ts

60 lines
1.9 KiB
TypeScript

// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
import type { ReleaseLock } from "./_common.ts";
/**
* A {@link ReadableStream} that wraps the decoded CBOR "Text String".
* [RFC 8949 - Concise Binary Object Representation (CBOR)](https://datatracker.ietf.org/doc/html/rfc8949)
*
* Instances of this class is created from {@link CborSequenceDecoderStream}.
* This class is not designed for you to create instances of it yourself. It is
* merely a way for you to validate the type being returned.
*
* @example Usage
* ```ts
* import { assert, assertEquals } from "@std/assert";
* import {
* CborSequenceDecoderStream,
* CborTextDecodedStream,
* CborTextEncoderStream,
* } from "@std/cbor";
*
* const rawMessage = "a".repeat(100);
*
* for await (
* const value of ReadableStream.from([rawMessage])
* .pipeThrough(new CborTextEncoderStream())
* .pipeThrough(new CborSequenceDecoderStream())
* ) {
* assert(typeof value === "string" || value instanceof CborTextDecodedStream);
* if (value instanceof CborTextDecodedStream) {
* assertEquals((await Array.fromAsync(value)).join(""), rawMessage);
* } else assertEquals(value, rawMessage);
* }
* ```
*/
export class CborTextDecodedStream extends ReadableStream<string> {
/**
* Constructs a new instance.
*
* @param gen A {@link AsyncGenerator<string>}.
* @param releaseLock A Function that's called when the stream is finished.
*/
constructor(gen: AsyncGenerator<string>, releaseLock: ReleaseLock) {
super({
async pull(controller) {
const { done, value } = await gen.next();
if (done) {
releaseLock();
controller.close();
} else controller.enqueue(value);
},
async cancel() {
// deno-lint-ignore no-empty
for await (const _ of gen) {}
releaseLock();
},
});
}
}