stream: add suport for abort signal in finished() for webstreams

Refs: https://github.com/nodejs/node/pull/46205
PR-URL: https://github.com/nodejs/node/pull/46403
Refs: https://github.com/nodejs/node/pull/37354
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
This commit is contained in:
Debadree Chatterjee 2023-02-03 00:47:26 +05:30 committed by GitHub
parent bd092054f7
commit ebcc711e14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 116 additions and 3 deletions

View File

@ -261,11 +261,34 @@ function eos(stream, options, callback) {
return cleanup;
}
function eosWeb(stream, opts, callback) {
function eosWeb(stream, options, callback) {
let isAborted = false;
let abort = nop;
if (options.signal) {
abort = () => {
isAborted = true;
callback.call(stream, new AbortError(undefined, { cause: options.signal.reason }));
};
if (options.signal.aborted) {
process.nextTick(abort);
} else {
const originalCallback = callback;
callback = once((...args) => {
options.signal.removeEventListener('abort', abort);
originalCallback.apply(stream, args);
});
options.signal.addEventListener('abort', abort);
}
}
const resolverFn = (...args) => {
if (!isAborted) {
process.nextTick(() => callback.apply(stream, args));
}
};
PromisePrototypeThen(
stream[kIsClosedPromise].promise,
() => process.nextTick(() => callback.call(stream)),
(err) => process.nextTick(() => callback.call(stream, err)),
resolverFn,
resolverFn
);
return nop;
}

View File

@ -230,3 +230,93 @@ const { finished: finishedPromise } = require('stream/promises');
assert.strictEqual(err?.message, 'asd');
});
}
{
// Check pre-cancelled
const signal = new EventTarget();
signal.aborted = true;
const rs = new ReadableStream({
start() {}
});
finished(rs, { signal }, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
}
{
// Check cancelled before the stream ends sync.
const ac = new AbortController();
const { signal } = ac;
const rs = new ReadableStream({
start() {}
});
finished(rs, { signal }, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
ac.abort();
}
{
// Check cancelled before the stream ends async.
const ac = new AbortController();
const { signal } = ac;
const rs = new ReadableStream({
start() {}
});
setTimeout(() => ac.abort(), 1);
finished(rs, { signal }, common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
}
{
// Check cancelled after doesn't throw.
const ac = new AbortController();
const { signal } = ac;
const rs = new ReadableStream({
start(controller) {
controller.enqueue('asd');
controller.close();
}
});
finished(rs, { signal }, common.mustSucceed());
rs.getReader().read().then(common.mustCall((chunk) => {
assert.strictEqual(chunk.value, 'asd');
setImmediate(() => ac.abort());
}));
}
{
// Promisified abort works
async function run() {
const ac = new AbortController();
const { signal } = ac;
const rs = new ReadableStream({
start() {}
});
setImmediate(() => ac.abort());
await finishedPromise(rs, { signal });
}
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
}
{
// Promisified pre-aborted works
async function run() {
const signal = new EventTarget();
signal.aborted = true;
const rs = new ReadableStream({
start() {}
});
await finishedPromise(rs, { signal });
}
assert.rejects(run, { name: 'AbortError' }).then(common.mustCall());
}