stream: add ReadableByteStream.tee()

This supports teeing readable byte streams to meet
the latest web streams standards.

Signed-off-by: Daeyeon Jeong daeyeon.dev@gmail.com
PR-URL: https://github.com/nodejs/node/pull/44505
Refs: https://streams.spec.whatwg.org/#readable-stream-tee
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>
Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
This commit is contained in:
Daeyeon Jeong 2022-09-09 07:06:06 +09:00 committed by GitHub
parent 7f9cd60eef
commit e06384cb48
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 343 additions and 53 deletions

View File

@ -299,6 +299,10 @@ is active.
<!-- YAML
added: v16.5.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/44505
description: Support teeing a readable byte stream.
-->
* Returns: {ReadableStream\[]}

View File

@ -95,6 +95,7 @@ const {
ArrayBufferViewGetByteOffset,
ArrayBufferGetByteLength,
AsyncIterator,
cloneAsUint8Array,
copyArrayBuffer,
customInspect,
dequeueValue,
@ -215,6 +216,7 @@ class ReadableStream {
throw new ERR_INVALID_ARG_VALUE('source', 'Object', source);
this[kState] = {
disturbed: false,
reader: undefined,
state: 'readable',
storedError: undefined,
stream: undefined,
@ -1111,7 +1113,6 @@ class ReadableByteStreamController {
chunk);
}
const chunkByteLength = ArrayBufferViewGetByteLength(chunk);
const chunkByteOffset = ArrayBufferViewGetByteOffset(chunk);
const chunkBuffer = ArrayBufferViewGetBuffer(chunk);
const chunkBufferByteLength = ArrayBufferGetByteLength(chunkBuffer);
if (chunkByteLength === 0 || chunkBufferByteLength === 0) {
@ -1122,11 +1123,7 @@ class ReadableByteStreamController {
throw new ERR_INVALID_STATE.TypeError('Controller is already closed');
if (this[kState].stream[kState].state !== 'readable')
throw new ERR_INVALID_STATE.TypeError('ReadableStream is already closed');
readableByteStreamControllerEnqueue(
this,
chunkBuffer,
chunkByteLength,
chunkByteOffset);
readableByteStreamControllerEnqueue(this, chunk);
}
/**
@ -1430,6 +1427,13 @@ function readableStreamPipeTo(
}
function readableStreamTee(stream, cloneForBranch2) {
if (isReadableByteStreamController(stream[kState].controller)) {
return readableByteStreamTee(stream);
}
return readableStreamDefaultTee(stream, cloneForBranch2);
}
function readableStreamDefaultTee(stream, cloneForBranch2) {
const reader = new ReadableStreamDefaultReader(stream);
let reading = false;
let canceled1 = false;
@ -1524,6 +1528,284 @@ function readableStreamTee(stream, cloneForBranch2) {
return [branch1, branch2];
}
function readableByteStreamTee(stream) {
assert(isReadableStream(stream));
assert(isReadableByteStreamController(stream[kState].controller));
let reader = new ReadableStreamDefaultReader(stream);
let reading = false;
let readAgainForBranch1 = false;
let readAgainForBranch2 = false;
let canceled1 = false;
let canceled2 = false;
let reason1;
let reason2;
let branch1;
let branch2;
const cancelDeferred = createDeferredPromise();
function forwardReaderError(thisReader) {
PromisePrototypeThen(
thisReader[kState].close.promise,
undefined,
(error) => {
if (thisReader !== reader) {
return;
}
readableStreamDefaultControllerError(branch1[kState].controller, error);
readableStreamDefaultControllerError(branch2[kState].controller, error);
if (!canceled1 || !canceled2) {
cancelDeferred.resolve();
}
}
);
}
function pullWithDefaultReader() {
if (isReadableStreamBYOBReader(reader)) {
readableStreamBYOBReaderRelease(reader);
reader = new ReadableStreamDefaultReader(stream);
forwardReaderError(reader);
}
const readRequest = {
[kChunk](chunk) {
queueMicrotask(() => {
readAgainForBranch1 = false;
readAgainForBranch2 = false;
const chunk1 = chunk;
let chunk2 = chunk;
if (!canceled1 && !canceled2) {
try {
chunk2 = cloneAsUint8Array(chunk);
} catch (error) {
readableByteStreamControllerError(
branch1[kState].controller,
error
);
readableByteStreamControllerError(
branch2[kState].controller,
error
);
cancelDeferred.resolve(readableStreamCancel(stream, error));
return;
}
}
if (!canceled1) {
readableByteStreamControllerEnqueue(
branch1[kState].controller,
chunk1
);
}
if (!canceled2) {
readableByteStreamControllerEnqueue(
branch2[kState].controller,
chunk2
);
}
reading = false;
if (readAgainForBranch1) {
pull1Algorithm();
} else if (readAgainForBranch2) {
pull2Algorithm();
}
});
},
[kClose]() {
reading = false;
if (!canceled1) {
readableByteStreamControllerClose(branch1[kState].controller);
}
if (!canceled2) {
readableByteStreamControllerClose(branch2[kState].controller);
}
if (branch1[kState].controller[kState].pendingPullIntos.length > 0) {
readableByteStreamControllerRespond(branch1[kState].controller, 0);
}
if (branch2[kState].controller[kState].pendingPullIntos.length > 0) {
readableByteStreamControllerRespond(branch2[kState].controller, 0);
}
if (!canceled1 || !canceled2) {
cancelDeferred.resolve();
}
},
[kError]() {
reading = false;
},
};
readableStreamDefaultReaderRead(reader, readRequest);
}
function pullWithBYOBReader(view, forBranch2) {
if (isReadableStreamDefaultReader(reader)) {
readableStreamDefaultReaderRelease(reader);
reader = new ReadableStreamBYOBReader(stream);
forwardReaderError(reader);
}
const byobBranch = forBranch2 === true ? branch2 : branch1;
const otherBranch = forBranch2 === false ? branch2 : branch1;
const readIntoRequest = {
[kChunk](chunk) {
queueMicrotask(() => {
readAgainForBranch1 = false;
readAgainForBranch2 = false;
const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;
if (!otherCanceled) {
let clonedChunk;
try {
clonedChunk = cloneAsUint8Array(chunk);
} catch (error) {
readableByteStreamControllerError(
byobBranch[kState].controller,
error
);
readableByteStreamControllerError(
otherBranch[kState].controller,
error
);
cancelDeferred.resolve(readableStreamCancel(stream, error));
return;
}
if (!byobCanceled) {
readableByteStreamControllerRespondWithNewView(
byobBranch[kState].controller,
chunk
);
}
readableByteStreamControllerEnqueue(
otherBranch[kState].controller,
clonedChunk
);
} else if (!byobCanceled) {
readableByteStreamControllerRespondWithNewView(
byobBranch[kState].controller,
chunk
);
}
reading = false;
if (readAgainForBranch1) {
pull1Algorithm();
} else if (readAgainForBranch2) {
pull2Algorithm();
}
});
},
[kClose](chunk) {
reading = false;
const byobCanceled = forBranch2 === true ? canceled2 : canceled1;
const otherCanceled = forBranch2 === false ? canceled2 : canceled1;
if (!byobCanceled) {
readableByteStreamControllerClose(byobBranch[kState].controller);
}
if (!otherCanceled) {
readableByteStreamControllerClose(otherBranch[kState].controller);
}
if (chunk !== undefined) {
if (!byobCanceled) {
readableByteStreamControllerRespondWithNewView(
byobBranch[kState].controller,
chunk
);
}
if (
!otherCanceled &&
otherBranch[kState].controller[kState].pendingPullIntos.length > 0
) {
readableByteStreamControllerRespond(
otherBranch[kState].controller,
0
);
}
}
if (!byobCanceled || !otherCanceled) {
cancelDeferred.resolve();
}
},
[kError]() {
reading = false;
},
};
readableStreamBYOBReaderRead(reader, view, readIntoRequest);
}
function pull1Algorithm() {
if (reading) {
readAgainForBranch1 = true;
return PromiseResolve();
}
reading = true;
const byobRequest = branch1[kState].controller.byobRequest;
if (byobRequest === null) {
pullWithDefaultReader();
} else {
pullWithBYOBReader(byobRequest[kState].view, false);
}
return PromiseResolve();
}
function pull2Algorithm() {
if (reading) {
readAgainForBranch2 = true;
return PromiseResolve();
}
reading = true;
const byobRequest = branch2[kState].controller.byobRequest;
if (byobRequest === null) {
pullWithDefaultReader();
} else {
pullWithBYOBReader(byobRequest[kState].view, true);
}
return PromiseResolve();
}
function cancel1Algorithm(reason) {
canceled1 = true;
reason1 = reason;
if (canceled2) {
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
}
return cancelDeferred.promise;
}
function cancel2Algorithm(reason) {
canceled2 = true;
reason2 = reason;
if (canceled1) {
cancelDeferred.resolve(readableStreamCancel(stream, [reason1, reason2]));
}
return cancelDeferred.promise;
}
branch1 = new ReadableStream({
type: 'bytes',
pull: pull1Algorithm,
cancel: cancel1Algorithm,
});
branch2 = new ReadableStream({
type: 'bytes',
pull: pull2Algorithm,
cancel: cancel2Algorithm,
});
forwardReaderError(reader);
return [branch1, branch2];
}
function readableByteStreamControllerConvertPullIntoDescriptor(desc) {
const {
buffer,
@ -2317,11 +2599,7 @@ function readableByteStreamControllerFillHeadPullIntoDescriptor(
desc.bytesFilled += size;
}
function readableByteStreamControllerEnqueue(
controller,
buffer,
byteLength,
byteOffset) {
function readableByteStreamControllerEnqueue(controller, chunk) {
const {
closeRequested,
pendingPullIntos,
@ -2329,6 +2607,10 @@ function readableByteStreamControllerEnqueue(
stream,
} = controller[kState];
const buffer = ArrayBufferViewGetBuffer(chunk);
const byteOffset = ArrayBufferViewGetByteOffset(chunk);
const byteLength = ArrayBufferViewGetByteLength(chunk);
if (closeRequested || stream[kState].state !== 'readable')
return;

View File

@ -2,6 +2,7 @@
const {
ArrayBufferPrototype,
ArrayBufferPrototypeSlice,
ArrayPrototypePush,
ArrayPrototypeShift,
AsyncIteratorPrototype,
@ -112,6 +113,15 @@ function ArrayBufferGetByteLength(view) {
return ReflectGet(ArrayBufferPrototype, 'byteLength', view);
}
function cloneAsUint8Array(view) {
const buffer = ArrayBufferViewGetBuffer(view);
const byteOffset = ArrayBufferViewGetByteOffset(view);
const byteLength = ArrayBufferViewGetByteLength(view);
return new Uint8Array(
ArrayBufferPrototypeSlice(buffer, byteOffset, byteOffset + byteLength)
);
}
function isBrandCheck(brand) {
return (value) => {
return value != null &&
@ -236,6 +246,7 @@ module.exports = {
ArrayBufferViewGetByteOffset,
ArrayBufferGetByteLength,
AsyncIterator,
cloneAsUint8Array,
copyArrayBuffer,
customInspect,
dequeueValue,

View File

@ -1561,7 +1561,7 @@ class Source {
assert(!readableStreamDefaultControllerCanCloseOrEnqueue(controller));
readableStreamDefaultControllerEnqueue(controller);
readableByteStreamControllerClose(controller);
readableByteStreamControllerEnqueue(controller);
readableByteStreamControllerEnqueue(controller, new Uint8Array(1));
}
{

View File

@ -34,3 +34,37 @@ import assert from 'assert';
assert.strictEqual(dataReader2, 'foobar');
})().then(mustCall());
}
{
// Test ReadableByteStream.tee() with close in the nextTick after enqueue
async function read(stream) {
const chunks = [];
for await (const chunk of stream)
chunks.push(chunk);
return Buffer.concat(chunks).toString();
}
const [r1, r2] = new ReadableStream({
type: 'bytes',
start(controller) {
process.nextTick(() => {
controller.enqueue(new Uint8Array([102, 111, 111, 98, 97, 114]));
process.nextTick(() => {
controller.close();
});
});
}
}).tee();
(async () => {
const [dataReader1, dataReader2] = await Promise.all([
read(r1),
read(r2),
]);
assert.strictEqual(dataReader1, dataReader2);
assert.strictEqual(dataReader1, 'foobar');
assert.strictEqual(dataReader2, 'foobar');
})().then(mustCall());
}

View File

@ -1,11 +1,4 @@
{
"piping/abort.any.js": {
"fail": {
"expected": [
"pipeTo on a teed readable byte stream should only be aborted when both branches are aborted"
]
}
},
"queuing-strategies-size-function-per-global.window.js": {
"skip": "Browser-specific test"
},
@ -38,40 +31,6 @@
]
}
},
"readable-byte-streams/tee.any.js": {
"fail": {
"expected": [
"ReadableStream teeing with byte source: should be able to read one branch to the end without affecting the other",
"ReadableStream teeing with byte source: chunks should be cloned for each branch",
"ReadableStream teeing with byte source: chunks for BYOB requests from branch 1 should be cloned to branch 2",
"ReadableStream teeing with byte source: errors in the source should propagate to both branches",
"ReadableStream teeing with byte source: closing the original should close the branches",
"ReadableStream teeing with byte source: erroring the original should immediately error the branches",
"ReadableStream teeing with byte source: erroring the original should error pending reads from BYOB reader",
"ReadableStream teeing with byte source: canceling branch1 should finish when branch2 reads until end of stream",
"ReadableStream teeing with byte source: canceling branch1 should finish when original stream errors",
"ReadableStream teeing with byte source: should not pull any chunks if no branches are reading",
"ReadableStream teeing with byte source: should only pull enough to fill the emptiest queue",
"ReadableStream teeing with byte source: should not pull when original is already errored",
"ReadableStream teeing with byte source: stops pulling when original stream errors while branch 1 is reading",
"ReadableStream teeing with byte source: stops pulling when original stream errors while branch 2 is reading",
"ReadableStream teeing with byte source: stops pulling when original stream errors while both branches are reading",
"ReadableStream teeing with byte source: canceling both branches in sequence with delay",
"ReadableStream teeing with byte source: failing to cancel when canceling both branches in sequence with delay",
"ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch1, cancel branch2",
"ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch2, cancel branch1",
"ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch2, enqueue to branch1",
"ReadableStream teeing with byte source: read from branch1 and branch2, cancel branch1, respond to branch2",
"ReadableStream teeing with byte source: pull with BYOB reader, then pull with default reader",
"ReadableStream teeing with byte source: pull with default reader, then pull with BYOB reader",
"ReadableStream teeing with byte source: read from branch2, then read from branch1",
"ReadableStream teeing with byte source: read from branch1 with default reader, then close while branch2 has pending BYOB read",
"ReadableStream teeing with byte source: read from branch2 with default reader, then close while branch1 has pending BYOB read",
"ReadableStream teeing with byte source: close when both branches have pending BYOB reads",
"ReadableStream teeing with byte source: respond() and close() while both branches are pulling"
]
}
},
"readable-streams/cross-realm-crash.window.js": {
"skip": "Browser-specific test"
},