diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js index 07f80aedc69..04ceb72460a 100644 --- a/lib/internal/streams/end-of-stream.js +++ b/lib/internal/streams/end-of-stream.js @@ -261,11 +261,34 @@ function eos(stream, options, callback) { return cleanup; } -function eosWeb(stream, opts, callback) { +function eosWeb(stream, options, callback) { + let isAborted = false; + let abort = nop; + if (options.signal) { + abort = () => { + isAborted = true; + callback.call(stream, new AbortError(undefined, { cause: options.signal.reason })); + }; + if (options.signal.aborted) { + process.nextTick(abort); + } else { + const originalCallback = callback; + callback = once((...args) => { + options.signal.removeEventListener('abort', abort); + originalCallback.apply(stream, args); + }); + options.signal.addEventListener('abort', abort); + } + } + const resolverFn = (...args) => { + if (!isAborted) { + process.nextTick(() => callback.apply(stream, args)); + } + }; PromisePrototypeThen( stream[kIsClosedPromise].promise, - () => process.nextTick(() => callback.call(stream)), - (err) => process.nextTick(() => callback.call(stream, err)), + resolverFn, + resolverFn ); return nop; } diff --git a/test/parallel/test-webstreams-finished.js b/test/parallel/test-webstreams-finished.js index 65a14d863eb..2a19c1ebae3 100644 --- a/test/parallel/test-webstreams-finished.js +++ b/test/parallel/test-webstreams-finished.js @@ -230,3 +230,93 @@ const { finished: finishedPromise } = require('stream/promises'); assert.strictEqual(err?.message, 'asd'); }); } + +{ + // Check pre-cancelled + const signal = new EventTarget(); + signal.aborted = true; + + const rs = new ReadableStream({ + start() {} + }); + finished(rs, { signal }, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + })); +} + +{ + // Check cancelled before the stream ends sync. + const ac = new AbortController(); + const { signal } = ac; + + const rs = new ReadableStream({ + start() {} + }); + finished(rs, { signal }, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + })); + + ac.abort(); +} + +{ + // Check cancelled before the stream ends async. + const ac = new AbortController(); + const { signal } = ac; + + const rs = new ReadableStream({ + start() {} + }); + setTimeout(() => ac.abort(), 1); + finished(rs, { signal }, common.mustCall((err) => { + assert.strictEqual(err.name, 'AbortError'); + })); +} + +{ + // Check cancelled after doesn't throw. + const ac = new AbortController(); + const { signal } = ac; + + const rs = new ReadableStream({ + start(controller) { + controller.enqueue('asd'); + controller.close(); + } + }); + finished(rs, { signal }, common.mustSucceed()); + + rs.getReader().read().then(common.mustCall((chunk) => { + assert.strictEqual(chunk.value, 'asd'); + setImmediate(() => ac.abort()); + })); +} + +{ + // Promisified abort works + async function run() { + const ac = new AbortController(); + const { signal } = ac; + const rs = new ReadableStream({ + start() {} + }); + setImmediate(() => ac.abort()); + await finishedPromise(rs, { signal }); + } + + assert.rejects(run, { name: 'AbortError' }).then(common.mustCall()); +} + +{ + // Promisified pre-aborted works + async function run() { + const signal = new EventTarget(); + signal.aborted = true; + const rs = new ReadableStream({ + start() {} + }); + await finishedPromise(rs, { signal }); + } + + assert.rejects(run, { name: 'AbortError' }).then(common.mustCall()); +}