stream: use callback to properly propagate error

The stream will be destroyed upstream through the proper error
flow.

PR-URL: https://github.com/nodejs/node/pull/29179
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
This commit is contained in:
Robert Nagy 2019-08-17 13:11:37 +02:00
parent b9da063ae9
commit 8f86986985
7 changed files with 95 additions and 15 deletions

View File

@ -144,7 +144,10 @@ function ReadableState(options, stream, isDuplex) {
// Has it been destroyed
this.destroyed = false;
// Indicates whether the stream has errored.
// Indicates whether the stream has errored. When true no further
// _read calls, 'data' or 'readable' events should occur. This is needed
// since when autoDestroy is disabled we need a way to tell whether the
// stream has failed.
this.errored = false;
// Indicates whether the stream has finished destroying.
@ -258,7 +261,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
addChunk(stream, state, chunk, true);
} else if (state.ended) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
} else if (state.destroyed) {
} else if (state.destroyed || state.errored) {
return false;
} else {
state.reading = false;
@ -453,9 +456,9 @@ Readable.prototype.read = function(n) {
}
// However, if we've ended, then there's no point, if we're already
// reading, then it's unnecessary, and if we're destroyed, then it's
// not allowed.
if (state.ended || state.reading || state.destroyed) {
// reading, then it's unnecessary, and if we're destroyed or errored,
// then it's not allowed.
if (state.ended || state.reading || state.destroyed || state.errored) {
doRead = false;
debug('reading or ended', doRead);
} else if (doRead) {
@ -553,7 +556,7 @@ function emitReadable(stream) {
function emitReadable_(stream) {
const state = stream._readableState;
debug('emitReadable_', state.destroyed, state.length, state.ended);
if (!state.destroyed && (state.length || state.ended)) {
if (!state.destroyed && !state.errored && (state.length || state.ended)) {
stream.emit('readable');
state.emittedReadable = false;
}

View File

@ -416,6 +416,13 @@ function onwrite(stream, er) {
if (er) {
state.errored = true;
// In case of duplex streams we need to notify the readable side of the
// error.
if (stream._readableState) {
stream._readableState.errored = true;
}
if (sync) {
process.nextTick(onwriteError, stream, state, er, cb);
} else {

View File

@ -1995,10 +1995,19 @@ class Http2Stream extends Duplex {
let req;
// writeGeneric does not destroy on error and we cannot enable autoDestroy,
// so make sure to destroy on error.
const callback = (err) => {
if (err) {
this.destroy(err);
}
cb(err);
};
if (writev)
req = writevGeneric(this, data, cb);
req = writevGeneric(this, data, callback);
else
req = writeGeneric(this, data, encoding, cb);
req = writeGeneric(this, data, encoding, callback);
trackWriteState(this, req.bytes);
}

View File

@ -88,9 +88,14 @@ function onWriteComplete(status) {
return;
}
// TODO (ronag): This should be moved before if(stream.destroyed)
// in order to avoid swallowing error.
if (status < 0) {
const ex = errnoException(status, 'write', this.error);
stream.destroy(ex, this.callback);
if (typeof this.callback === 'function')
this.callback(ex);
else
stream.destroy(ex);
return;
}
@ -134,7 +139,7 @@ function writevGeneric(self, data, cb) {
// Retain chunks
if (err === 0) req._chunks = chunks;
afterWriteDispatched(self, req, err, cb);
afterWriteDispatched(req, err, cb);
return req;
}
@ -142,16 +147,16 @@ function writeGeneric(self, data, encoding, cb) {
const req = createWriteWrap(self[kHandle]);
const err = handleWriteReq(req, data, encoding);
afterWriteDispatched(self, req, err, cb);
afterWriteDispatched(req, err, cb);
return req;
}
function afterWriteDispatched(self, req, err, cb) {
function afterWriteDispatched(req, err, cb) {
req.bytes = streamBaseState[kBytesWritten];
req.async = !!streamBaseState[kLastWriteWasAsync];
if (err !== 0)
return self.destroy(errnoException(err, 'write', req.error), cb);
return cb(errnoException(err, 'write', req.error));
if (!req.async) {
cb();
@ -264,7 +269,6 @@ function setStreamTimeout(msecs, callback) {
}
module.exports = {
createWriteWrap,
writevGeneric,
writeGeneric,
onStreamRead,

View File

@ -0,0 +1,56 @@
'use strict';
const common = require('../common');
const assert = require('assert');
const net = require('net');
const tcp = net.Server(common.mustCall((s) => {
tcp.close();
let buf = '';
s.setEncoding('utf8');
s.on('data', function(d) {
buf += d;
});
s.on('end', common.mustCall(function() {
console.error('SERVER: end', buf);
assert.strictEqual(buf, "L'État, c'est moi");
s.end();
}));
}));
tcp.listen(0, common.mustCall(function() {
const socket = net.Stream({ highWaterMark: 0 });
let connected = false;
assert.strictEqual(socket.pending, true);
socket.connect(this.address().port, common.mustCall(() => connected = true));
assert.strictEqual(socket.pending, true);
assert.strictEqual(socket.connecting, true);
assert.strictEqual(socket.readyState, 'opening');
// Write a string that contains a multi-byte character sequence to test that
// `bytesWritten` is incremented with the # of bytes, not # of characters.
const a = "L'État, c'est ";
const b = 'moi';
// We're still connecting at this point so the datagram is first pushed onto
// the connect queue. Make sure that it's not added to `bytesWritten` again
// when the actual write happens.
const r = socket.write(a, common.mustCall((er) => {
console.error('write cb');
assert.ok(connected);
assert.strictEqual(socket.bytesWritten, Buffer.from(a + b).length);
assert.strictEqual(socket.pending, false);
}));
socket.on('close', common.mustCall(() => {
assert.strictEqual(socket.pending, true);
}));
assert.strictEqual(socket.bytesWritten, Buffer.from(a).length);
assert.strictEqual(r, false);
socket.end(b);
assert.strictEqual(socket.readyState, 'opening');
}));

View File

@ -25,6 +25,7 @@ assert.throws(() => {
[],
{}
].forEach((value) => {
const socket = net.Stream({ highWaterMark: 0 });
// We need to check the callback since 'error' will only
// be emitted once per instance.
assert.throws(() => {

View File

@ -10,7 +10,7 @@ process.once('uncaughtException', common.mustCall((err) => {
}));
const socket = new JSStreamWrap(new Duplex({
read: common.mustNotCall(),
read: common.mustCall(),
write: common.mustCall((buffer, data, cb) => {
throw new Error('exception!');
})