mirror of
https://github.com/nodejs/node.git
synced 2024-11-21 10:59:27 +00:00
3f942e2c49
tracingChannel.traceCallback() requires a callback otherwise it throws and invalid argument error. As a result arguments are not optional. Correct the documentation to reflect that arguments are not optional. Besides that correct description regarding signaling of errors. Remove an unneeded null check in wrappedCallback() which can't happen because it's validated that callback is of type function. PR-URL: https://github.com/nodejs/node/pull/51068 Fixes: https://github.com/nodejs/node/issues/50996 Reviewed-By: Stephen Belanger <admin@stephenbelanger.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Mohammed Keyvanzadeh <mohammadkeyvanzade94@gmail.com>
411 lines
9.4 KiB
JavaScript
411 lines
9.4 KiB
JavaScript
'use strict';
|
|
|
|
const {
|
|
ArrayPrototypeAt,
|
|
ArrayPrototypeIndexOf,
|
|
ArrayPrototypePush,
|
|
ArrayPrototypeSplice,
|
|
SafeFinalizationRegistry,
|
|
ObjectGetPrototypeOf,
|
|
ObjectSetPrototypeOf,
|
|
Promise,
|
|
PromisePrototypeThen,
|
|
PromiseResolve,
|
|
PromiseReject,
|
|
ReflectApply,
|
|
SafeMap,
|
|
SymbolHasInstance,
|
|
} = primordials;
|
|
|
|
const {
|
|
codes: {
|
|
ERR_INVALID_ARG_TYPE,
|
|
},
|
|
} = require('internal/errors');
|
|
const {
|
|
validateFunction,
|
|
} = require('internal/validators');
|
|
|
|
const { triggerUncaughtException } = internalBinding('errors');
|
|
|
|
const { WeakReference } = require('internal/util');
|
|
|
|
// Can't delete when weakref count reaches 0 as it could increment again.
|
|
// Only GC can be used as a valid time to clean up the channels map.
|
|
class WeakRefMap extends SafeMap {
|
|
#finalizers = new SafeFinalizationRegistry((key) => {
|
|
this.delete(key);
|
|
});
|
|
|
|
set(key, value) {
|
|
this.#finalizers.register(value, key);
|
|
return super.set(key, new WeakReference(value));
|
|
}
|
|
|
|
get(key) {
|
|
return super.get(key)?.get();
|
|
}
|
|
|
|
incRef(key) {
|
|
return super.get(key)?.incRef();
|
|
}
|
|
|
|
decRef(key) {
|
|
return super.get(key)?.decRef();
|
|
}
|
|
}
|
|
|
|
function markActive(channel) {
|
|
// eslint-disable-next-line no-use-before-define
|
|
ObjectSetPrototypeOf(channel, ActiveChannel.prototype);
|
|
channel._subscribers = [];
|
|
channel._stores = new SafeMap();
|
|
}
|
|
|
|
function maybeMarkInactive(channel) {
|
|
// When there are no more active subscribers or bound, restore to fast prototype.
|
|
if (!channel._subscribers.length && !channel._stores.size) {
|
|
// eslint-disable-next-line no-use-before-define
|
|
ObjectSetPrototypeOf(channel, Channel.prototype);
|
|
channel._subscribers = undefined;
|
|
channel._stores = undefined;
|
|
}
|
|
}
|
|
|
|
function defaultTransform(data) {
|
|
return data;
|
|
}
|
|
|
|
function wrapStoreRun(store, data, next, transform = defaultTransform) {
|
|
return () => {
|
|
let context;
|
|
try {
|
|
context = transform(data);
|
|
} catch (err) {
|
|
process.nextTick(() => {
|
|
triggerUncaughtException(err, false);
|
|
});
|
|
return next();
|
|
}
|
|
|
|
return store.run(context, next);
|
|
};
|
|
}
|
|
|
|
// TODO(qard): should there be a C++ channel interface?
|
|
class ActiveChannel {
|
|
subscribe(subscription) {
|
|
validateFunction(subscription, 'subscription');
|
|
ArrayPrototypePush(this._subscribers, subscription);
|
|
channels.incRef(this.name);
|
|
}
|
|
|
|
unsubscribe(subscription) {
|
|
const index = ArrayPrototypeIndexOf(this._subscribers, subscription);
|
|
if (index === -1) return false;
|
|
|
|
ArrayPrototypeSplice(this._subscribers, index, 1);
|
|
|
|
channels.decRef(this.name);
|
|
maybeMarkInactive(this);
|
|
|
|
return true;
|
|
}
|
|
|
|
bindStore(store, transform) {
|
|
const replacing = this._stores.has(store);
|
|
if (!replacing) channels.incRef(this.name);
|
|
this._stores.set(store, transform);
|
|
}
|
|
|
|
unbindStore(store) {
|
|
if (!this._stores.has(store)) {
|
|
return false;
|
|
}
|
|
|
|
this._stores.delete(store);
|
|
|
|
channels.decRef(this.name);
|
|
maybeMarkInactive(this);
|
|
|
|
return true;
|
|
}
|
|
|
|
get hasSubscribers() {
|
|
return true;
|
|
}
|
|
|
|
publish(data) {
|
|
for (let i = 0; i < (this._subscribers?.length || 0); i++) {
|
|
try {
|
|
const onMessage = this._subscribers[i];
|
|
onMessage(data, this.name);
|
|
} catch (err) {
|
|
process.nextTick(() => {
|
|
triggerUncaughtException(err, false);
|
|
});
|
|
}
|
|
}
|
|
}
|
|
|
|
runStores(data, fn, thisArg, ...args) {
|
|
let run = () => {
|
|
this.publish(data);
|
|
return ReflectApply(fn, thisArg, args);
|
|
};
|
|
|
|
for (const entry of this._stores.entries()) {
|
|
const store = entry[0];
|
|
const transform = entry[1];
|
|
run = wrapStoreRun(store, data, run, transform);
|
|
}
|
|
|
|
return run();
|
|
}
|
|
}
|
|
|
|
class Channel {
|
|
constructor(name) {
|
|
this._subscribers = undefined;
|
|
this._stores = undefined;
|
|
this.name = name;
|
|
|
|
channels.set(name, this);
|
|
}
|
|
|
|
static [SymbolHasInstance](instance) {
|
|
const prototype = ObjectGetPrototypeOf(instance);
|
|
return prototype === Channel.prototype ||
|
|
prototype === ActiveChannel.prototype;
|
|
}
|
|
|
|
subscribe(subscription) {
|
|
markActive(this);
|
|
this.subscribe(subscription);
|
|
}
|
|
|
|
unsubscribe() {
|
|
return false;
|
|
}
|
|
|
|
bindStore(store, transform) {
|
|
markActive(this);
|
|
this.bindStore(store, transform);
|
|
}
|
|
|
|
unbindStore() {
|
|
return false;
|
|
}
|
|
|
|
get hasSubscribers() {
|
|
return false;
|
|
}
|
|
|
|
publish() {}
|
|
|
|
runStores(data, fn, thisArg, ...args) {
|
|
return ReflectApply(fn, thisArg, args);
|
|
}
|
|
}
|
|
|
|
const channels = new WeakRefMap();
|
|
|
|
function channel(name) {
|
|
const channel = channels.get(name);
|
|
if (channel) return channel;
|
|
|
|
if (typeof name !== 'string' && typeof name !== 'symbol') {
|
|
throw new ERR_INVALID_ARG_TYPE('channel', ['string', 'symbol'], name);
|
|
}
|
|
|
|
return new Channel(name);
|
|
}
|
|
|
|
function subscribe(name, subscription) {
|
|
return channel(name).subscribe(subscription);
|
|
}
|
|
|
|
function unsubscribe(name, subscription) {
|
|
return channel(name).unsubscribe(subscription);
|
|
}
|
|
|
|
function hasSubscribers(name) {
|
|
const channel = channels.get(name);
|
|
if (!channel) return false;
|
|
|
|
return channel.hasSubscribers;
|
|
}
|
|
|
|
const traceEvents = [
|
|
'start',
|
|
'end',
|
|
'asyncStart',
|
|
'asyncEnd',
|
|
'error',
|
|
];
|
|
|
|
function assertChannel(value, name) {
|
|
if (!(value instanceof Channel)) {
|
|
throw new ERR_INVALID_ARG_TYPE(name, ['Channel'], value);
|
|
}
|
|
}
|
|
|
|
class TracingChannel {
|
|
constructor(nameOrChannels) {
|
|
if (typeof nameOrChannels === 'string') {
|
|
this.start = channel(`tracing:${nameOrChannels}:start`);
|
|
this.end = channel(`tracing:${nameOrChannels}:end`);
|
|
this.asyncStart = channel(`tracing:${nameOrChannels}:asyncStart`);
|
|
this.asyncEnd = channel(`tracing:${nameOrChannels}:asyncEnd`);
|
|
this.error = channel(`tracing:${nameOrChannels}:error`);
|
|
} else if (typeof nameOrChannels === 'object') {
|
|
const { start, end, asyncStart, asyncEnd, error } = nameOrChannels;
|
|
|
|
assertChannel(start, 'nameOrChannels.start');
|
|
assertChannel(end, 'nameOrChannels.end');
|
|
assertChannel(asyncStart, 'nameOrChannels.asyncStart');
|
|
assertChannel(asyncEnd, 'nameOrChannels.asyncEnd');
|
|
assertChannel(error, 'nameOrChannels.error');
|
|
|
|
this.start = start;
|
|
this.end = end;
|
|
this.asyncStart = asyncStart;
|
|
this.asyncEnd = asyncEnd;
|
|
this.error = error;
|
|
} else {
|
|
throw new ERR_INVALID_ARG_TYPE('nameOrChannels',
|
|
['string', 'object', 'Channel'],
|
|
nameOrChannels);
|
|
}
|
|
}
|
|
|
|
subscribe(handlers) {
|
|
for (const name of traceEvents) {
|
|
if (!handlers[name]) continue;
|
|
|
|
this[name]?.subscribe(handlers[name]);
|
|
}
|
|
}
|
|
|
|
unsubscribe(handlers) {
|
|
let done = true;
|
|
|
|
for (const name of traceEvents) {
|
|
if (!handlers[name]) continue;
|
|
|
|
if (!this[name]?.unsubscribe(handlers[name])) {
|
|
done = false;
|
|
}
|
|
}
|
|
|
|
return done;
|
|
}
|
|
|
|
traceSync(fn, context = {}, thisArg, ...args) {
|
|
const { start, end, error } = this;
|
|
|
|
return start.runStores(context, () => {
|
|
try {
|
|
const result = ReflectApply(fn, thisArg, args);
|
|
context.result = result;
|
|
return result;
|
|
} catch (err) {
|
|
context.error = err;
|
|
error.publish(context);
|
|
throw err;
|
|
} finally {
|
|
end.publish(context);
|
|
}
|
|
});
|
|
}
|
|
|
|
tracePromise(fn, context = {}, thisArg, ...args) {
|
|
const { start, end, asyncStart, asyncEnd, error } = this;
|
|
|
|
function reject(err) {
|
|
context.error = err;
|
|
error.publish(context);
|
|
asyncStart.publish(context);
|
|
// TODO: Is there a way to have asyncEnd _after_ the continuation?
|
|
asyncEnd.publish(context);
|
|
return PromiseReject(err);
|
|
}
|
|
|
|
function resolve(result) {
|
|
context.result = result;
|
|
asyncStart.publish(context);
|
|
// TODO: Is there a way to have asyncEnd _after_ the continuation?
|
|
asyncEnd.publish(context);
|
|
return result;
|
|
}
|
|
|
|
return start.runStores(context, () => {
|
|
try {
|
|
let promise = ReflectApply(fn, thisArg, args);
|
|
// Convert thenables to native promises
|
|
if (!(promise instanceof Promise)) {
|
|
promise = PromiseResolve(promise);
|
|
}
|
|
return PromisePrototypeThen(promise, resolve, reject);
|
|
} catch (err) {
|
|
context.error = err;
|
|
error.publish(context);
|
|
throw err;
|
|
} finally {
|
|
end.publish(context);
|
|
}
|
|
});
|
|
}
|
|
|
|
traceCallback(fn, position = -1, context = {}, thisArg, ...args) {
|
|
const { start, end, asyncStart, asyncEnd, error } = this;
|
|
|
|
function wrappedCallback(err, res) {
|
|
if (err) {
|
|
context.error = err;
|
|
error.publish(context);
|
|
} else {
|
|
context.result = res;
|
|
}
|
|
|
|
// Using runStores here enables manual context failure recovery
|
|
asyncStart.runStores(context, () => {
|
|
try {
|
|
return ReflectApply(callback, this, arguments);
|
|
} finally {
|
|
asyncEnd.publish(context);
|
|
}
|
|
});
|
|
}
|
|
|
|
const callback = ArrayPrototypeAt(args, position);
|
|
validateFunction(callback, 'callback');
|
|
ArrayPrototypeSplice(args, position, 1, wrappedCallback);
|
|
|
|
return start.runStores(context, () => {
|
|
try {
|
|
return ReflectApply(fn, thisArg, args);
|
|
} catch (err) {
|
|
context.error = err;
|
|
error.publish(context);
|
|
throw err;
|
|
} finally {
|
|
end.publish(context);
|
|
}
|
|
});
|
|
}
|
|
}
|
|
|
|
function tracingChannel(nameOrChannels) {
|
|
return new TracingChannel(nameOrChannels);
|
|
}
|
|
|
|
module.exports = {
|
|
channel,
|
|
hasSubscribers,
|
|
subscribe,
|
|
tracingChannel,
|
|
unsubscribe,
|
|
Channel,
|
|
};
|