stream: support dispose in writable

Add support to Symbol.asyncDispose in writable streams.
Additionally add a test for writable, transform and duplex streams
who inherit from readable/writable to avoid breakage.

Co-authored-by: Robert Nagy <ronagy@icloud.com>
Co-authored-by: atlowChemi <chemi@atlow.co.il>
PR-URL: https://github.com/nodejs/node/pull/48547
Reviewed-By: Chemi Atlow <chemi@atlow.co.il>
Reviewed-By: Moshe Atlow <moshe@atlow.co.il>
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
This commit is contained in:
Benjamin Gruenbaum 2024-06-16 02:41:59 +03:00 committed by GitHub
parent 9351425c82
commit 75fe4f35d4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 76 additions and 10 deletions

View File

@ -954,6 +954,17 @@ added: v12.3.0
Getter for the property `objectMode` of a given `Writable` stream.
##### `writable[Symbol.asyncDispose]()`
<!-- YAML
added: REPLACEME
-->
> Stability: 1 - Experimental
Calls [`writable.destroy()`][writable-destroy] with an `AbortError` and returns
a promise that fulfills when the stream is finished.
##### `writable.write(chunk[, encoding][, callback])`
<!-- YAML

View File

@ -32,8 +32,10 @@ const {
ObjectDefineProperties,
ObjectDefineProperty,
ObjectSetPrototypeOf,
Promise,
StringPrototypeToLowerCase,
Symbol,
SymbolAsyncDispose,
SymbolHasInstance,
} = primordials;
@ -44,6 +46,7 @@ const EE = require('events');
const Stream = require('internal/streams/legacy').Stream;
const { Buffer } = require('buffer');
const destroyImpl = require('internal/streams/destroy');
const eos = require('internal/streams/end-of-stream');
const {
addAbortSignal,
@ -54,16 +57,19 @@ const {
getDefaultHighWaterMark,
} = require('internal/streams/state');
const {
ERR_INVALID_ARG_TYPE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK,
ERR_STREAM_CANNOT_PIPE,
ERR_STREAM_DESTROYED,
ERR_STREAM_ALREADY_FINISHED,
ERR_STREAM_NULL_VALUES,
ERR_STREAM_WRITE_AFTER_END,
ERR_UNKNOWN_ENCODING,
} = require('internal/errors').codes;
AbortError,
codes: {
ERR_INVALID_ARG_TYPE,
ERR_METHOD_NOT_IMPLEMENTED,
ERR_MULTIPLE_CALLBACK,
ERR_STREAM_ALREADY_FINISHED,
ERR_STREAM_CANNOT_PIPE,
ERR_STREAM_DESTROYED,
ERR_STREAM_NULL_VALUES,
ERR_STREAM_WRITE_AFTER_END,
ERR_UNKNOWN_ENCODING,
},
} = require('internal/errors');
const {
kState,
// bitfields
@ -1142,3 +1148,14 @@ Writable.fromWeb = function(writableStream, options) {
Writable.toWeb = function(streamWritable) {
return lazyWebStreams().newWritableStreamFromStreamWritable(streamWritable);
};
Writable.prototype[SymbolAsyncDispose] = function() {
let error;
if (!this.destroyed) {
error = this.writableFinished ? null : new AbortError();
this.destroy(error);
}
return new Promise((resolve, reject) =>
eos(this, (err) => (err && err.name !== 'AbortError' ? reject(err) : resolve(null))),
);
};

View File

@ -269,3 +269,18 @@ const assert = require('assert');
}));
duplex.destroy();
}
{
// Check Symbol.asyncDispose
const duplex = new Duplex({
write(chunk, enc, cb) { cb(); },
read() {},
});
let count = 0;
duplex.on('error', common.mustCall((e) => {
assert.strictEqual(count++, 0); // Ensure not called twice
assert.strictEqual(e.name, 'AbortError');
}));
duplex.on('close', common.mustCall());
duplex[Symbol.asyncDispose]().then(common.mustCall());
}

View File

@ -141,3 +141,14 @@ const assert = require('assert');
transform.destroy();
}
{
const transform = new Transform({
transform(chunk, enc, cb) {}
});
transform.on('error', common.mustCall((err) => {
assert.strictEqual(err.name, 'AbortError');
}));
transform.on('close', common.mustCall());
transform[Symbol.asyncDispose]().then(common.mustCall());
}

View File

@ -487,3 +487,15 @@ const assert = require('assert');
}));
s.destroy(_err);
}
{
const write = new Writable({
write(chunk, enc, cb) { cb(); }
});
write.on('error', common.mustCall((e) => {
assert.strictEqual(e.name, 'AbortError');
assert.strictEqual(write.destroyed, true);
}));
write[Symbol.asyncDispose]().then(common.mustCall());
}