mirror of
https://github.com/denoland/std.git
synced 2024-11-22 04:59:05 +00:00
596 lines
17 KiB
TypeScript
596 lines
17 KiB
TypeScript
// Copyright 2018-2022 the Deno authors. All rights reserved. MIT license.
|
|
|
|
import { Buffer } from "../io/buffer.ts";
|
|
|
|
/** Default size, in bytes, of the chunk that `readableStreamFromReader` allocates for each read. */
const DEFAULT_CHUNK_SIZE = 16_640;
|
|
/** Default size, in bytes (32 KiB), of the scratch buffer used by `iterateReader`, `iterateReaderSync` and `copy`. */
const DEFAULT_BUFFER_SIZE = 32 * 1024;
|
|
|
|
/**
 * Type guard: reports whether `value` is a non-null object exposing a
 * callable `close` member, i.e. whether it can be treated as a `Deno.Closer`.
 */
function isCloser(value: unknown): value is Deno.Closer {
  if (typeof value !== "object" || value == null) {
    return false;
  }
  if (!("close" in value)) {
    return false;
  }
  // deno-lint-ignore no-explicit-any
  return typeof (value as Record<string, any>)["close"] === "function";
}
|
|
|
|
/**
 * Create a `Deno.Reader` from an iterable of `Uint8Array`s.
 *
 * Each `read(p)` call hands out bytes from the next chunk of the iterable.
 * When a chunk does not fit into `p`, the overflow is kept in an internal
 * buffer and served by the following `read` calls before the iterable is
 * advanced again. `read` resolves to `null` once the iterable is exhausted.
 *
 * ```ts
 * import { readerFromIterable, copy } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * const file = await Deno.open("metrics.txt", { write: true });
 * const reader = readerFromIterable((async function* () {
 *   while (true) {
 *     await new Promise((r) => setTimeout(r, 1000));
 *     const message = `data: ${JSON.stringify(Deno.metrics())}\n\n`;
 *     yield new TextEncoder().encode(message);
 *   }
 * })());
 * await copy(reader, file);
 * ```
 */
export function readerFromIterable(
  iterable: Iterable<Uint8Array> | AsyncIterable<Uint8Array>,
): Deno.Reader {
  // Prefer the async iterator protocol, fall back to the sync one.
  const source: Iterator<Uint8Array> | AsyncIterator<Uint8Array> =
    (iterable as AsyncIterable<Uint8Array>)[Symbol.asyncIterator]?.() ??
      (iterable as Iterable<Uint8Array>)[Symbol.iterator]?.();
  // Holds the tail of a chunk that did not fit into the caller's array.
  const leftover = new Buffer();

  return {
    async read(p: Uint8Array): Promise<number | null> {
      // Serve previously buffered bytes first.
      if (leftover.length != 0) {
        const served = await leftover.read(p);
        if (served == null) {
          return this.read(p);
        }
        return served;
      }

      const step = await source.next();
      if (step.done) {
        return null;
      }

      const chunk = step.value;
      const capacity = p.byteLength;
      if (chunk.byteLength <= capacity) {
        p.set(chunk);
        return chunk.byteLength;
      }

      // Chunk is larger than `p`: fill `p` and stash the remainder.
      p.set(chunk.subarray(0, capacity));
      await writeAll(leftover, chunk.subarray(capacity));
      return capacity;
    },
  };
}
|
|
|
|
/**
 * Create a `Writer` from a `WritableStreamDefaultWriter`.
 *
 * Every `write(p)` waits for the stream writer to be ready, forwards `p` to
 * it as a single chunk and resolves to `p.length`.
 */
export function writerFromStreamWriter(
  streamWriter: WritableStreamDefaultWriter<Uint8Array>,
): Deno.Writer {
  const write = async (p: Uint8Array): Promise<number> => {
    await streamWriter.ready;
    await streamWriter.write(p);
    return p.length;
  };
  return { write };
}
|
|
|
|
/**
 * Create a `Reader` from a `ReadableStreamDefaultReader`.
 *
 * Chunks pulled from the stream are staged in an internal buffer and handed
 * out through `read(p)`. `read` resolves to `null` only once the stream
 * reader reports `done`.
 */
