stream: add pipeline() for webstreams

Refs: https://github.com/nodejs/node/issues/39316
PR-URL: https://github.com/nodejs/node/pull/46307
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
This commit is contained in:
Debadree Chatterjee 2023-02-03 01:15:42 +05:30 committed by GitHub
parent ebcc711e14
commit 23effb255e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 500 additions and 10 deletions

View File

@ -2698,6 +2698,9 @@ const cleanup = finished(rs, (err) => {
<!-- YAML <!-- YAML
added: v10.0.0 added: v10.0.0
changes: changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/46307
description: Added support for webstreams.
- version: v18.0.0 - version: v18.0.0
pr-url: https://github.com/nodejs/node/pull/41678 pr-url: https://github.com/nodejs/node/pull/41678
description: Passing an invalid callback to the `callback` argument description: Passing an invalid callback to the `callback` argument
@ -2714,13 +2717,14 @@ changes:
description: Add support for async generators. description: Add support for async generators.
--> -->
* `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]} * `streams` {Stream\[]|Iterable\[]|AsyncIterable\[]|Function\[]|
* `source` {Stream|Iterable|AsyncIterable|Function} ReadableStream\[]|WritableStream\[]|TransformStream\[]}
* `source` {Stream|Iterable|AsyncIterable|Function|ReadableStream}
* Returns: {Iterable|AsyncIterable} * Returns: {Iterable|AsyncIterable}
* `...transforms` {Stream|Function} * `...transforms` {Stream|Function|TransformStream}
* `source` {AsyncIterable} * `source` {AsyncIterable}
* Returns: {AsyncIterable} * Returns: {AsyncIterable}
* `destination` {Stream|Function} * `destination` {Stream|Function|WritableStream}
* `source` {AsyncIterable} * `source` {AsyncIterable}
* Returns: {AsyncIterable|Promise} * Returns: {AsyncIterable|Promise}
* `callback` {Function} Called when the pipeline is fully done. * `callback` {Function} Called when the pipeline is fully done.

View File

@ -35,6 +35,9 @@ const {
isReadable, isReadable,
isReadableNodeStream, isReadableNodeStream,
isNodeStream, isNodeStream,
isTransformStream,
isWebStream,
isReadableStream,
} = require('internal/streams/utils'); } = require('internal/streams/utils');
const { AbortController } = require('internal/abort_controller'); const { AbortController } = require('internal/abort_controller');
@ -88,7 +91,7 @@ async function* fromReadable(val) {
yield* Readable.prototype[SymbolAsyncIterator].call(val); yield* Readable.prototype[SymbolAsyncIterator].call(val);
} }
async function pump(iterable, writable, finish, { end }) { async function pumpToNode(iterable, writable, finish, { end }) {
let error; let error;
let onresolve = null; let onresolve = null;
@ -147,6 +150,35 @@ async function pump(iterable, writable, finish, { end }) {
} }
} }
async function pumpToWeb(readable, writable, finish, { end }) {
if (isTransformStream(writable)) {
writable = writable.writable;
}
// https://streams.spec.whatwg.org/#example-manual-write-with-backpressure
const writer = writable.getWriter();
try {
for await (const chunk of readable) {
await writer.ready;
writer.write(chunk).catch(() => {});
}
await writer.ready;
if (end) {
await writer.close();
}
finish();
} catch (err) {
try {
await writer.abort(err);
finish(err);
} catch (err) {
finish(err);
}
}
}
function pipeline(...streams) { function pipeline(...streams) {
return pipelineImpl(streams, once(popCallback(streams))); return pipelineImpl(streams, once(popCallback(streams)));
} }
@ -259,7 +291,11 @@ function pipelineImpl(streams, callback, opts) {
ret = Duplex.from(stream); ret = Duplex.from(stream);
} }
} else if (typeof stream === 'function') { } else if (typeof stream === 'function') {
ret = makeAsyncIterable(ret); if (isTransformStream(ret)) {
ret = makeAsyncIterable(ret?.readable);
} else {
ret = makeAsyncIterable(ret);
}
ret = stream(ret, { signal }); ret = stream(ret, { signal });
if (reading) { if (reading) {
@ -303,7 +339,11 @@ function pipelineImpl(streams, callback, opts) {
); );
} else if (isIterable(ret, true)) { } else if (isIterable(ret, true)) {
finishCount++; finishCount++;
pump(ret, pt, finish, { end }); pumpToNode(ret, pt, finish, { end });
} else if (isReadableStream(ret) || isTransformStream(ret)) {
const toRead = ret.readable || ret;
finishCount++;
pumpToNode(toRead, pt, finish, { end });
} else { } else {
throw new ERR_INVALID_RETURN_VALUE( throw new ERR_INVALID_RETURN_VALUE(
'AsyncIterable or Promise', 'destination', ret); 'AsyncIterable or Promise', 'destination', ret);
@ -324,12 +364,30 @@ function pipelineImpl(streams, callback, opts) {
if (isReadable(stream) && isLastStream) { if (isReadable(stream) && isLastStream) {
lastStreamCleanup.push(cleanup); lastStreamCleanup.push(cleanup);
} }
} else if (isTransformStream(ret) || isReadableStream(ret)) {
const toRead = ret.readable || ret;
finishCount++;
pumpToNode(toRead, stream, finish, { end });
} else if (isIterable(ret)) { } else if (isIterable(ret)) {
finishCount++; finishCount++;
pump(ret, stream, finish, { end }); pumpToNode(ret, stream, finish, { end });
} else { } else {
throw new ERR_INVALID_ARG_TYPE( throw new ERR_INVALID_ARG_TYPE(
'val', ['Readable', 'Iterable', 'AsyncIterable'], ret); 'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret);
}
ret = stream;
} else if (isWebStream(stream)) {
if (isReadableNodeStream(ret)) {
finishCount++;
pumpToWeb(makeAsyncIterable(ret), stream, finish, { end });
} else if (isReadableStream(ret) || isIterable(ret)) {
finishCount++;
pumpToWeb(ret, stream, finish, { end });
} else if (isTransformStream(ret)) {
pumpToWeb(ret.readable, stream, finish, { end });
} else {
throw new ERR_INVALID_ARG_TYPE(
'val', ['Readable', 'Iterable', 'AsyncIterable', 'ReadableStream', 'TransformStream'], ret);
} }
ret = stream; ret = stream;
} else { } else {

View File

@ -77,6 +77,19 @@ function isWritableStream(obj) {
); );
} }
function isTransformStream(obj) {
return !!(
obj &&
!isNodeStream(obj) &&
typeof obj.readable === 'object' &&
typeof obj.writable === 'object'
);
}
function isWebStream(obj) {
return isReadableStream(obj) || isWritableStream(obj) || isTransformStream(obj);
}
function isIterable(obj, isAsync) { function isIterable(obj, isAsync) {
if (obj == null) return false; if (obj == null) return false;
if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function'; if (isAsync === true) return typeof obj[SymbolAsyncIterator] === 'function';
@ -303,6 +316,7 @@ module.exports = {
isReadableFinished, isReadableFinished,
isReadableErrored, isReadableErrored,
isNodeStream, isNodeStream,
isWebStream,
isWritable, isWritable,
isWritableNodeStream, isWritableNodeStream,
isWritableStream, isWritableStream,
@ -312,4 +326,5 @@ module.exports = {
isServerRequest, isServerRequest,
isServerResponse, isServerResponse,
willEmitClose, willEmitClose,
isTransformStream,
}; };

View File

@ -8,6 +8,7 @@ const {
const { const {
isIterable, isIterable,
isNodeStream, isNodeStream,
isWebStream,
} = require('internal/streams/utils'); } = require('internal/streams/utils');
const { pipelineImpl: pl } = require('internal/streams/pipeline'); const { pipelineImpl: pl } = require('internal/streams/pipeline');
@ -21,7 +22,7 @@ function pipeline(...streams) {
let end; let end;
const lastArg = streams[streams.length - 1]; const lastArg = streams[streams.length - 1];
if (lastArg && typeof lastArg === 'object' && if (lastArg && typeof lastArg === 'object' &&
!isNodeStream(lastArg) && !isIterable(lastArg)) { !isNodeStream(lastArg) && !isIterable(lastArg) && !isWebStream(lastArg)) {
const options = ArrayPrototypePop(streams); const options = ArrayPrototypePop(streams);
signal = options.signal; signal = options.signal;
end = options.end; end = options.end;

View File

@ -0,0 +1,412 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const { Readable, Writable, Transform, pipeline } = require('stream');
const { pipeline: pipelinePromise } = require('stream/promises');
const { ReadableStream, WritableStream, TransformStream } = require('stream/web');
const http = require('http');
{
const values = [];
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});
const ws = new WritableStream({
write(chunk) {
values.push(chunk);
}
});
pipeline(rs, ws, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['hello', 'world']);
}));
c.enqueue('hello');
c.enqueue('world');
c.close();
}
{
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});
const ws = new WritableStream({
write() { }
});
pipeline(rs, ws, common.mustCall((err) => {
assert.strictEqual(err?.message, 'kaboom');
}));
c.error(new Error('kaboom'));
}
{
let c;
const values = [];
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});
const ts = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk?.toString().toUpperCase());
}
});
const ws = new WritableStream({
write(chunk) {
values.push(chunk?.toString());
}
});
pipeline(rs, ts, ws, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['HELLO', 'WORLD']);
}));
c.enqueue('hello');
c.enqueue('world');
c.close();
}
{
function makeTransformStream() {
return new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk?.toString());
}
});
}
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});
const ws = new WritableStream({
write() { }
});
pipeline(rs,
makeTransformStream(),
makeTransformStream(),
makeTransformStream(),
makeTransformStream(),
ws,
common.mustCall((err) => {
assert.strictEqual(err?.message, 'kaboom');
}));
c.error(new Error('kaboom'));
}
{
const values = [];
const r = new Readable({
read() { }
});
const ws = new WritableStream({
write(chunk) {
values.push(chunk?.toString());
}
});
pipeline(r, ws, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['helloworld']);
}));
r.push('hello');
r.push('world');
r.push(null);
}
{
const values = [];
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});
const w = new Writable({
write(chunk, encoding, callback) {
values.push(chunk?.toString());
callback();
}
});
pipeline(rs, w, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['hello', 'world']);
}));
c.enqueue('hello');
c.enqueue('world');
c.close();
}
{
const values = [];
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});
const ws = new WritableStream({
write(chunk) {
values.push(chunk?.toString());
}
});
const t = new Transform({
transform(chunk, encoding, callback) {
callback(null, chunk?.toString().toUpperCase());
}
});
pipeline(rs, t, ws, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['HELLOWORLD']);
}));
c.enqueue('hello');
c.enqueue('world');
c.close();
}
{
const server = http.createServer((req, res) => {
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello');
controller.enqueue('world');
controller.close();
}
});
pipeline(rs, res, common.mustSucceed(() => {}));
});
server.listen(0, common.mustCall(() => {
const req = http.request({
port: server.address().port
});
req.end();
const values = [];
req.on('response', (res) => {
res.on('data', (chunk) => {
values.push(chunk?.toString());
});
res.on('end', common.mustCall(() => {
assert.deepStrictEqual(values, ['hello', 'world']);
server.close();
}));
});
}));
}
{
const values = [];
const server = http.createServer((req, res) => {
const ts = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk?.toString().toUpperCase());
}
});
pipeline(req, ts, res, common.mustSucceed());
});
server.listen(0, () => {
const req = http.request({
port: server.address().port,
method: 'POST',
});
const rs = new ReadableStream({
start(controller) {
controller.enqueue('hello');
controller.close();
}
});
pipeline(rs, req, common.mustSucceed());
req.on('response', (res) => {
res.on('data', (chunk) => {
values.push(chunk?.toString());
}
);
res.on('end', common.mustCall(() => {
assert.deepStrictEqual(values, ['HELLO']);
server.close();
}));
});
});
}
{
const values = [];
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});
const ws = new WritableStream({
write(chunk) {
values.push(chunk?.toString());
}
});
pipelinePromise(rs, ws).then(common.mustCall(() => {
assert.deepStrictEqual(values, ['hello', 'world']);
}));
c.enqueue('hello');
c.enqueue('world');
c.close();
}
{
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});
const ws = new WritableStream({
write() { }
});
pipelinePromise(rs, ws).then(common.mustNotCall()).catch(common.mustCall((err) => {
assert.strictEqual(err?.message, 'kaboom');
}));
c.error(new Error('kaboom'));
}
{
const values = [];
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});
pipeline(rs, async function(source) {
for await (const chunk of source) {
values.push(chunk?.toString());
}
}, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['hello', 'world']);
}));
c.enqueue('hello');
c.enqueue('world');
c.close();
}
{
const rs = new ReadableStream({
start() {}
});
pipeline(rs, async function(source) {
throw new Error('kaboom');
}, (err) => {
assert.strictEqual(err?.message, 'kaboom');
});
}
{
const values = [];
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});
const ts = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk?.toString().toUpperCase());
}
});
pipeline(rs, ts, async function(source) {
for await (const chunk of source) {
values.push(chunk?.toString());
}
}, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['HELLO', 'WORLD']);
}));
c.enqueue('hello');
c.enqueue('world');
c.close();
}
{
const values = [];
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});
const ws = new WritableStream({
write(chunk) {
values.push(chunk?.toString());
}
});
pipeline(rs, async function* (source) {
for await (const chunk of source) {
yield chunk?.toString().toUpperCase();
}
}, ws, common.mustSucceed(() => {
assert.deepStrictEqual(values, ['HELLO', 'WORLD']);
}));
c.enqueue('hello');
c.enqueue('world');
c.close();
}
{
let c;
const rs = new ReadableStream({
start(controller) {
c = controller;
}
});
const ws = new WritableStream({
write(chunk) { }
}, { highWaterMark: 0 });
pipeline(rs, ws, common.mustNotCall());
for (let i = 0; i < 10; i++) {
c.enqueue(`${i}`);
}
c.close();
}