node/lib/diagnostics_channel.js
simon-id 80b56bbab0
diagnostics_channel: fix unsubscribe during publish
PR-URL: https://github.com/nodejs/node/pull/55116
Reviewed-By: Stephen Belanger <admin@stephenbelanger.com>
Reviewed-By: Claudio Wunder <cwunder@gnome.org>
2024-10-14 10:55:39 +00:00

439 lines
10 KiB
JavaScript

'use strict';
const {
ArrayPrototypeAt,
ArrayPrototypeIndexOf,
ArrayPrototypePush,
ArrayPrototypePushApply,
ArrayPrototypeSlice,
ArrayPrototypeSplice,
ObjectDefineProperty,
ObjectGetPrototypeOf,
ObjectSetPrototypeOf,
Promise,
PromisePrototypeThen,
PromiseReject,
PromiseResolve,
ReflectApply,
SafeFinalizationRegistry,
SafeMap,
SymbolHasInstance,
} = primordials;
const {
codes: {
ERR_INVALID_ARG_TYPE,
},
} = require('internal/errors');
const {
validateFunction,
} = require('internal/validators');
const { triggerUncaughtException } = internalBinding('errors');
const { WeakReference } = require('internal/util');
// Can't delete when weakref count reaches 0 as it could increment again.
// Only GC can be used as a valid time to clean up the channels map.
class WeakRefMap extends SafeMap {
#finalizers = new SafeFinalizationRegistry((key) => {
this.delete(key);
});
set(key, value) {
this.#finalizers.register(value, key);
return super.set(key, new WeakReference(value));
}
get(key) {
return super.get(key)?.get();
}
incRef(key) {
return super.get(key)?.incRef();
}
decRef(key) {
return super.get(key)?.decRef();
}
}
function markActive(channel) {
// eslint-disable-next-line no-use-before-define
ObjectSetPrototypeOf(channel, ActiveChannel.prototype);
channel._subscribers = [];
channel._stores = new SafeMap();
}
function maybeMarkInactive(channel) {
// When there are no more active subscribers or bound, restore to fast prototype.
if (!channel._subscribers.length && !channel._stores.size) {
// eslint-disable-next-line no-use-before-define
ObjectSetPrototypeOf(channel, Channel.prototype);
channel._subscribers = undefined;
channel._stores = undefined;
}
}
function defaultTransform(data) {
return data;
}
function wrapStoreRun(store, data, next, transform = defaultTransform) {
return () => {
let context;
try {
context = transform(data);
} catch (err) {
process.nextTick(() => {
triggerUncaughtException(err, false);
});
return next();
}
return store.run(context, next);
};
}
// TODO(qard): should there be a C++ channel interface?
class ActiveChannel {
subscribe(subscription) {
validateFunction(subscription, 'subscription');
this._subscribers = ArrayPrototypeSlice(this._subscribers);
ArrayPrototypePush(this._subscribers, subscription);
channels.incRef(this.name);
}
unsubscribe(subscription) {
const index = ArrayPrototypeIndexOf(this._subscribers, subscription);
if (index === -1) return false;
const before = ArrayPrototypeSlice(this._subscribers, 0, index);
const after = ArrayPrototypeSlice(this._subscribers, index + 1);
this._subscribers = before;
ArrayPrototypePushApply(this._subscribers, after);
channels.decRef(this.name);
maybeMarkInactive(this);
return true;
}
bindStore(store, transform) {
const replacing = this._stores.has(store);
if (!replacing) channels.incRef(this.name);
this._stores.set(store, transform);
}
unbindStore(store) {
if (!this._stores.has(store)) {
return false;
}
this._stores.delete(store);
channels.decRef(this.name);
maybeMarkInactive(this);
return true;
}
get hasSubscribers() {
return true;
}
publish(data) {
const subscribers = this._subscribers;
for (let i = 0; i < (subscribers?.length || 0); i++) {
try {
const onMessage = subscribers[i];
onMessage(data, this.name);
} catch (err) {
process.nextTick(() => {
triggerUncaughtException(err, false);
});
}
}
}
runStores(data, fn, thisArg, ...args) {
let run = () => {
this.publish(data);
return ReflectApply(fn, thisArg, args);
};
for (const entry of this._stores.entries()) {
const store = entry[0];
const transform = entry[1];
run = wrapStoreRun(store, data, run, transform);
}
return run();
}
}
class Channel {
constructor(name) {
this._subscribers = undefined;
this._stores = undefined;
this.name = name;
channels.set(name, this);
}
static [SymbolHasInstance](instance) {
const prototype = ObjectGetPrototypeOf(instance);
return prototype === Channel.prototype ||
prototype === ActiveChannel.prototype;
}
subscribe(subscription) {
markActive(this);
this.subscribe(subscription);
}
unsubscribe() {
return false;
}
bindStore(store, transform) {
markActive(this);
this.bindStore(store, transform);
}
unbindStore() {
return false;
}
get hasSubscribers() {
return false;
}
publish() {}
runStores(data, fn, thisArg, ...args) {
return ReflectApply(fn, thisArg, args);
}
}
const channels = new WeakRefMap();
function channel(name) {
const channel = channels.get(name);
if (channel) return channel;
if (typeof name !== 'string' && typeof name !== 'symbol') {
throw new ERR_INVALID_ARG_TYPE('channel', ['string', 'symbol'], name);
}
return new Channel(name);
}
function subscribe(name, subscription) {
return channel(name).subscribe(subscription);
}
function unsubscribe(name, subscription) {
return channel(name).unsubscribe(subscription);
}
function hasSubscribers(name) {
const channel = channels.get(name);
if (!channel) return false;
return channel.hasSubscribers;
}
const traceEvents = [
'start',
'end',
'asyncStart',
'asyncEnd',
'error',
];
function assertChannel(value, name) {
if (!(value instanceof Channel)) {
throw new ERR_INVALID_ARG_TYPE(name, ['Channel'], value);
}
}
function tracingChannelFrom(nameOrChannels, name) {
if (typeof nameOrChannels === 'string') {
return channel(`tracing:${nameOrChannels}:${name}`);
}
if (typeof nameOrChannels === 'object' && nameOrChannels !== null) {
const channel = nameOrChannels[name];
assertChannel(channel, `nameOrChannels.${name}`);
return channel;
}
throw new ERR_INVALID_ARG_TYPE('nameOrChannels',
['string', 'object', 'TracingChannel'],
nameOrChannels);
}
class TracingChannel {
constructor(nameOrChannels) {
for (let i = 0; i < traceEvents.length; ++i) {
const eventName = traceEvents[i];
ObjectDefineProperty(this, eventName, {
__proto__: null,
value: tracingChannelFrom(nameOrChannels, eventName),
});
}
}
get hasSubscribers() {
return this.start?.hasSubscribers ||
this.end?.hasSubscribers ||
this.asyncStart?.hasSubscribers ||
this.asyncEnd?.hasSubscribers ||
this.error?.hasSubscribers;
}
subscribe(handlers) {
for (let i = 0; i < traceEvents.length; ++i) {
const name = traceEvents[i];
if (!handlers[name]) continue;
this[name]?.subscribe(handlers[name]);
}
}
unsubscribe(handlers) {
let done = true;
for (let i = 0; i < traceEvents.length; ++i) {
const name = traceEvents[i];
if (!handlers[name]) continue;
if (!this[name]?.unsubscribe(handlers[name])) {
done = false;
}
}
return done;
}
traceSync(fn, context = {}, thisArg, ...args) {
if (!this.hasSubscribers) {
return ReflectApply(fn, thisArg, args);
}
const { start, end, error } = this;
return start.runStores(context, () => {
try {
const result = ReflectApply(fn, thisArg, args);
context.result = result;
return result;
} catch (err) {
context.error = err;
error.publish(context);
throw err;
} finally {
end.publish(context);
}
});
}
tracePromise(fn, context = {}, thisArg, ...args) {
if (!this.hasSubscribers) {
return ReflectApply(fn, thisArg, args);
}
const { start, end, asyncStart, asyncEnd, error } = this;
function reject(err) {
context.error = err;
error.publish(context);
asyncStart.publish(context);
// TODO: Is there a way to have asyncEnd _after_ the continuation?
asyncEnd.publish(context);
return PromiseReject(err);
}
function resolve(result) {
context.result = result;
asyncStart.publish(context);
// TODO: Is there a way to have asyncEnd _after_ the continuation?
asyncEnd.publish(context);
return result;
}
return start.runStores(context, () => {
try {
let promise = ReflectApply(fn, thisArg, args);
// Convert thenables to native promises
if (!(promise instanceof Promise)) {
promise = PromiseResolve(promise);
}
return PromisePrototypeThen(promise, resolve, reject);
} catch (err) {
context.error = err;
error.publish(context);
throw err;
} finally {
end.publish(context);
}
});
}
traceCallback(fn, position = -1, context = {}, thisArg, ...args) {
if (!this.hasSubscribers) {
return ReflectApply(fn, thisArg, args);
}
const { start, end, asyncStart, asyncEnd, error } = this;
function wrappedCallback(err, res) {
if (err) {
context.error = err;
error.publish(context);
} else {
context.result = res;
}
// Using runStores here enables manual context failure recovery
asyncStart.runStores(context, () => {
try {
return ReflectApply(callback, this, arguments);
} finally {
asyncEnd.publish(context);
}
});
}
const callback = ArrayPrototypeAt(args, position);
validateFunction(callback, 'callback');
ArrayPrototypeSplice(args, position, 1, wrappedCallback);
return start.runStores(context, () => {
try {
return ReflectApply(fn, thisArg, args);
} catch (err) {
context.error = err;
error.publish(context);
throw err;
} finally {
end.publish(context);
}
});
}
}
function tracingChannel(nameOrChannels) {
return new TracingChannel(nameOrChannels);
}
module.exports = {
channel,
hasSubscribers,
subscribe,
tracingChannel,
unsubscribe,
Channel,
};