export function readerFromStreamReader(
  streamReader: ReadableStreamDefaultReader<Uint8Array>,
): Deno.Reader {
  const buffer = new Buffer();

  return {
    async read(p: Uint8Array): Promise<number | null> {
      // Refill until the buffer actually holds data. A zero-length chunk
      // leaves the buffer empty, and reading from an empty buffer would be
      // reported to the caller as EOF even though the stream is still open.
      while (buffer.empty()) {
        const res = await streamReader.read();
        if (res.done) {
          return null; // EOF
        }

        await writeAll(buffer, res.value);
      }

      return buffer.read(p);
    },
  };
}
|
|
|
|
/** Options accepted by `writableStreamFromWriter`. */
export interface WritableStreamFromWriterOptions {
  /**
   * Whether a `writer` that is also a `Deno.Closer` gets closed automatically
   * when the stream is closed, aborted, or a write error occurs.
   *
   * Defaults to `true`.
   */
  autoClose?: boolean;
}
|
|
|
|
/**
 * Create a `WritableStream` from a `Writer`.
 *
 * Every chunk written to the stream is written in full to `writer`. If a
 * write fails, the stream is errored with the failure. Unless
 * `options.autoClose` is `false`, a `writer` that is also a `Deno.Closer` is
 * closed when the stream is closed, aborted, or a write error occurs.
 */
export function writableStreamFromWriter(
  writer: Deno.Writer,
  options: WritableStreamFromWriterOptions = {},
): WritableStream<Uint8Array> {
  const { autoClose = true } = options;

  // Shared by the error, close and abort paths.
  const closeWriterIfNeeded = (): void => {
    if (isCloser(writer) && autoClose) {
      writer.close();
    }
  };

  return new WritableStream({
    async write(chunk, controller) {
      try {
        await writeAll(writer, chunk);
      } catch (e) {
        // Surface the failure on the stream first, then release the writer.
        controller.error(e);
        closeWriterIfNeeded();
      }
    },
    close() {
      closeWriterIfNeeded();
    },
    abort() {
      closeWriterIfNeeded();
    },
  });
}
|
|
|
|
/**
 * Create a `ReadableStream` from any kind of iterable.
 *
 * ```ts
 * import { readableStreamFromIterable } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * const r1 = readableStreamFromIterable(["foo", "bar", "baz"]);
 * const r2 = readableStreamFromIterable(async function* () {
 *   await new Promise(((r) => setTimeout(r, 1000)));
 *   yield "foo";
 *   await new Promise(((r) => setTimeout(r, 1000)));
 *   yield "bar";
 *   await new Promise(((r) => setTimeout(r, 1000)));
 *   yield "baz";
 * }());
 * ```
 *
 * If the iterator obtained from the iterable
 * (`iterable[Symbol.asyncIterator]()` or `iterable[Symbol.iterator]()`) has a
 * `.throw()` method, as generators do, that method is invoked with the
 * cancellation reason when `readableStream.cancel()` is called. This is the
 * case for the second input above:
 *
 * ```ts
 * import { readableStreamFromIterable } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * const r3 = readableStreamFromIterable(async function* () {
 *   try {
 *     yield "foo";
 *   } catch (error) {
 *     console.log(error); // Error: Cancelled by consumer.
 *   }
 * }());
 * const reader = r3.getReader();
 * console.log(await reader.read()); // { value: "foo", done: false }
 * await reader.cancel(new Error("Cancelled by consumer."));
 * ```
 */
export function readableStreamFromIterable<T>(
  iterable: Iterable<T> | AsyncIterable<T>,
): ReadableStream<T> {
  // Prefer the async iterator protocol, fall back to the sync one.
  const source: Iterator<T> | AsyncIterator<T> =
    (iterable as AsyncIterable<T>)[Symbol.asyncIterator]?.() ??
      (iterable as Iterable<T>)[Symbol.iterator]?.();

  return new ReadableStream({
    async pull(controller) {
      const step = await source.next();
      if (step.done) {
        controller.close();
        return;
      }
      controller.enqueue(step.value);
    },
    async cancel(reason) {
      if (typeof source.throw != "function") {
        return;
      }
      try {
        await source.throw(reason);
      } catch {
        /* `iterator.throw()` always throws on site. We catch it. */
      }
    },
  });
}
|
|
|
|
/**
 * Convert the generator function into a TransformStream.
 *
 * The writable side of the result feeds a pass-through stream that is given
 * to `transformer`; the readable side emits whatever the iterable returned
 * by `transformer` produces.
 *
 * ```ts
 * import { readableStreamFromIterable, toTransformStream } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * const readable = readableStreamFromIterable([0, 1, 2])
 *   .pipeThrough(toTransformStream(async function* (src) {
 *     for await (const chunk of src) {
 *       yield chunk * 100;
 *     }
 *   }));
 *
 * for await (const chunk of readable) {
 *   console.log(chunk);
 * }
 * // output: 0, 100, 200
 * ```
 *
 * @param transformer A function to transform.
 * @param writableStrategy An object that optionally defines a queuing strategy for the writable side.
 * @param readableStrategy An object that optionally defines a queuing strategy for the readable side.
 */
