'use strict'; const { ArrayPrototypeAt, ArrayPrototypeIndexOf, ArrayPrototypePush, ArrayPrototypePushApply, ArrayPrototypeSlice, ArrayPrototypeSplice, ObjectDefineProperty, ObjectGetPrototypeOf, ObjectSetPrototypeOf, Promise, PromisePrototypeThen, PromiseReject, PromiseResolve, ReflectApply, SafeFinalizationRegistry, SafeMap, SymbolHasInstance, } = primordials; const { codes: { ERR_INVALID_ARG_TYPE, }, } = require('internal/errors'); const { validateFunction, } = require('internal/validators'); const { triggerUncaughtException } = internalBinding('errors'); const { WeakReference } = require('internal/util'); // Can't delete when weakref count reaches 0 as it could increment again. // Only GC can be used as a valid time to clean up the channels map. class WeakRefMap extends SafeMap { #finalizers = new SafeFinalizationRegistry((key) => { this.delete(key); }); set(key, value) { this.#finalizers.register(value, key); return super.set(key, new WeakReference(value)); } get(key) { return super.get(key)?.get(); } incRef(key) { return super.get(key)?.incRef(); } decRef(key) { return super.get(key)?.decRef(); } } function markActive(channel) { // eslint-disable-next-line no-use-before-define ObjectSetPrototypeOf(channel, ActiveChannel.prototype); channel._subscribers = []; channel._stores = new SafeMap(); } function maybeMarkInactive(channel) { // When there are no more active subscribers or bound, restore to fast prototype. if (!channel._subscribers.length && !channel._stores.size) { // eslint-disable-next-line no-use-before-define ObjectSetPrototypeOf(channel, Channel.prototype); channel._subscribers = undefined; channel._stores = undefined; } } function defaultTransform(data) { return data; } function wrapStoreRun(store, data, next, transform = defaultTransform) { return () => { let context; try { context = transform(data); } catch (err) { process.nextTick(() => { triggerUncaughtException(err, false); }); return next(); } return store.run(context, next); }; } // TODO(qard): should there be a C++ channel interface? class ActiveChannel { subscribe(subscription) { validateFunction(subscription, 'subscription'); this._subscribers = ArrayPrototypeSlice(this._subscribers); ArrayPrototypePush(this._subscribers, subscription); channels.incRef(this.name); } unsubscribe(subscription) { const index = ArrayPrototypeIndexOf(this._subscribers, subscription); if (index === -1) return false; const before = ArrayPrototypeSlice(this._subscribers, 0, index); const after = ArrayPrototypeSlice(this._subscribers, index + 1); this._subscribers = before; ArrayPrototypePushApply(this._subscribers, after); channels.decRef(this.name); maybeMarkInactive(this); return true; } bindStore(store, transform) { const replacing = this._stores.has(store); if (!replacing) channels.incRef(this.name); this._stores.set(store, transform); } unbindStore(store) { if (!this._stores.has(store)) { return false; } this._stores.delete(store); channels.decRef(this.name); maybeMarkInactive(this); return true; } get hasSubscribers() { return true; } publish(data) { const subscribers = this._subscribers; for (let i = 0; i < (subscribers?.length || 0); i++) { try { const onMessage = subscribers[i]; onMessage(data, this.name); } catch (err) { process.nextTick(() => { triggerUncaughtException(err, false); }); } } } runStores(data, fn, thisArg, ...args) { let run = () => { this.publish(data); return ReflectApply(fn, thisArg, args); }; for (const entry of this._stores.entries()) { const store = entry[0]; const transform = entry[1]; run = wrapStoreRun(store, data, run, transform); } return run(); } } class Channel { constructor(name) { this._subscribers = undefined; this._stores = undefined; this.name = name; channels.set(name, this); } static [SymbolHasInstance](instance) { const prototype = ObjectGetPrototypeOf(instance); return prototype === Channel.prototype || prototype === ActiveChannel.prototype; } subscribe(subscription) { markActive(this); this.subscribe(subscription); } unsubscribe() { return false; } bindStore(store, transform) { markActive(this); this.bindStore(store, transform); } unbindStore() { return false; } get hasSubscribers() { return false; } publish() {} runStores(data, fn, thisArg, ...args) { return ReflectApply(fn, thisArg, args); } } const channels = new WeakRefMap(); function channel(name) { const channel = channels.get(name); if (channel) return channel; if (typeof name !== 'string' && typeof name !== 'symbol') { throw new ERR_INVALID_ARG_TYPE('channel', ['string', 'symbol'], name); } return new Channel(name); } function subscribe(name, subscription) { return channel(name).subscribe(subscription); } function unsubscribe(name, subscription) { return channel(name).unsubscribe(subscription); } function hasSubscribers(name) { const channel = channels.get(name); if (!channel) return false; return channel.hasSubscribers; } const traceEvents = [ 'start', 'end', 'asyncStart', 'asyncEnd', 'error', ]; function assertChannel(value, name) { if (!(value instanceof Channel)) { throw new ERR_INVALID_ARG_TYPE(name, ['Channel'], value); } } function tracingChannelFrom(nameOrChannels, name) { if (typeof nameOrChannels === 'string') { return channel(`tracing:${nameOrChannels}:${name}`); } if (typeof nameOrChannels === 'object' && nameOrChannels !== null) { const channel = nameOrChannels[name]; assertChannel(channel, `nameOrChannels.${name}`); return channel; } throw new ERR_INVALID_ARG_TYPE('nameOrChannels', ['string', 'object', 'TracingChannel'], nameOrChannels); } class TracingChannel { constructor(nameOrChannels) { for (let i = 0; i < traceEvents.length; ++i) { const eventName = traceEvents[i]; ObjectDefineProperty(this, eventName, { __proto__: null, value: tracingChannelFrom(nameOrChannels, eventName), }); } } get hasSubscribers() { return this.start?.hasSubscribers || this.end?.hasSubscribers || this.asyncStart?.hasSubscribers || this.asyncEnd?.hasSubscribers || this.error?.hasSubscribers; } subscribe(handlers) { for (let i = 0; i < traceEvents.length; ++i) { const name = traceEvents[i]; if (!handlers[name]) continue; this[name]?.subscribe(handlers[name]); } } unsubscribe(handlers) { let done = true; for (let i = 0; i < traceEvents.length; ++i) { const name = traceEvents[i]; if (!handlers[name]) continue; if (!this[name]?.unsubscribe(handlers[name])) { done = false; } } return done; } traceSync(fn, context = {}, thisArg, ...args) { if (!this.hasSubscribers) { return ReflectApply(fn, thisArg, args); } const { start, end, error } = this; return start.runStores(context, () => { try { const result = ReflectApply(fn, thisArg, args); context.result = result; return result; } catch (err) { context.error = err; error.publish(context); throw err; } finally { end.publish(context); } }); } tracePromise(fn, context = {}, thisArg, ...args) { if (!this.hasSubscribers) { return ReflectApply(fn, thisArg, args); } const { start, end, asyncStart, asyncEnd, error } = this; function reject(err) { context.error = err; error.publish(context); asyncStart.publish(context); // TODO: Is there a way to have asyncEnd _after_ the continuation? asyncEnd.publish(context); return PromiseReject(err); } function resolve(result) { context.result = result; asyncStart.publish(context); // TODO: Is there a way to have asyncEnd _after_ the continuation? asyncEnd.publish(context); return result; } return start.runStores(context, () => { try { let promise = ReflectApply(fn, thisArg, args); // Convert thenables to native promises if (!(promise instanceof Promise)) { promise = PromiseResolve(promise); } return PromisePrototypeThen(promise, resolve, reject); } catch (err) { context.error = err; error.publish(context); throw err; } finally { end.publish(context); } }); } traceCallback(fn, position = -1, context = {}, thisArg, ...args) { if (!this.hasSubscribers) { return ReflectApply(fn, thisArg, args); } const { start, end, asyncStart, asyncEnd, error } = this; function wrappedCallback(err, res) { if (err) { context.error = err; error.publish(context); } else { context.result = res; } // Using runStores here enables manual context failure recovery asyncStart.runStores(context, () => { try { return ReflectApply(callback, this, arguments); } finally { asyncEnd.publish(context); } }); } const callback = ArrayPrototypeAt(args, position); validateFunction(callback, 'callback'); ArrayPrototypeSplice(args, position, 1, wrappedCallback); return start.runStores(context, () => { try { return ReflectApply(fn, thisArg, args); } catch (err) { context.error = err; error.publish(context); throw err; } finally { end.publish(context); } }); } } function tracingChannel(nameOrChannels) { return new TracingChannel(nameOrChannels); } module.exports = { channel, hasSubscribers, subscribe, tracingChannel, unsubscribe, Channel, };