mirror of
https://github.com/nodejs/node.git
synced 2024-11-21 10:59:27 +00:00
module: have a single hooks thread for all workers
PR-URL: https://github.com/nodejs/node/pull/52706 Reviewed-By: Geoffrey Booth <webadmin@geoffreybooth.com> Reviewed-By: Jacob Smith <jacob@frende.me> Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com> Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de>
This commit is contained in:
parent
95c16b2baf
commit
22cb99d073
@ -95,6 +95,7 @@ port.on('message', (message) => {
|
||||
filename,
|
||||
hasStdin,
|
||||
publicPort,
|
||||
hooksPort,
|
||||
workerData,
|
||||
} = message;
|
||||
|
||||
@ -109,6 +110,7 @@ port.on('message', (message) => {
|
||||
}
|
||||
|
||||
require('internal/worker').assignEnvironmentData(environmentData);
|
||||
require('internal/worker').hooksPort = hooksPort;
|
||||
|
||||
if (SharedArrayBuffer !== undefined) {
|
||||
// The counter is only passed to the workers created by the main thread,
|
||||
|
@ -35,7 +35,7 @@ const {
|
||||
const { exitCodes: { kUnsettledTopLevelAwait } } = internalBinding('errors');
|
||||
const { URL } = require('internal/url');
|
||||
const { canParse: URLCanParse } = internalBinding('url');
|
||||
const { receiveMessageOnPort } = require('worker_threads');
|
||||
const { receiveMessageOnPort, isMainThread } = require('worker_threads');
|
||||
const {
|
||||
isAnyArrayBuffer,
|
||||
isArrayBufferView,
|
||||
@ -482,6 +482,8 @@ class HooksProxy {
|
||||
*/
|
||||
#worker;
|
||||
|
||||
#portToHooksThread;
|
||||
|
||||
/**
|
||||
* The last notification ID received from the worker. This is used to detect
|
||||
* if the worker has already sent a notification before putting the main
|
||||
@ -499,26 +501,38 @@ class HooksProxy {
|
||||
#isReady = false;
|
||||
|
||||
constructor() {
|
||||
const { InternalWorker } = require('internal/worker');
|
||||
MessageChannel ??= require('internal/worker/io').MessageChannel;
|
||||
|
||||
const { InternalWorker, hooksPort } = require('internal/worker');
|
||||
const lock = new SharedArrayBuffer(SHARED_MEMORY_BYTE_LENGTH);
|
||||
this.#lock = new Int32Array(lock);
|
||||
|
||||
this.#worker = new InternalWorker(loaderWorkerId, {
|
||||
stderr: false,
|
||||
stdin: false,
|
||||
stdout: false,
|
||||
trackUnmanagedFds: false,
|
||||
workerData: {
|
||||
lock,
|
||||
},
|
||||
});
|
||||
this.#worker.unref(); // ! Allows the process to eventually exit.
|
||||
this.#worker.on('exit', process.exit);
|
||||
if (isMainThread) {
|
||||
// Main thread is the only one that creates the internal single hooks worker
|
||||
this.#worker = new InternalWorker(loaderWorkerId, {
|
||||
stderr: false,
|
||||
stdin: false,
|
||||
stdout: false,
|
||||
trackUnmanagedFds: false,
|
||||
workerData: {
|
||||
lock,
|
||||
},
|
||||
});
|
||||
this.#worker.unref(); // ! Allows the process to eventually exit.
|
||||
this.#worker.on('exit', process.exit);
|
||||
this.#portToHooksThread = this.#worker;
|
||||
} else {
|
||||
this.#portToHooksThread = hooksPort;
|
||||
}
|
||||
}
|
||||
|
||||
waitForWorker() {
|
||||
// There is one Hooks instance for each worker thread. But only one of these Hooks instances
|
||||
// has an InternalWorker. That was the Hooks instance created for the main thread.
|
||||
// It means for all Hooks instances that are not on the main thread => they are ready because they
|
||||
// delegate to the single InternalWorker anyway.
|
||||
if (!isMainThread) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (!this.#isReady) {
|
||||
const { kIsOnline } = require('internal/worker');
|
||||
if (!this.#worker[kIsOnline]) {
|
||||
@ -535,6 +549,37 @@ class HooksProxy {
|
||||
}
|
||||
}
|
||||
|
||||
#postMessageToWorker(method, type, transferList, args) {
|
||||
this.waitForWorker();
|
||||
|
||||
MessageChannel ??= require('internal/worker/io').MessageChannel;
|
||||
|
||||
const {
|
||||
port1: fromHooksThread,
|
||||
port2: toHooksThread,
|
||||
} = new MessageChannel();
|
||||
|
||||
// Pass work to the worker.
|
||||
debug(`post ${type} message to worker`, { method, args, transferList });
|
||||
const usedTransferList = [toHooksThread];
|
||||
if (transferList) {
|
||||
ArrayPrototypePushApply(usedTransferList, transferList);
|
||||
}
|
||||
|
||||
this.#portToHooksThread.postMessage(
|
||||
{
|
||||
__proto__: null,
|
||||
args,
|
||||
lock: this.#lock,
|
||||
method,
|
||||
port: toHooksThread,
|
||||
},
|
||||
usedTransferList,
|
||||
);
|
||||
|
||||
return fromHooksThread;
|
||||
}
|
||||
|
||||
/**
|
||||
* Invoke a remote method asynchronously.
|
||||
* @param {string} method Method to invoke
|
||||
@ -543,22 +588,7 @@ class HooksProxy {
|
||||
* @returns {Promise<any>}
|
||||
*/
|
||||
async makeAsyncRequest(method, transferList, ...args) {
|
||||
this.waitForWorker();
|
||||
|
||||
MessageChannel ??= require('internal/worker/io').MessageChannel;
|
||||
const asyncCommChannel = new MessageChannel();
|
||||
|
||||
// Pass work to the worker.
|
||||
debug('post async message to worker', { method, args, transferList });
|
||||
const finalTransferList = [asyncCommChannel.port2];
|
||||
if (transferList) {
|
||||
ArrayPrototypePushApply(finalTransferList, transferList);
|
||||
}
|
||||
this.#worker.postMessage({
|
||||
__proto__: null,
|
||||
method, args,
|
||||
port: asyncCommChannel.port2,
|
||||
}, finalTransferList);
|
||||
const fromHooksThread = this.#postMessageToWorker(method, 'Async', transferList, args);
|
||||
|
||||
if (this.#numberOfPendingAsyncResponses++ === 0) {
|
||||
// On the next lines, the main thread will await a response from the worker thread that might
|
||||
@ -567,7 +597,11 @@ class HooksProxy {
|
||||
// However we want to keep the process alive until the worker thread responds (or until the
|
||||
// event loop of the worker thread is also empty), so we ref the worker until we get all the
|
||||
// responses back.
|
||||
this.#worker.ref();
|
||||
if (this.#worker) {
|
||||
this.#worker.ref();
|
||||
} else {
|
||||
this.#portToHooksThread.ref();
|
||||
}
|
||||
}
|
||||
|
||||
let response;
|
||||
@ -576,18 +610,26 @@ class HooksProxy {
|
||||
await AtomicsWaitAsync(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId).value;
|
||||
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
|
||||
|
||||
response = receiveMessageOnPort(asyncCommChannel.port1);
|
||||
response = receiveMessageOnPort(fromHooksThread);
|
||||
} while (response == null);
|
||||
debug('got async response from worker', { method, args }, this.#lock);
|
||||
|
||||
if (--this.#numberOfPendingAsyncResponses === 0) {
|
||||
// We got all the responses from the worker, its job is done (until next time).
|
||||
this.#worker.unref();
|
||||
if (this.#worker) {
|
||||
this.#worker.unref();
|
||||
} else {
|
||||
this.#portToHooksThread.unref();
|
||||
}
|
||||
}
|
||||
|
||||
const body = this.#unwrapMessage(response);
|
||||
asyncCommChannel.port1.close();
|
||||
return body;
|
||||
if (response.message.status === 'exit') {
|
||||
process.exit(response.message.body);
|
||||
}
|
||||
|
||||
fromHooksThread.close();
|
||||
|
||||
return this.#unwrapMessage(response);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -598,11 +640,7 @@ class HooksProxy {
|
||||
* @returns {any}
|
||||
*/
|
||||
makeSyncRequest(method, transferList, ...args) {
|
||||
this.waitForWorker();
|
||||
|
||||
// Pass work to the worker.
|
||||
debug('post sync message to worker', { method, args, transferList });
|
||||
this.#worker.postMessage({ __proto__: null, method, args }, transferList);
|
||||
const fromHooksThread = this.#postMessageToWorker(method, 'Sync', transferList, args);
|
||||
|
||||
let response;
|
||||
do {
|
||||
@ -611,7 +649,7 @@ class HooksProxy {
|
||||
AtomicsWait(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, this.#workerNotificationLastId);
|
||||
this.#workerNotificationLastId = AtomicsLoad(this.#lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
|
||||
|
||||
response = this.#worker.receiveMessageSync();
|
||||
response = receiveMessageOnPort(fromHooksThread);
|
||||
} while (response == null);
|
||||
debug('got sync response from worker', { method, args });
|
||||
if (response.message.status === 'never-settle') {
|
||||
@ -619,6 +657,9 @@ class HooksProxy {
|
||||
} else if (response.message.status === 'exit') {
|
||||
process.exit(response.message.body);
|
||||
}
|
||||
|
||||
fromHooksThread.close();
|
||||
|
||||
return this.#unwrapMessage(response);
|
||||
}
|
||||
|
||||
|
@ -41,6 +41,7 @@ const { ModuleWrap, kEvaluating, kEvaluated } = internalBinding('module_wrap');
|
||||
const {
|
||||
urlToFilename,
|
||||
} = require('internal/modules/helpers');
|
||||
const { isMainThread } = require('worker_threads');
|
||||
let defaultResolve, defaultLoad, defaultLoadSync, importMetaInitializer;
|
||||
|
||||
/**
|
||||
@ -607,10 +608,11 @@ class CustomizedModuleLoader {
|
||||
*/
|
||||
constructor() {
|
||||
getHooksProxy();
|
||||
_hasCustomizations = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Register some loader specifier.
|
||||
* Register a loader specifier.
|
||||
* @param {string} originalSpecifier The specified URL path of the loader to
|
||||
* be registered.
|
||||
* @param {string} parentURL The parent URL from where the loader will be
|
||||
@ -618,10 +620,14 @@ class CustomizedModuleLoader {
|
||||
* @param {any} [data] Arbitrary data to be passed from the custom loader
|
||||
* (user-land) to the worker.
|
||||
* @param {any[]} [transferList] Objects in `data` that are changing ownership
|
||||
* @returns {{ format: string, url: URL['href'] }}
|
||||
* @returns {{ format: string, url: URL['href'] } | undefined}
|
||||
*/
|
||||
register(originalSpecifier, parentURL, data, transferList) {
|
||||
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
|
||||
if (isMainThread) {
|
||||
// Only the main thread has a Hooks instance with worker thread. All other Worker threads
|
||||
// delegate their hooks to the HooksThread of the main thread.
|
||||
return hooksProxy.makeSyncRequest('register', transferList, originalSpecifier, parentURL, data);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -719,6 +725,12 @@ function getHooksProxy() {
|
||||
return hooksProxy;
|
||||
}
|
||||
|
||||
let _hasCustomizations = false;
|
||||
function hasCustomizations() {
|
||||
return _hasCustomizations;
|
||||
}
|
||||
|
||||
|
||||
let cascadedLoader;
|
||||
|
||||
/**
|
||||
@ -780,6 +792,7 @@ function register(specifier, parentURL = undefined, options) {
|
||||
|
||||
module.exports = {
|
||||
createModuleLoader,
|
||||
hasCustomizations,
|
||||
getHooksProxy,
|
||||
getOrInitializeCascadedLoader,
|
||||
register,
|
||||
|
@ -1,6 +1,8 @@
|
||||
'use strict';
|
||||
|
||||
const {
|
||||
ArrayPrototypeFilter,
|
||||
ArrayPrototypePush,
|
||||
AtomicsAdd,
|
||||
AtomicsNotify,
|
||||
DataViewPrototypeGetBuffer,
|
||||
@ -97,7 +99,21 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
|
||||
// so it can detect the exit event.
|
||||
const { exit } = process;
|
||||
process.exit = function(code) {
|
||||
syncCommPort.postMessage(wrapMessage('exit', code ?? process.exitCode));
|
||||
const exitMsg = wrapMessage('exit', code ?? process.exitCode);
|
||||
if (hooks) {
|
||||
for (let i = 0; i < allThreadRegisteredHandlerPorts.length; i++) {
|
||||
const { port: registeredPort } = allThreadRegisteredHandlerPorts[i];
|
||||
registeredPort.postMessage(exitMsg);
|
||||
}
|
||||
|
||||
for (const { port, lock: operationLock } of unsettledResponsePorts) {
|
||||
port.postMessage(exitMsg);
|
||||
// Wake all threads that have pending operations.
|
||||
AtomicsAdd(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
|
||||
AtomicsNotify(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
|
||||
}
|
||||
}
|
||||
syncCommPort.postMessage(exitMsg);
|
||||
AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
|
||||
AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
|
||||
return ReflectApply(exit, this, arguments);
|
||||
@ -145,8 +161,11 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
|
||||
const unsettledResponsePorts = new SafeSet();
|
||||
|
||||
process.on('beforeExit', () => {
|
||||
for (const port of unsettledResponsePorts) {
|
||||
for (const { port, lock: operationLock } of unsettledResponsePorts) {
|
||||
port.postMessage(wrapMessage('never-settle'));
|
||||
// Wake all threads that have pending operations.
|
||||
AtomicsAdd(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
|
||||
AtomicsNotify(operationLock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
|
||||
}
|
||||
unsettledResponsePorts.clear();
|
||||
|
||||
@ -164,24 +183,59 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
|
||||
setImmediate(() => {});
|
||||
});
|
||||
|
||||
let allThreadRegisteredHandlerPorts = [];
|
||||
/**
|
||||
* @callback registerHandler
|
||||
* @param {MessagePort} toWorkerThread - Upon Worker creation a message channel between the new Worker
|
||||
* and the Hooks thread is being initialized. This is the MessagePort that the Hooks thread will use post
|
||||
* messages to the worker. The other MessagePort is passed to the new Worker itself via LOAD_SCRIPT message
|
||||
*/
|
||||
function registerHandler(toWorkerThread, registeredThreadId) {
|
||||
toWorkerThread.on('message', handleMessage);
|
||||
ArrayPrototypePush(allThreadRegisteredHandlerPorts, { port: toWorkerThread, registeredThreadId });
|
||||
}
|
||||
|
||||
/**
|
||||
* @callback registerHandler
|
||||
* @param {number} unregisteredThreadId - the thread id of the worker thread that is being unregistered
|
||||
* from the Hooks Thread
|
||||
*/
|
||||
function unregisterHandler(unregisteredThreadId) {
|
||||
allThreadRegisteredHandlerPorts = ArrayPrototypeFilter(
|
||||
allThreadRegisteredHandlerPorts, (el) => el.registeredThreadId !== unregisteredThreadId);
|
||||
}
|
||||
|
||||
function getMessageHandler(method) {
|
||||
if (method === '#registerWorkerClient') {
|
||||
return registerHandler;
|
||||
}
|
||||
if (method === '#unregisterWorkerClient') {
|
||||
return unregisterHandler;
|
||||
}
|
||||
return hooks[method];
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles incoming messages from the main thread or other workers.
|
||||
* @param {object} options - The options object.
|
||||
* @param {string} options.method - The name of the hook.
|
||||
* @param {Array} options.args - The arguments to pass to the method.
|
||||
* @param {MessagePort} options.port - The message port to use for communication.
|
||||
* @param {Int32Array} options.lock - The shared memory where the caller expects to get awaken.
|
||||
*/
|
||||
async function handleMessage({ method, args, port }) {
|
||||
async function handleMessage({ method, args, port, lock: msgLock }) {
|
||||
// Each potential exception needs to be caught individually so that the correct error is sent to
|
||||
// the main thread.
|
||||
let hasError = false;
|
||||
let shouldRemoveGlobalErrorHandler = false;
|
||||
assert(typeof hooks[method] === 'function');
|
||||
const messageHandler = getMessageHandler(method);
|
||||
assert(typeof messageHandler === 'function');
|
||||
if (port == null && !hasUncaughtExceptionCaptureCallback()) {
|
||||
// When receiving sync messages, we want to unlock the main thread when there's an exception.
|
||||
process.on('uncaughtException', errorHandler);
|
||||
shouldRemoveGlobalErrorHandler = true;
|
||||
}
|
||||
const usedLock = msgLock ?? lock;
|
||||
|
||||
// We are about to yield the execution with `await ReflectApply` below. In case the code
|
||||
// following the `await` never runs, we remove the message handler so the `beforeExit` event
|
||||
@ -192,17 +246,19 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
|
||||
clearImmediate(immediate);
|
||||
immediate = setImmediate(checkForMessages).unref();
|
||||
|
||||
unsettledResponsePorts.add(port ?? syncCommPort);
|
||||
const unsettledActionData = { port: port ?? syncCommPort, lock: usedLock };
|
||||
|
||||
unsettledResponsePorts.add(unsettledActionData);
|
||||
|
||||
let response;
|
||||
try {
|
||||
response = await ReflectApply(hooks[method], hooks, args);
|
||||
response = await ReflectApply(messageHandler, hooks, args);
|
||||
} catch (exception) {
|
||||
hasError = true;
|
||||
response = exception;
|
||||
}
|
||||
|
||||
unsettledResponsePorts.delete(port ?? syncCommPort);
|
||||
unsettledResponsePorts.delete(unsettledActionData);
|
||||
|
||||
// Send the method response (or exception) to the main thread.
|
||||
try {
|
||||
@ -215,8 +271,8 @@ async function customizedModuleWorker(lock, syncCommPort, errorHandler) {
|
||||
(port ?? syncCommPort).postMessage(wrapMessage('error', exception));
|
||||
}
|
||||
|
||||
AtomicsAdd(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
|
||||
AtomicsNotify(lock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
|
||||
AtomicsAdd(usedLock, WORKER_TO_MAIN_THREAD_NOTIFICATION, 1);
|
||||
AtomicsNotify(usedLock, WORKER_TO_MAIN_THREAD_NOTIFICATION);
|
||||
if (shouldRemoveGlobalErrorHandler) {
|
||||
process.off('uncaughtException', errorHandler);
|
||||
}
|
||||
|
@ -132,7 +132,7 @@ class Worker extends EventEmitter {
|
||||
constructor(filename, options = kEmptyObject) {
|
||||
throwIfBuildingSnapshot('Creating workers');
|
||||
super();
|
||||
const isInternal = arguments[2] === kIsInternal;
|
||||
const isInternal = this.#isInternal = arguments[2] === kIsInternal;
|
||||
debug(
|
||||
`[${threadId}] create new worker`,
|
||||
filename,
|
||||
@ -258,6 +258,15 @@ class Worker extends EventEmitter {
|
||||
...new SafeArrayIterator(options.transferList));
|
||||
|
||||
this[kPublicPort] = port1;
|
||||
const {
|
||||
port1: toWorkerThread,
|
||||
port2: toHooksThread,
|
||||
} = new MessageChannel();
|
||||
if (!isInternal) {
|
||||
// This is not an internal hooks thread => it needs a channel to the hooks thread:
|
||||
// - send it one side of a channel here
|
||||
ArrayPrototypePush(transferList, toHooksThread);
|
||||
}
|
||||
ArrayPrototypeForEach(['message', 'messageerror'], (event) => {
|
||||
this[kPublicPort].on(event, (message) => this.emit(event, message));
|
||||
});
|
||||
@ -272,8 +281,20 @@ class Worker extends EventEmitter {
|
||||
workerData: options.workerData,
|
||||
environmentData,
|
||||
publicPort: port2,
|
||||
hooksPort: !isInternal ? toHooksThread : undefined,
|
||||
hasStdin: !!options.stdin,
|
||||
}, transferList);
|
||||
|
||||
const loaderModule = require('internal/modules/esm/loader');
|
||||
const hasCustomizations = loaderModule.hasCustomizations();
|
||||
|
||||
if (!isInternal && hasCustomizations) {
|
||||
// - send the second side of the channel to the hooks thread,
|
||||
// also announce the threadId of the Worker that will use that port.
|
||||
// This is needed for the cleanup stage
|
||||
loaderModule.getHooksProxy().makeSyncRequest(
|
||||
'#registerWorkerClient', [toWorkerThread], toWorkerThread, this.threadId);
|
||||
}
|
||||
// Use this to cache the Worker's loopStart value once available.
|
||||
this[kLoopStartTime] = -1;
|
||||
this[kIsOnline] = false;
|
||||
@ -293,6 +314,12 @@ class Worker extends EventEmitter {
|
||||
|
||||
[kOnExit](code, customErr, customErrReason) {
|
||||
debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
|
||||
const loaderModule = require('internal/modules/esm/loader');
|
||||
const hasCustomizations = loaderModule.hasCustomizations();
|
||||
|
||||
if (!this.#isInternal && hasCustomizations) {
|
||||
loaderModule.getHooksProxy()?.makeAsyncRequest('#unregisterWorkerClient', undefined, this.threadId);
|
||||
}
|
||||
drainMessagePort(this[kPublicPort]);
|
||||
drainMessagePort(this[kPort]);
|
||||
this.removeAllListeners('message');
|
||||
@ -435,6 +462,8 @@ class Worker extends EventEmitter {
|
||||
return makeResourceLimits(this[kHandle].getResourceLimits());
|
||||
}
|
||||
|
||||
#isInternal = false;
|
||||
|
||||
getHeapSnapshot(options) {
|
||||
const {
|
||||
HeapSnapshotStream,
|
||||
@ -532,6 +561,7 @@ module.exports = {
|
||||
kIsOnline,
|
||||
isMainThread,
|
||||
SHARE_ENV,
|
||||
hooksPort: undefined,
|
||||
resourceLimits:
|
||||
!isMainThread ? makeResourceLimits(resourceLimitsRaw) : {},
|
||||
setEnvironmentData,
|
||||
|
@ -148,9 +148,11 @@ void HandleWrap::OnClose(uv_handle_t* handle) {
|
||||
wrap->OnClose();
|
||||
wrap->handle_wrap_queue_.Remove();
|
||||
|
||||
if (!wrap->persistent().IsEmpty() &&
|
||||
wrap->object()->Has(env->context(), env->handle_onclose_symbol())
|
||||
.FromMaybe(false)) {
|
||||
if (!env->isolate()->IsExecutionTerminating() &&
|
||||
!wrap->persistent().IsEmpty() &&
|
||||
wrap->object()
|
||||
->Has(env->context(), env->handle_onclose_symbol())
|
||||
.FromMaybe(false)) {
|
||||
wrap->MakeCallback(env->handle_onclose_symbol(), 0, nullptr);
|
||||
}
|
||||
}
|
||||
|
@ -50,6 +50,7 @@ const {
|
||||
skipIfDumbTerminal,
|
||||
skipIfEslintMissing,
|
||||
skipIfInspectorDisabled,
|
||||
skipIfWorker,
|
||||
spawnPromisified,
|
||||
} = common;
|
||||
|
||||
@ -104,5 +105,6 @@ export {
|
||||
skipIfDumbTerminal,
|
||||
skipIfEslintMissing,
|
||||
skipIfInspectorDisabled,
|
||||
skipIfWorker,
|
||||
spawnPromisified,
|
||||
};
|
||||
|
@ -1,6 +1,11 @@
|
||||
import '../common/index.mjs';
|
||||
import { skipIfWorker } from '../common/index.mjs';
|
||||
import assert from 'node:assert/strict';
|
||||
import { mock } from '../fixtures/es-module-loaders/mock.mjs';
|
||||
// Importing mock.mjs above will call `register` to modify the loaders chain.
|
||||
// Modifying the loader chain is not supported currently when running from a worker thread.
|
||||
// Relevant PR: https://github.com/nodejs/node/pull/52706
|
||||
// See comment: https://github.com/nodejs/node/pull/52706/files#r1585144580
|
||||
skipIfWorker();
|
||||
|
||||
mock('node:events', {
|
||||
EventEmitter: 'This is mocked!'
|
||||
|
74
test/es-module/test-esm-loader-threads.mjs
Normal file
74
test/es-module/test-esm-loader-threads.mjs
Normal file
@ -0,0 +1,74 @@
|
||||
import { spawnPromisified } from '../common/index.mjs';
|
||||
import * as fixtures from '../common/fixtures.mjs';
|
||||
import { strictEqual } from 'node:assert';
|
||||
import { execPath } from 'node:process';
|
||||
import { describe, it } from 'node:test';
|
||||
|
||||
describe('off-thread hooks', { concurrency: true }, () => {
|
||||
it('uses only one hooks thread to support multiple application threads', async () => {
|
||||
const { code, signal, stdout, stderr } = await spawnPromisified(execPath, [
|
||||
'--no-warnings',
|
||||
'--import',
|
||||
`data:text/javascript,${encodeURIComponent(`
|
||||
import { register } from 'node:module';
|
||||
register(${JSON.stringify(fixtures.fileURL('es-module-loaders/hooks-log.mjs'))});
|
||||
`)}`,
|
||||
fixtures.path('es-module-loaders/workers-spawned.mjs'),
|
||||
]);
|
||||
|
||||
strictEqual(stderr, '');
|
||||
strictEqual(stdout.split('\n').filter((line) => line.startsWith('initialize')).length, 1);
|
||||
strictEqual(stdout.split('\n').filter((line) => line === 'foo').length, 2);
|
||||
strictEqual(stdout.split('\n').filter((line) => line === 'bar').length, 4);
|
||||
// Calls to resolve/load:
|
||||
// 1x main script: test/fixtures/es-module-loaders/workers-spawned.mjs
|
||||
// 3x worker_threads
|
||||
// => 1x test/fixtures/es-module-loaders/worker-log.mjs
|
||||
// 2x test/fixtures/es-module-loaders/worker-log-again.mjs => once per worker-log.mjs Worker instance
|
||||
// 2x test/fixtures/es-module-loaders/worker-log.mjs => once per worker-log.mjs Worker instance
|
||||
// 4x test/fixtures/es-module-loaders/worker-log-again.mjs => 2x for each worker-log
|
||||
// 6x module-named-exports.mjs => 2x worker-log.mjs + 4x worker-log-again.mjs
|
||||
// ===========================
|
||||
// 16 calls to resolve + 16 calls to load hook for the registered custom loader
|
||||
strictEqual(stdout.split('\n').filter((line) => line.startsWith('hooked resolve')).length, 16);
|
||||
strictEqual(stdout.split('\n').filter((line) => line.startsWith('hooked load')).length, 16);
|
||||
strictEqual(code, 0);
|
||||
strictEqual(signal, null);
|
||||
});
|
||||
|
||||
it('propagates the exit code from worker thread import exiting from resolve hook', async () => {
|
||||
const { code, signal, stdout, stderr } = await spawnPromisified(execPath, [
|
||||
'--no-warnings',
|
||||
'--import',
|
||||
`data:text/javascript,${encodeURIComponent(`
|
||||
import { register } from 'node:module';
|
||||
register(${JSON.stringify(fixtures.fileURL('es-module-loaders/hooks-exit-worker.mjs'))});
|
||||
`)}`,
|
||||
fixtures.path('es-module-loaders/worker-log-fail-worker-resolve.mjs'),
|
||||
]);
|
||||
|
||||
strictEqual(stderr, '');
|
||||
strictEqual(stdout.split('\n').filter((line) => line.startsWith('resolve process-exit-module-resolve')).length, 1);
|
||||
strictEqual(code, 42);
|
||||
strictEqual(signal, null);
|
||||
});
|
||||
|
||||
it('propagates the exit code from worker thread import exiting from load hook', async () => {
|
||||
const { code, signal, stdout, stderr } = await spawnPromisified(execPath, [
|
||||
'--no-warnings',
|
||||
'--import',
|
||||
`data:text/javascript,${encodeURIComponent(`
|
||||
import { register } from 'node:module';
|
||||
register(${JSON.stringify(fixtures.fileURL('es-module-loaders/hooks-exit-worker.mjs'))});
|
||||
`)}`,
|
||||
fixtures.path('es-module-loaders/worker-log-fail-worker-load.mjs'),
|
||||
]);
|
||||
|
||||
strictEqual(stderr, '');
|
||||
strictEqual(stdout.split('\n').filter((line) => line.startsWith('resolve process-exit-module-load')).length, 1);
|
||||
strictEqual(stdout.split('\n').filter((line) => line.startsWith('load process-exit-on-load:///')).length, 1);
|
||||
strictEqual(code, 43);
|
||||
strictEqual(signal, null);
|
||||
});
|
||||
|
||||
});
|
@ -1,7 +1,9 @@
|
||||
// Flags: --import ./test/fixtures/es-module-loaders/builtin-named-exports.mjs
|
||||
'use strict';
|
||||
|
||||
require('../common');
|
||||
const common = require('../common');
|
||||
common.skipIfWorker();
|
||||
|
||||
const { readFile, __fromLoader } = require('fs');
|
||||
const assert = require('assert');
|
||||
|
||||
|
@ -1,9 +1,10 @@
|
||||
// Flags: --import ./test/fixtures/es-module-loaders/builtin-named-exports.mjs
|
||||
import '../common/index.mjs';
|
||||
import { readFile, __fromLoader } from 'fs';
|
||||
import { skipIfWorker } from '../common/index.mjs';
|
||||
import * as fs from 'fs';
|
||||
import assert from 'assert';
|
||||
import ok from '../fixtures/es-modules/test-esm-ok.mjs';
|
||||
skipIfWorker();
|
||||
|
||||
assert(ok);
|
||||
assert(readFile);
|
||||
assert(__fromLoader);
|
||||
assert(fs.readFile);
|
||||
assert(fs.__fromLoader);
|
||||
|
@ -1,7 +1,8 @@
|
||||
import '../common/index.mjs';
|
||||
import { skipIfWorker } from '../common/index.mjs';
|
||||
import * as fixtures from '../common/fixtures.mjs';
|
||||
import { register } from 'node:module';
|
||||
import assert from 'node:assert';
|
||||
skipIfWorker();
|
||||
|
||||
async function resolve(referrer, context, next) {
|
||||
const result = await next(referrer, context);
|
||||
|
@ -1,3 +1,4 @@
|
||||
import { isMainThread } from '../../common/index.mjs';
|
||||
import * as fixtures from '../../common/fixtures.mjs';
|
||||
import { createRequire, register } from 'node:module';
|
||||
|
||||
@ -10,8 +11,10 @@ Object.defineProperty(globalThis, GET_BUILTIN, {
|
||||
configurable: false,
|
||||
});
|
||||
|
||||
register(fixtures.fileURL('es-module-loaders/builtin-named-exports-loader.mjs'), {
|
||||
data: {
|
||||
GET_BUILTIN,
|
||||
},
|
||||
});
|
||||
if (isMainThread) {
|
||||
register(fixtures.fileURL('es-module-loaders/builtin-named-exports-loader.mjs'), {
|
||||
data: {
|
||||
GET_BUILTIN,
|
||||
},
|
||||
});
|
||||
}
|
||||
|
21
test/fixtures/es-module-loaders/hooks-exit-worker.mjs
vendored
Normal file
21
test/fixtures/es-module-loaders/hooks-exit-worker.mjs
vendored
Normal file
@ -0,0 +1,21 @@
|
||||
import { writeFileSync } from 'node:fs';
|
||||
|
||||
export function resolve(specifier, context, next) {
|
||||
writeFileSync(1, `resolve ${specifier}\n`);
|
||||
if (specifier === 'process-exit-module-resolve') {
|
||||
process.exit(42);
|
||||
}
|
||||
|
||||
if (specifier === 'process-exit-module-load') {
|
||||
return { __proto__: null, shortCircuit: true, url: 'process-exit-on-load:///' }
|
||||
}
|
||||
return next(specifier, context);
|
||||
}
|
||||
|
||||
export function load(url, context, next) {
|
||||
writeFileSync(1, `load ${url}\n`);
|
||||
if (url === 'process-exit-on-load:///') {
|
||||
process.exit(43);
|
||||
}
|
||||
return next(url, context);
|
||||
}
|
19
test/fixtures/es-module-loaders/hooks-log.mjs
vendored
Normal file
19
test/fixtures/es-module-loaders/hooks-log.mjs
vendored
Normal file
@ -0,0 +1,19 @@
|
||||
import { writeFileSync } from 'node:fs';
|
||||
|
||||
let initializeCount = 0;
|
||||
let resolveCount = 0;
|
||||
let loadCount = 0;
|
||||
|
||||
export function initialize() {
|
||||
writeFileSync(1, `initialize ${++initializeCount}\n`);
|
||||
}
|
||||
|
||||
export function resolve(specifier, context, next) {
|
||||
writeFileSync(1, `hooked resolve ${++resolveCount} ${specifier}\n`);
|
||||
return next(specifier, context);
|
||||
}
|
||||
|
||||
export function load(url, context, next) {
|
||||
writeFileSync(1, `hooked load ${++loadCount} ${url}\n`);
|
||||
return next(url, context);
|
||||
}
|
@ -1,16 +1,13 @@
|
||||
import assert from 'node:assert';
|
||||
|
||||
// A loader that asserts that the defaultResolve will throw "not found"
|
||||
// (skipping the top-level main of course, and the built-in ones needed for run-worker).
|
||||
let mainLoad = true;
|
||||
export async function resolve(specifier, { importAttributes }, next) {
|
||||
if (mainLoad || specifier === 'path' || specifier === 'worker_threads') {
|
||||
mainLoad = false;
|
||||
return next(specifier);
|
||||
if (specifier.startsWith('./not-found')) {
|
||||
await assert.rejects(next(specifier), { code: 'ERR_MODULE_NOT_FOUND' });
|
||||
return {
|
||||
url: 'node:fs',
|
||||
importAttributes,
|
||||
};
|
||||
}
|
||||
await assert.rejects(next(specifier), { code: 'ERR_MODULE_NOT_FOUND' });
|
||||
return {
|
||||
url: 'node:fs',
|
||||
importAttributes,
|
||||
};
|
||||
return next(specifier);
|
||||
}
|
||||
|
1
test/fixtures/es-module-loaders/worker-fail-on-load.mjs
vendored
Normal file
1
test/fixtures/es-module-loaders/worker-fail-on-load.mjs
vendored
Normal file
@ -0,0 +1 @@
|
||||
import 'process-exit-module-load';
|
1
test/fixtures/es-module-loaders/worker-fail-on-resolve.mjs
vendored
Normal file
1
test/fixtures/es-module-loaders/worker-fail-on-resolve.mjs
vendored
Normal file
@ -0,0 +1 @@
|
||||
import 'process-exit-module-resolve';
|
3
test/fixtures/es-module-loaders/worker-log-again.mjs
vendored
Normal file
3
test/fixtures/es-module-loaders/worker-log-again.mjs
vendored
Normal file
@ -0,0 +1,3 @@
|
||||
import { bar } from './module-named-exports.mjs';
|
||||
|
||||
console.log(bar);
|
12
test/fixtures/es-module-loaders/worker-log-fail-worker-load.mjs
vendored
Normal file
12
test/fixtures/es-module-loaders/worker-log-fail-worker-load.mjs
vendored
Normal file
@ -0,0 +1,12 @@
|
||||
import { Worker } from 'worker_threads';
|
||||
import { foo } from './module-named-exports.mjs';
|
||||
|
||||
const workerURLFailOnLoad = new URL('./worker-fail-on-load.mjs', import.meta.url);
|
||||
console.log(foo);
|
||||
|
||||
// Spawn a worker that will fail to import a dependant module
|
||||
new Worker(workerURLFailOnLoad);
|
||||
|
||||
process.on('exit', (code) => {
|
||||
console.log(`process exit code: ${code}`)
|
||||
});
|
12
test/fixtures/es-module-loaders/worker-log-fail-worker-resolve.mjs
vendored
Normal file
12
test/fixtures/es-module-loaders/worker-log-fail-worker-resolve.mjs
vendored
Normal file
@ -0,0 +1,12 @@
|
||||
import { Worker } from 'worker_threads';
|
||||
import { foo } from './module-named-exports.mjs';
|
||||
|
||||
const workerURLFailOnResolve = new URL('./worker-fail-on-resolve.mjs', import.meta.url);
|
||||
console.log(foo);
|
||||
|
||||
// Spawn a worker that will fail to import a dependant module
|
||||
new Worker(workerURLFailOnResolve);
|
||||
|
||||
process.on('exit', (code) => {
|
||||
console.log(`process exit code: ${code}`)
|
||||
});
|
9
test/fixtures/es-module-loaders/worker-log.mjs
vendored
Normal file
9
test/fixtures/es-module-loaders/worker-log.mjs
vendored
Normal file
@ -0,0 +1,9 @@
|
||||
import { Worker } from 'worker_threads';
|
||||
import { foo } from './module-named-exports.mjs';
|
||||
|
||||
const workerURL = new URL('./worker-log-again.mjs', import.meta.url);
|
||||
console.log(foo);
|
||||
|
||||
// Spawn two workers
|
||||
new Worker(workerURL);
|
||||
new Worker(workerURL);
|
7
test/fixtures/es-module-loaders/workers-spawned.mjs
vendored
Normal file
7
test/fixtures/es-module-loaders/workers-spawned.mjs
vendored
Normal file
@ -0,0 +1,7 @@
|
||||
import { Worker } from 'worker_threads';
|
||||
|
||||
const workerURL = new URL('./worker-log.mjs', import.meta.url);
|
||||
|
||||
// Spawn two workers
|
||||
new Worker(workerURL);
|
||||
new Worker(workerURL);
|
Loading…
Reference in New Issue
Block a user