export function toTransformStream<I, O>(
  transformer: (src: ReadableStream<I>) => Iterable<O> | AsyncIterable<O>,
  writableStrategy?: QueuingStrategy<I>,
  readableStrategy?: QueuingStrategy<O>,
): TransformStream<I, O> {
  // Identity stream: callers write into `sink`, the transformer reads `upstream`.
  const passthrough = new TransformStream<I, I>(undefined, writableStrategy);
  const sink = passthrough.writable;
  const upstream = passthrough.readable;

  const produced = transformer(upstream);
  const source: Iterator<O> | AsyncIterator<O> =
    (produced as AsyncIterable<O>)[Symbol.asyncIterator]?.() ??
      (produced as Iterable<O>)[Symbol.iterator]?.();

  const output = new ReadableStream<O>({
    async pull(controller) {
      let step: IteratorResult<O>;
      try {
        step = await source.next();
      } catch (error) {
        // Propagate the iterator's error to both streams. Cancelling a
        // stream that is already errored rejects; that rejection is ignored.
        await upstream.cancel(error).catch(() => {});
        controller.error(error);
        return;
      }
      if (step.done) {
        controller.close();
      } else {
        controller.enqueue(step.value);
      }
    },
    async cancel(reason) {
      // Propagate cancellation to the iterator first, then to the upstream.
      if (typeof source.throw == "function") {
        try {
          await source.throw(reason);
        } catch {
          /* `iterator.throw()` always throws on site. We catch it. */
        }
      }
      await upstream.cancel(reason);
    },
  }, readableStrategy);

  return {
    writable: sink,
    readable: output,
  };
}
|
|
|
|
/** Options accepted by `readableStreamFromReader`. */
export interface ReadableStreamFromReaderOptions {
  /**
   * Whether a `reader` that is also a `Deno.Closer` gets closed automatically
   * when `EOF` is encountered, or a read error occurs.
   *
   * Defaults to `true`.
   */
  autoClose?: boolean;

  /**
   * The size of chunks to allocate to read, the default is ~16KiB, which is
   * the maximum size that Deno operations can currently support.
   */
  chunkSize?: number;

  /** The queuing strategy to create the `ReadableStream` with. */
  strategy?: { highWaterMark?: number | undefined; size?: undefined };
}
|
|
|
|
/**
 * Create a `ReadableStream<Uint8Array>` from a `Deno.Reader`.
 *
 * When the pull algorithm is called on the stream, a chunk from the reader
 * will be read. When `null` is returned from the reader, the stream will be
 * closed along with the reader (if it is also a `Deno.Closer`). If reading
 * throws, the stream is errored with the thrown value.
 *
 * The reader is closed automatically (on EOF, read error or cancellation)
 * only when `options.autoClose` is not `false`.
 *
 * An example converting a `Deno.FsFile` into a readable stream:
 *
 * ```ts
 * import { readableStreamFromReader } from "https://deno.land/std@$STD_VERSION/streams/mod.ts";
 *
 * const file = await Deno.open("./file.txt", { read: true });
 * const fileStream = readableStreamFromReader(file);
 * ```
 *
 * @param reader The reader to pull chunks from.
 * @param options Auto-close behaviour, chunk size and queuing strategy.
 */
