stream: writable state bitmap

PR-URL: https://github.com/nodejs/node/pull/49899
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Raz Luvaton <rluvaton@gmail.com>
Reviewed-By: Yagiz Nizipli <yagiz@nizipli.com>
This commit is contained in:
Robert Nagy 2023-09-29 20:13:44 +02:00 committed by GitHub
parent 7b624c30b2
commit 53b5545672
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 194 additions and 81 deletions

View File

@ -4,7 +4,7 @@ const common = require('../common');
const Writable = require('stream').Writable;
const bench = common.createBenchmark(main, {
n: [2e6],
n: [1e5],
sync: ['yes', 'no'],
writev: ['yes', 'no'],
callback: ['yes', 'no'],
@ -13,7 +13,7 @@ const bench = common.createBenchmark(main, {
function main({ n, sync, writev, callback, len }) {
const b = Buffer.allocUnsafe(len);
const s = new Writable();
const s = new Writable({ highWaterMark: 16 * 1024 });
sync = sync === 'yes';
const writecb = (cb) => {

View File

@ -72,7 +72,11 @@ ObjectSetPrototypeOf(Writable, Stream);
function nop() {}
const kOnFinished = Symbol('kOnFinished');
const kOnFinishedValue = Symbol('kOnFinishedValue');
const kErroredValue = Symbol('kErroredValue');
const kDefaultEncodingValue = Symbol('kDefaultEncodingValue');
const kWriteCbValue = Symbol('kWriteCbValue');
const kAfterWriteTickInfoValue = Symbol('kAfterWriteTickInfoValue');
const kObjectMode = 1 << 0;
const kEnded = 1 << 1;
@ -94,6 +98,16 @@ const kBufferProcessing = 1 << 16;
const kPrefinished = 1 << 17;
const kAllBuffers = 1 << 18;
const kAllNoop = 1 << 19;
const kOnFinished = 1 << 20;
const kErrored = 1 << 21;
const kHasWritable = 1 << 22;
const kWritable = 1 << 23;
const kCorked = 1 << 24;
const kDefaultUTF8Encoding = 1 << 25;
const kWriteCb = 1 << 26;
const kExpectWriteCb = 1 << 27;
const kAfterWriteTickInfo = 1 << 28;
const kAfterWritePending = 1 << 29;
// TODO(benjamingr) it is likely slower to do it this way than with free functions
function makeBitMapDescriptor(bit) {
@ -176,6 +190,85 @@ ObjectDefineProperties(WritableState.prototype, {
allBuffers: makeBitMapDescriptor(kAllBuffers),
allNoop: makeBitMapDescriptor(kAllNoop),
// Indicates whether the stream has errored. When true all write() calls
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
// This is/should be a cold path.
errored: {
__proto__: null,
enumerable: false,
get() { return (this.state & kErrored) !== 0 ? this[kErroredValue] : null; },
set(value) {
if (value) {
this[kErroredValue] = value;
this.state |= kErrored;
} else {
this.state &= ~kErrored;
}
},
},
writable: {
__proto__: null,
enumerable: false,
get() { return (this.state & kHasWritable) !== 0 ? (this.state & kWritable) !== 0 : undefined; },
set(value) {
if (value == null) {
this.state &= ~(kHasWritable | kWritable);
} else if (value) {
this.state |= (kHasWritable | kWritable);
} else {
this.state |= kHasWritable;
this.state &= ~kWritable;
}
},
},
defaultEncoding: {
__proto__: null,
enumerable: false,
get() { return (this.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : this[kDefaultEncodingValue]; },
set(value) {
if (value === 'utf8' || value === 'utf-8') {
this.state |= kDefaultUTF8Encoding;
} else {
this.state &= ~kDefaultUTF8Encoding;
this[kDefaultEncodingValue] = value;
}
},
},
// The callback that the user supplies to write(chunk, encoding, cb).
writecb: {
__proto__: null,
enumerable: false,
get() { return (this.state & kWriteCb) !== 0 ? this[kWriteCbValue] : nop; },
set(value) {
if (value) {
this[kWriteCbValue] = value;
this.state |= kWriteCb;
} else {
this.state &= ~kWriteCb;
}
},
},
// Storage for data passed to the afterWrite() callback in case of
// synchronous _write() completion.
afterWriteTickInfo: {
__proto__: null,
enumerable: false,
get() { return (this.state & kAfterWriteTickInfo) !== 0 ? this[kAfterWriteTickInfoValue] : null; },
set(value) {
if (value) {
this[kAfterWriteTickInfoValue] = value;
this.state |= kAfterWriteTickInfo;
} else {
this.state &= ~kAfterWriteTickInfo;
}
},
},
});
function WritableState(options, stream, isDuplex) {
@ -213,10 +306,11 @@ function WritableState(options, stream, isDuplex) {
// encoding is 'binary' so we have to make this configurable.
// Everything else in the universe uses 'utf8', though.
const defaultEncoding = options?.defaultEncoding;
if (defaultEncoding == null) {
this.defaultEncoding = 'utf8';
if (defaultEncoding == null || defaultEncoding === 'utf8' || defaultEncoding === 'utf-8') {
this.state |= kDefaultUTF8Encoding;
} else if (Buffer.isEncoding(defaultEncoding)) {
this.defaultEncoding = defaultEncoding;
this.state &= ~kDefaultUTF8Encoding;
this[kDefaultEncodingValue] = defaultEncoding;
} else {
throw new ERR_UNKNOWN_ENCODING(defaultEncoding);
}
@ -232,28 +326,14 @@ function WritableState(options, stream, isDuplex) {
// The callback that's passed to _write(chunk, cb).
this.onwrite = onwrite.bind(undefined, stream);
// The callback that the user supplies to write(chunk, encoding, cb).
this.writecb = null;
// The amount that is being written when _write is called.
this.writelen = 0;
// Storage for data passed to the afterWrite() callback in case of
// synchronous _write() completion.
this.afterWriteTickInfo = null;
resetBuffer(this);
// Number of pending user-supplied write callbacks
// this must be 0 before 'finish' can be emitted.
this.pendingcb = 0;
// Indicates whether the stream has errored. When true all write() calls
// should return false. This is needed since when autoDestroy
// is disabled we need a way to tell whether the stream has failed.
this.errored = null;
this[kOnFinished] = [];
}
function resetBuffer(state) {
@ -344,10 +424,10 @@ function _write(stream, chunk, encoding, cb) {
if (typeof encoding === 'function') {
cb = encoding;
encoding = state.defaultEncoding;
encoding = (state.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : state.defaultEncoding;
} else {
if (!encoding)
encoding = state.defaultEncoding;
encoding = (state.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : state.defaultEncoding;
else if (encoding !== 'buffer' && !Buffer.isEncoding(encoding))
throw new ERR_UNKNOWN_ENCODING(encoding);
if (typeof cb !== 'function')
@ -394,7 +474,10 @@ Writable.prototype.write = function(chunk, encoding, cb) {
};
Writable.prototype.cork = function() {
this._writableState.corked++;
const state = this._writableState;
state.state |= kCorked;
state.corked++;
};
Writable.prototype.uncork = function() {
@ -403,6 +486,10 @@ Writable.prototype.uncork = function() {
if (state.corked) {
state.corked--;
if (!state.corked) {
state.state &= ~kCorked;
}
if ((state.state & kWriting) === 0)
clearBuffer(this, state);
}
@ -428,11 +515,13 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
// stream._write resets state.length
const ret = state.length < state.highWaterMark;
// We must ensure that previous needDrain will not be reset to false.
if (!ret)
state.state |= kNeedDrain;
if ((state.state & kWriting) !== 0 || state.corked || state.errored || (state.state & kConstructed) === 0) {
// We must ensure that previous needDrain will not be reset to false.
if (!ret) {
state.state |= kNeedDrain;
}
if ((state.state & (kWriting | kErrored | kCorked | kConstructed)) !== kConstructed) {
state.buffered.push({ chunk, encoding, callback });
if ((state.state & kAllBuffers) !== 0 && encoding !== 'buffer') {
state.state &= ~kAllBuffers;
@ -442,21 +531,25 @@ function writeOrBuffer(stream, state, chunk, encoding, callback) {
}
} else {
state.writelen = len;
state.writecb = callback;
state.state |= kWriting | kSync;
if (callback !== nop) {
state.writecb = callback;
}
state.state |= kWriting | kSync | kExpectWriteCb;
stream._write(chunk, encoding, state.onwrite);
state.state &= ~kSync;
}
// Return false if errored or destroyed in order to break
// any synchronous while(stream.write(data)) loops.
return ret && !state.errored && (state.state & kDestroyed) === 0;
return ret && (state.state & (kDestroyed | kErrored)) === 0;
}
function doWrite(stream, state, writev, len, chunk, encoding, cb) {
state.writelen = len;
state.writecb = cb;
state.state |= kWriting | kSync;
if (cb !== nop) {
state.writecb = cb;
}
state.state |= kWriting | kSync | kExpectWriteCb;
if ((state.state & kDestroyed) !== 0)
state.onwrite(new ERR_STREAM_DESTROYED('write'));
else if (writev)
@ -481,16 +574,16 @@ function onwriteError(stream, state, er, cb) {
function onwrite(stream, er) {
const state = stream._writableState;
const sync = (state.state & kSync) !== 0;
const cb = state.writecb;
if (typeof cb !== 'function') {
if ((state.state & kExpectWriteCb) === 0) {
errorOrDestroy(stream, new ERR_MULTIPLE_CALLBACK());
return;
}
state.state &= ~kWriting;
state.writecb = null;
const sync = (state.state & kSync) !== 0;
const cb = (state.state & kWriteCb) !== 0 ? state[kWriteCbValue] : nop;
state.state &= ~(kWriting | kExpectWriteCb | kWriteCb);
state.length -= state.writelen;
state.writelen = 0;
@ -523,12 +616,20 @@ function onwrite(stream, er) {
// the same. In that case, we do not schedule a new nextTick(), but
// rather just increase a counter, to improve performance and avoid
// memory allocations.
if (state.afterWriteTickInfo !== null &&
state.afterWriteTickInfo.cb === cb) {
if (cb === nop) {
if ((state.state & kAfterWritePending) === 0) {
process.nextTick(afterWrite, stream, state, 1, cb);
state.state |= kAfterWritePending;
} else {
state.pendingcb -= 1;
}
} else if (state.afterWriteTickInfo !== null &&
state.afterWriteTickInfo.cb === cb) {
state.afterWriteTickInfo.count++;
} else {
state.afterWriteTickInfo = { count: 1, cb, stream, state };
process.nextTick(afterWriteTick, state.afterWriteTickInfo);
state.state |= kAfterWritePending;
}
} else {
afterWrite(stream, state, 1, cb);
@ -542,7 +643,9 @@ function afterWriteTick({ stream, state, count, cb }) {
}
function afterWrite(stream, state, count, cb) {
const needDrain = (state.state & (kEnding | kNeedDrain)) === kNeedDrain && !stream.destroyed && state.length === 0;
state.state &= ~kAfterWritePending;
const needDrain = (state.state & (kEnding | kNeedDrain | kDestroyed)) === kNeedDrain && state.length === 0;
if (needDrain) {
state.state &= ~kNeedDrain;
stream.emit('drain');
@ -573,19 +676,16 @@ function errorBuffer(state) {
callback(state.errored ?? new ERR_STREAM_DESTROYED('write'));
}
const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](state.errored ?? new ERR_STREAM_DESTROYED('end'));
}
callFinishedCallbacks(state, state.errored ?? new ERR_STREAM_DESTROYED('end'));
resetBuffer(state);
}
// If there's something in the buffer waiting, then process it.
function clearBuffer(stream, state) {
if (state.corked ||
(state.state & (kDestroyed | kBufferProcessing)) !== 0 ||
(state.state & kConstructed) === 0) {
if ((state.state & (kDestroyed | kBufferProcessing | kCorked)) !== 0 ||
(state.state & kConstructed) === 0) {
return;
}
@ -661,7 +761,7 @@ Writable.prototype.end = function(chunk, encoding, cb) {
let err;
if (chunk !== null && chunk !== undefined) {
if (chunk != null) {
const ret = _write(this, chunk, encoding);
if (ret instanceof Error) {
err = ret;
@ -669,14 +769,14 @@ Writable.prototype.end = function(chunk, encoding, cb) {
}
// .end() fully uncorks.
if (state.corked) {
if ((state.state & kCorked) !== 0) {
state.corked = 1;
this.uncork();
}
if (err) {
// Do nothing...
} else if (!state.errored && (state.state & kEnding) === 0) {
} else if ((state.state & (kEnding | kErrored)) === 0) {
// This is forgiving in terms of unnecessary calls to end() and can hide
// logic errors. However, usually such errors are harmless and causing a
// hard error can be disproportionately destructive. It is not always
@ -698,7 +798,9 @@ Writable.prototype.end = function(chunk, encoding, cb) {
} else if ((state.state & kFinished) !== 0) {
process.nextTick(cb, null);
} else {
state[kOnFinished].push(cb);
state.state |= kOnFinished;
state[kOnFinishedValue] ??= [];
state[kOnFinishedValue].push(cb);
}
}
@ -715,10 +817,10 @@ function needFinish(state) {
kFinished |
kWriting |
kErrorEmitted |
kCloseEmitted
kCloseEmitted |
kErrored
)) === (kEnding | kConstructed) &&
state.length === 0 &&
!state.errored &&
state.buffered.length === 0);
}
@ -734,10 +836,7 @@ function callFinal(stream, state) {
state.pendingcb--;
if (err) {
const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](err);
}
callFinishedCallbacks(state, err);
errorOrDestroy(stream, err, (state.state & kSync) !== 0);
} else if (needFinish(state)) {
state.state |= kPrefinished;
@ -799,10 +898,7 @@ function finish(stream, state) {
state.pendingcb--;
state.state |= kFinished;
const onfinishCallbacks = state[kOnFinished].splice(0);
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](null);
}
callFinishedCallbacks(state, null);
stream.emit('finish');
@ -822,8 +918,20 @@ function finish(stream, state) {
}
}
ObjectDefineProperties(Writable.prototype, {
function callFinishedCallbacks(state, err) {
if ((state.state & kOnFinished) === 0) {
return;
}
const onfinishCallbacks = state[kOnFinishedValue];
state[kOnFinishedValue] = null;
state.state &= ~kOnFinished;
for (let i = 0; i < onfinishCallbacks.length; i++) {
onfinishCallbacks[i](err);
}
}
ObjectDefineProperties(Writable.prototype, {
closed: {
__proto__: null,
get() {
@ -867,60 +975,64 @@ ObjectDefineProperties(Writable.prototype, {
writableFinished: {
__proto__: null,
get() {
return this._writableState ? (this._writableState.state & kFinished) !== 0 : false;
const state = this._writableState;
return state ? (state.state & kFinished) !== 0 : false;
},
},
writableObjectMode: {
__proto__: null,
get() {
return this._writableState ? (this._writableState.state & kObjectMode) !== 0 : false;
const state = this._writableState;
return state ? (state.state & kObjectMode) !== 0 : false;
},
},
writableBuffer: {
__proto__: null,
get() {
return this._writableState && this._writableState.getBuffer();
const state = this._writableState;
return state && state.getBuffer();
},
},
writableEnded: {
__proto__: null,
get() {
return this._writableState ? (this._writableState.state & kEnding) !== 0 : false;
const state = this._writableState;
return state ? (state.state & kEnding) !== 0 : false;
},
},
writableNeedDrain: {
__proto__: null,
get() {
const wState = this._writableState;
if (!wState) return false;
// !destroyed && !ending && needDrain
return (wState.state & (kDestroyed | kEnding | kNeedDrain)) === kNeedDrain;
const state = this._writableState;
return state ? (state.state & (kDestroyed | kEnding | kNeedDrain)) === kNeedDrain : false;
},
},
writableHighWaterMark: {
__proto__: null,
get() {
return this._writableState && this._writableState.highWaterMark;
const state = this._writableState;
return state && state.highWaterMark;
},
},
writableCorked: {
__proto__: null,
get() {
return this._writableState ? this._writableState.corked : 0;
const state = this._writableState;
return state ? state.corked : 0;
},
},
writableLength: {
__proto__: null,
get() {
return this._writableState && this._writableState.length;
const state = this._writableState;
return state && state.length;
},
},
@ -928,18 +1040,19 @@ ObjectDefineProperties(Writable.prototype, {
__proto__: null,
enumerable: false,
get() {
return this._writableState ? this._writableState.errored : null;
const state = this._writableState;
return state ? state.errored : null;
},
},
writableAborted: {
__proto__: null,
enumerable: false,
get: function() {
return !!(
this._writableState.writable !== false &&
((this._writableState.state & kDestroyed) !== 0 || this._writableState.errored) &&
(this._writableState.state & kFinished) === 0
const state = this._writableState;
return (
(state.state & (kHasWritable | kWritable)) !== kHasWritable &&
(state.state & (kDestroyed | kErrored)) !== 0 &&
(state.state & kFinished) === 0
);
},
},
@ -952,7 +1065,7 @@ Writable.prototype.destroy = function(err, cb) {
// Invoke pending callbacks.
if ((state.state & kDestroyed) === 0 &&
(state.bufferedIndex < state.buffered.length ||
state[kOnFinished].length)) {
(state.state & kOnFinished) !== 0)) {
process.nextTick(errorBuffer, state);
}