stream: handle a pending pull request from a released reader

In order to meet the specification, this includes mainly the followings:

- Adding the 'release steps' to ReadableStreamController
- Responding to a pull request from a released reader in
ReadableByteStreamController

Signed-off-by: Daeyeon Jeong daeyeon.dev@gmail.com
PR-URL: https://github.com/nodejs/node/pull/44702
Refs: https://streams.spec.whatwg.org/#abstract-opdef-readablebytestreamcontroller-releasesteps
Refs: https://streams.spec.whatwg.org/#readable-byte-stream-controller-respond-in-readable-state
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>
This commit is contained in:
Daeyeon Jeong 2022-09-23 21:01:52 +09:00 committed by GitHub
parent 76364215d4
commit e213deabe5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 86 additions and 8 deletions

View File

@ -651,6 +651,10 @@ Signals an error that causes the {ReadableStream} to error and close.
<!-- YAML
added: v16.5.0
changes:
- version: REPLACEME
pr-url: https://github.com/nodejs/node/pull/44702
description: Support handling a BYOB pull request from a released reader.
-->
Every {ReadableStream} has a controller that is responsible for

View File

@ -139,6 +139,7 @@ const kClose = Symbol('kClose');
const kChunk = Symbol('kChunk');
const kError = Symbol('kError');
const kPull = Symbol('kPull');
const kRelease = Symbol('kRelease');
/**
* @typedef {import('../abort_controller').AbortSignal} AbortSignal
@ -1019,6 +1020,8 @@ class ReadableStreamDefaultController {
readableStreamDefaultControllerPullSteps(this, readRequest);
}
[kRelease]() {}
[kInspect](depth, options) {
return customInspect(depth, options, this[kType], { });
}
@ -1143,6 +1146,17 @@ class ReadableByteStreamController {
readableByteStreamControllerPullSteps(this, readRequest);
}
[kRelease]() {
const {
pendingPullIntos,
} = this[kState];
if (pendingPullIntos.length > 0) {
const firstPendingPullInto = pendingPullIntos[0];
firstPendingPullInto.type = 'none';
this[kState].pendingPullIntos = [firstPendingPullInto];
}
}
[kInspect](depth, options) {
return customInspect(depth, options, this[kType], { });
}
@ -2060,6 +2074,9 @@ function readableStreamReaderGenericRelease(reader) {
};
}
setPromiseHandled(reader[kState].close.promise);
stream[kState].controller[kRelease]();
stream[kState].reader = undefined;
reader[kState].stream = undefined;
}
@ -2365,6 +2382,8 @@ function readableByteStreamControllerClose(controller) {
function readableByteStreamControllerCommitPullIntoDescriptor(stream, desc) {
assert(stream[kState].state !== 'errored');
assert(desc.type !== 'none');
let done = false;
if (stream[kState].state === 'closed') {
desc.bytesFilled = 0;
@ -2574,6 +2593,9 @@ function readableByteStreamControllerRespond(controller, bytesWritten) {
function readableByteStreamControllerRespondInClosedState(controller, desc) {
assert(!desc.bytesFilled);
if (desc.type === 'none') {
readableByteStreamControllerShiftPendingPullInto(controller);
}
const {
stream,
} = controller[kState];
@ -2663,6 +2685,31 @@ function readableByteStreamControllerEnqueue(controller, chunk) {
readableByteStreamControllerCallPullIfNeeded(controller);
}
function readableByteStreamControllerEnqueueClonedChunkToQueue(
controller,
buffer,
byteOffset,
byteLength
) {
let cloneResult;
try {
cloneResult = ArrayBufferPrototypeSlice(
buffer,
byteOffset,
byteOffset + byteLength
);
} catch (error) {
readableByteStreamControllerError(controller, error);
throw error;
}
readableByteStreamControllerEnqueueChunkToQueue(
controller,
cloneResult,
0,
byteLength
);
}
function readableByteStreamControllerEnqueueChunkToQueue(
controller,
buffer,
@ -2678,6 +2725,29 @@ function readableByteStreamControllerEnqueueChunkToQueue(
controller[kState].queueTotalSize += byteLength;
}
function readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
controller,
desc
) {
const {
buffer,
byteOffset,
bytesFilled,
type,
} = desc;
assert(type === 'none');
if (bytesFilled > 0) {
readableByteStreamControllerEnqueueClonedChunkToQueue(
controller,
buffer,
byteOffset,
bytesFilled
);
}
readableByteStreamControllerShiftPendingPullInto(controller);
}
function readableByteStreamControllerFillPullIntoDescriptorFromQueue(
controller,
desc) {
@ -2773,6 +2843,7 @@ function readableByteStreamControllerRespondInReadableState(
buffer,
bytesFilled,
byteLength,
type,
} = desc;
if (bytesFilled + bytesWritten > byteLength)
@ -2783,6 +2854,17 @@ function readableByteStreamControllerRespondInReadableState(
bytesWritten,
desc);
if (type === 'none') {
readableByteStreamControllerEnqueueDetachedPullIntoToQueue(
controller,
desc
);
readableByteStreamControllerProcessPullIntoDescriptorsUsingQueue(
controller
);
return;
}
if (desc.bytesFilled < desc.elementSize)
return;

View File

@ -16,17 +16,9 @@
"fail": {
"expected": [
"ReadableStream with byte source: enqueue() discards auto-allocated BYOB request",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respond()",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 1 element Uint16Array, respond(1)",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader with 2 element Uint8Array, respond(3)",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, respondWithNewView()",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, enqueue()",
"ReadableStream with byte source: releaseLock() with pending read(view), read(view) on second reader, close(), respond(0)",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, respond()",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read() on second reader, enqueue()",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, respond()",
"ReadableStream with byte source: autoAllocateChunkSize, releaseLock() with pending read(), read(view) on second reader, enqueue()",
"ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read(view) on second reader with 1 element Uint16Array, respond(1)",
"ReadableStream with byte source: read(view) with 1 element Uint16Array, respond(1), releaseLock(), read() on second reader, enqueue()"
]
}