export function readableStreamFromReader(
  reader: Deno.Reader | (Deno.Reader & Deno.Closer),
  options: ReadableStreamFromReaderOptions = {},
): ReadableStream<Uint8Array> {
  const {
    autoClose = true,
    chunkSize = DEFAULT_CHUNK_SIZE,
    strategy,
  } = options;

  // Single place that decides whether the reader may be closed, so that the
  // EOF, error and cancel paths all honour `autoClose` the same way.
  const closeReaderIfRequested = (): void => {
    if (autoClose && isCloser(reader)) {
      reader.close();
    }
  };

  return new ReadableStream({
    async pull(controller) {
      // A fresh array per pull: the enqueued view must not be overwritten by
      // a later read.
      const chunk = new Uint8Array(chunkSize);
      try {
        const read = await reader.read(chunk);
        if (read === null) {
          closeReaderIfRequested();
          controller.close();
          return;
        }
        controller.enqueue(chunk.subarray(0, read));
      } catch (e) {
        controller.error(e);
        // Previously the reader was closed here even with `autoClose: false`.
        closeReaderIfRequested();
      }
    },
    cancel() {
      closeReaderIfRequested();
    },
  }, strategy);
}
|
|
|
|
/**
 * Read Reader `r` until EOF (`null`) and resolve to the content as
 * `Uint8Array`.
 *
 * ```ts
 * import { Buffer } from "https://deno.land/std@$STD_VERSION/io/buffer.ts";
 * import { readAll } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * // Example from stdin
 * const stdinContent = await readAll(Deno.stdin);
 *
 * // Example from file
 * const file = await Deno.open("my_file.txt", {read: true});
 * const myFileContent = await readAll(file);
 * file.close();
 *
 * // Example from buffer
 * const myData = new Uint8Array(100);
 * // ... fill myData array with data
 * const reader = new Buffer(myData.buffer);
 * const bufferContent = await readAll(reader);
 * ```
 */
export async function readAll(r: Deno.Reader): Promise<Uint8Array> {
  // Drain the reader into a growable buffer, then expose its bytes.
  const collected = new Buffer();
  await collected.readFrom(r);
  return collected.bytes();
}
|
|
|
|
/**
 * Synchronously reads Reader `r` until EOF (`null`) and returns the content
 * as `Uint8Array`.
 *
 * ```ts
 * import { Buffer } from "https://deno.land/std@$STD_VERSION/io/buffer.ts";
 * import { readAllSync } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * // Example from stdin
 * const stdinContent = readAllSync(Deno.stdin);
 *
 * // Example from file
 * const file = Deno.openSync("my_file.txt", {read: true});
 * const myFileContent = readAllSync(file);
 * file.close();
 *
 * // Example from buffer
 * const myData = new Uint8Array(100);
 * // ... fill myData array with data
 * const reader = new Buffer(myData.buffer);
 * const bufferContent = readAllSync(reader);
 * ```
 */
export function readAllSync(r: Deno.ReaderSync): Uint8Array {
  // Drain the reader into a growable buffer, then expose its bytes.
  const collected = new Buffer();
  collected.readFromSync(r);
  return collected.bytes();
}
|
|
|
|
/**
 * Write all the content of the array buffer (`arr`) to the writer (`w`).
 *
 * `w.write` may accept fewer bytes than offered; the remaining tail is
 * offered again until every byte of `arr` has been written.
 *
 * ```ts
 * import { Buffer } from "https://deno.land/std@$STD_VERSION/io/buffer.ts";
 * import { writeAll } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * // Example writing to stdout
 * let contentBytes = new TextEncoder().encode("Hello World");
 * await writeAll(Deno.stdout, contentBytes);
 *
 * // Example writing to file
 * contentBytes = new TextEncoder().encode("Hello World");
 * const file = await Deno.open('test.file', {write: true});
 * await writeAll(file, contentBytes);
 * file.close();
 *
 * // Example writing to buffer
 * contentBytes = new TextEncoder().encode("Hello World");
 * const writer = new Buffer();
 * await writeAll(writer, contentBytes);
 * console.log(writer.bytes().length); // 11
 * ```
 */
export async function writeAll(w: Deno.Writer, arr: Uint8Array) {
  for (let offset = 0; offset < arr.length;) {
    offset += await w.write(arr.subarray(offset));
  }
}
|
|
|
|
/**
 * Synchronously write all the content of the array buffer (`arr`) to the
 * writer (`w`).
 *
 * `w.writeSync` may accept fewer bytes than offered; the remaining tail is
 * offered again until every byte of `arr` has been written.
 *
 * ```ts
 * import { Buffer } from "https://deno.land/std@$STD_VERSION/io/buffer.ts";
 * import { writeAllSync } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * // Example writing to stdout
 * let contentBytes = new TextEncoder().encode("Hello World");
 * writeAllSync(Deno.stdout, contentBytes);
 *
 * // Example writing to file
 * contentBytes = new TextEncoder().encode("Hello World");
 * const file = Deno.openSync('test.file', {write: true});
 * writeAllSync(file, contentBytes);
 * file.close();
 *
 * // Example writing to buffer
 * contentBytes = new TextEncoder().encode("Hello World");
 * const writer = new Buffer();
 * writeAllSync(writer, contentBytes);
 * console.log(writer.bytes().length); // 11
 * ```
 */
