stream: save error in state

Useful for future PR's to resolve situations where e.g. finished()
is invoked on an already errored streams.

PR-URL: https://github.com/nodejs/node/pull/34103
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Denys Otrishko <shishugi@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
This commit is contained in:
Robert Nagy 2020-06-28 18:43:59 +02:00
parent 60a217b1ea
commit 6213fcee9b
6 changed files with 81 additions and 34 deletions

View File

@ -155,7 +155,7 @@ function ReadableState(options, stream, isDuplex) {
// _read calls, 'data' or 'readable' events should occur. This is needed
// since when autoDestroy is disabled we need a way to tell whether the
// stream has failed.
this.errored = false;
this.errored = null;
// Indicates whether the stream has finished destroying.
this.closed = false;

View File

@ -179,7 +179,7 @@ function WritableState(options, stream, isDuplex) {
// Indicates whether the stream has errored. When true all write() calls
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
this.errored = false;
this.errored = null;
// Indicates whether the stream has finished destroying.
this.closed = false;
@ -436,12 +436,17 @@ function onwrite(stream, er) {
state.writelen = 0;
if (er) {
state.errored = true;
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
er.stack;
if (!state.errored) {
state.errored = er;
}
// In case of duplex streams we need to notify the readable side of the
// error.
if (stream._readableState) {
stream._readableState.errored = true;
if (stream._readableState && !stream._readableState.errored) {
stream._readableState.errored = er;
}
if (sync) {

View File

@ -25,11 +25,14 @@ function destroy(err, cb) {
}
if (err) {
if (w) {
w.errored = true;
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
err.stack;
if (w && !w.errored) {
w.errored = err;
}
if (r) {
r.errored = true;
if (r && !r.errored) {
r.errored = err;
}
}
@ -61,11 +64,14 @@ function _destroy(self, err, cb) {
const w = self._writableState;
if (err) {
if (w) {
w.errored = true;
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
err.stack;
if (w && !w.errored) {
w.errored = err;
}
if (r) {
r.errored = true;
if (r && !r.errored) {
r.errored = err;
}
}
@ -136,7 +142,7 @@ function undestroy() {
r.closed = false;
r.closeEmitted = false;
r.destroyed = false;
r.errored = false;
r.errored = null;
r.errorEmitted = false;
r.reading = false;
r.ended = false;
@ -148,7 +154,7 @@ function undestroy() {
w.destroyed = false;
w.closed = false;
w.closeEmitted = false;
w.errored = false;
w.errored = null;
w.errorEmitted = false;
w.ended = false;
w.ending = false;
@ -175,11 +181,14 @@ function errorOrDestroy(stream, err, sync) {
if ((r && r.autoDestroy) || (w && w.autoDestroy))
stream.destroy(err);
else if (err) {
if (w) {
w.errored = true;
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
err.stack;
if (w && !w.errored) {
w.errored = err;
}
if (r) {
r.errored = true;
if (r && !r.errored) {
r.errored = err;
}
if (sync) {
process.nextTick(emitErrorNT, stream, err);

View File

@ -134,13 +134,13 @@ const assert = require('assert');
read.on('error', common.mustCall((err) => {
assert.strictEqual(ticked, true);
assert.strictEqual(read._readableState.errorEmitted, true);
assert.strictEqual(read._readableState.errored, true);
assert.strictEqual(read._readableState.errored, expected);
assert.strictEqual(err, expected);
}));
read.destroy();
assert.strictEqual(read._readableState.errorEmitted, false);
assert.strictEqual(read._readableState.errored, true);
assert.strictEqual(read._readableState.errored, expected);
assert.strictEqual(read.destroyed, true);
ticked = true;
}
@ -190,15 +190,15 @@ const assert = require('assert');
assert.strictEqual(err, expected);
}));
assert.strictEqual(read._readableState.errored, false);
assert.strictEqual(read._readableState.errored, null);
assert.strictEqual(read._readableState.errorEmitted, false);
read.destroy(expected, common.mustCall(function(err) {
assert.strictEqual(read._readableState.errored, true);
assert.strictEqual(read._readableState.errored, expected);
assert.strictEqual(err, expected);
}));
assert.strictEqual(read._readableState.errorEmitted, false);
assert.strictEqual(read._readableState.errored, true);
assert.strictEqual(read._readableState.errored, expected);
ticked = true;
}
@ -223,14 +223,14 @@ const assert = require('assert');
readable.destroy();
assert.strictEqual(readable.destroyed, true);
assert.strictEqual(readable._readableState.errored, false);
assert.strictEqual(readable._readableState.errored, null);
assert.strictEqual(readable._readableState.errorEmitted, false);
// Test case where `readable.destroy()` is called again with an error before
// the `_destroy()` callback is called.
readable.destroy(new Error('kaboom 2'));
assert.strictEqual(readable._readableState.errorEmitted, false);
assert.strictEqual(readable._readableState.errored, false);
assert.strictEqual(readable._readableState.errored, null);
ticked = true;
}
@ -253,3 +253,18 @@ const assert = require('assert');
assert.strictEqual(read.destroyed, true);
read.read();
}
{
const read = new Readable({
autoDestroy: false,
read() {
this.push(null);
this.push('asd');
}
});
read.on('error', common.mustCall(() => {
assert(read._readableState.errored);
}));
read.resume();
}

View File

@ -167,9 +167,10 @@ const assert = require('assert');
assert.strictEqual(write._writableState.errorEmitted, true);
}));
write.destroy(new Error('kaboom 1'));
const expected = new Error('kaboom 1');
write.destroy(expected);
write.destroy(new Error('kaboom 2'));
assert.strictEqual(write._writableState.errored, true);
assert.strictEqual(write._writableState.errored, expected);
assert.strictEqual(write._writableState.errorEmitted, false);
assert.strictEqual(write.destroyed, true);
ticked = true;
@ -200,14 +201,14 @@ const assert = require('assert');
writable.destroy();
assert.strictEqual(writable.destroyed, true);
assert.strictEqual(writable._writableState.errored, false);
assert.strictEqual(writable._writableState.errored, null);
assert.strictEqual(writable._writableState.errorEmitted, false);
// Test case where `writable.destroy()` is called again with an error before
// the `_destroy()` callback is called.
writable.destroy(new Error('kaboom 2'));
assert.strictEqual(writable._writableState.errorEmitted, false);
assert.strictEqual(writable._writableState.errored, false);
assert.strictEqual(writable._writableState.errored, null);
ticked = true;
}
@ -401,3 +402,18 @@ const assert = require('assert');
}));
write.destroy();
}
{
const write = new Writable({
autoDestroy: false,
write(chunk, enc, cb) {
cb();
cb();
}
});
write.on('error', common.mustCall(() => {
assert(write._writableState.errored);
}));
write.write('asd');
}

View File

@ -10,23 +10,25 @@ oldStream.pause = () => {};
oldStream.resume = () => {};
{
const err = new Error();
const r = new Readable({ autoDestroy: true })
.wrap(oldStream)
.on('error', common.mustCall(() => {
assert.strictEqual(r._readableState.errorEmitted, true);
assert.strictEqual(r._readableState.errored, true);
assert.strictEqual(r._readableState.errored, err);
assert.strictEqual(r.destroyed, true);
}));
oldStream.emit('error', new Error());
oldStream.emit('error', err);
}
{
const err = new Error();
const r = new Readable({ autoDestroy: false })
.wrap(oldStream)
.on('error', common.mustCall(() => {
assert.strictEqual(r._readableState.errorEmitted, true);
assert.strictEqual(r._readableState.errored, true);
assert.strictEqual(r._readableState.errored, err);
assert.strictEqual(r.destroyed, false);
}));
oldStream.emit('error', new Error());
oldStream.emit('error', err);
}