export function writeAllSync(w: Deno.WriterSync, arr: Uint8Array) {
  for (let offset = 0; offset < arr.length;) {
    offset += w.writeSync(arr.subarray(offset));
  }
}
|
|
|
|
/**
 * Turns a Reader, `r`, into an async iterator.
 *
 * ```ts
 * import { iterateReader } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * let f = await Deno.open("/etc/passwd");
 * for await (const chunk of iterateReader(f)) {
 *   console.log(chunk);
 * }
 * f.close();
 * ```
 *
 * Second argument can be used to tune size of a buffer.
 * Default size of the buffer is 32kB.
 *
 * ```ts
 * import { iterateReader } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * let f = await Deno.open("/etc/passwd");
 * const it = iterateReader(f, {
 *   bufSize: 1024 * 1024
 * });
 * for await (const chunk of it) {
 *   console.log(chunk);
 * }
 * f.close();
 * ```
 *
 * Each yielded chunk is a copy of the bytes read, so it stays valid after the
 * iterator advances.
 */
export async function* iterateReader(
  r: Deno.Reader,
  options?: {
    bufSize?: number;
  },
): AsyncIterableIterator<Uint8Array> {
  // One scratch array is reused for every read; chunks are copied out of it.
  const scratch = new Uint8Array(options?.bufSize ?? DEFAULT_BUFFER_SIZE);
  for (
    let count = await r.read(scratch);
    count !== null;
    count = await r.read(scratch)
  ) {
    yield scratch.slice(0, count);
  }
}
|
|
|
|
/**
 * Turns a ReaderSync, `r`, into an iterator.
 *
 * ```ts
 * import { iterateReaderSync } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * let f = Deno.openSync("/etc/passwd");
 * for (const chunk of iterateReaderSync(f)) {
 *   console.log(chunk);
 * }
 * f.close();
 * ```
 *
 * Second argument can be used to tune size of a buffer.
 * Default size of the buffer is 32kB.
 *
 * ```ts
 * import { iterateReaderSync } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * let f = Deno.openSync("/etc/passwd");
 * const iter = iterateReaderSync(f, {
 *   bufSize: 1024 * 1024
 * });
 * for (const chunk of iter) {
 *   console.log(chunk);
 * }
 * f.close();
 * ```
 *
 * The iterator reads into an internal buffer of fixed size and yields a copy
 * (`slice`) of the bytes read on each iteration, so a previously returned
 * chunk is not overwritten by the next iteration.
 */
export function* iterateReaderSync(
  r: Deno.ReaderSync,
  options?: {
    bufSize?: number;
  },
): IterableIterator<Uint8Array> {
  // One scratch array is reused for every read; chunks are copied out of it.
  const scratch = new Uint8Array(options?.bufSize ?? DEFAULT_BUFFER_SIZE);
  for (
    let count = r.readSync(scratch);
    count !== null;
    count = r.readSync(scratch)
  ) {
    yield scratch.slice(0, count);
  }
}
|
|
|
|
/**
 * Copies from `src` to `dst` until either EOF (`null`) is read from `src` or
 * an error occurs. It resolves to the number of bytes copied or rejects with
 * the first error encountered while copying.
 *
 * ```ts
 * import { copy } from "https://deno.land/std@$STD_VERSION/streams/conversion.ts";
 *
 * const source = await Deno.open("my_file.txt");
 * const bytesCopied1 = await copy(source, Deno.stdout);
 * const destination = await Deno.create("my_file_2.txt");
 * const bytesCopied2 = await copy(source, destination);
 * ```
 *
 * @param src The source to copy from
 * @param dst The destination to copy to
 * @param options Can be used to tune size of the buffer. Default size is 32kB
 */
export async function copy(
  src: Deno.Reader,
  dst: Deno.Writer,
  options?: {
    bufSize?: number;
  },
): Promise<number> {
  let total = 0;
  const scratch = new Uint8Array(options?.bufSize ?? DEFAULT_BUFFER_SIZE);
  for (;;) {
    const filled = await src.read(scratch);
    if (filled === null) {
      // EOF on the source: report how much was transferred.
      return total;
    }
    // `dst.write` may take only part of what is offered; keep offering the
    // unwritten tail of this read until it is fully flushed.
    let flushed = 0;
    while (flushed < filled) {
      flushed += await dst.write(scratch.subarray(flushed, filled));
    }
    total += flushed;
  }
}
|