mirror of
https://github.com/nodejs/node.git
synced 2024-11-21 10:59:27 +00:00
worker: add postMessageToThread
PR-URL: https://github.com/nodejs/node/pull/53682 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: Gerhard Stöbich <deb2001-github@yahoo.de> Reviewed-By: Marco Ippolito <marcoippolito54@gmail.com>
This commit is contained in:
parent
b9289a6e29
commit
66a635cece
@ -3104,6 +3104,54 @@ The `Worker` instance terminated because it reached its memory limit.
|
||||
The path for the main script of a worker is neither an absolute path
|
||||
nor a relative path starting with `./` or `../`.
|
||||
|
||||
<a id="ERR_WORKER_MESSAGING_ERRORED"></a>
|
||||
|
||||
### `ERR_WORKER_MESSAGING_ERRORED`
|
||||
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
> Stability: 1.1 - Active development
|
||||
|
||||
The destination thread threw an error while processing a message sent via [`postMessageToThread()`][].
|
||||
|
||||
<a id="ERR_WORKER_MESSAGING_FAILED"></a>
|
||||
|
||||
### `ERR_WORKER_MESSAGING_FAILED`
|
||||
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
> Stability: 1.1 - Active development
|
||||
|
||||
The thread requested in [`postMessageToThread()`][] is invalid or has no `workerMessage` listener.
|
||||
|
||||
<a id="ERR_WORKER_MESSAGING_SAME_THREAD"></a>
|
||||
|
||||
### `ERR_WORKER_MESSAGING_SAME_THREAD`
|
||||
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
> Stability: 1.1 - Active development
|
||||
|
||||
The thread id requested in [`postMessageToThread()`][] is the current thread id.
|
||||
|
||||
<a id="ERR_WORKER_MESSAGING_TIMEOUT"></a>
|
||||
|
||||
### `ERR_WORKER_MESSAGING_TIMEOUT`
|
||||
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
> Stability: 1.1 - Active development
|
||||
|
||||
Sending a message via [`postMessageToThread()`][] timed out.
|
||||
|
||||
<a id="ERR_WORKER_UNSERIALIZABLE_ERROR"></a>
|
||||
|
||||
### `ERR_WORKER_UNSERIALIZABLE_ERROR`
|
||||
@ -4027,6 +4075,7 @@ An error occurred trying to allocate memory. This should never happen.
|
||||
[`new URLSearchParams(iterable)`]: url.md#new-urlsearchparamsiterable
|
||||
[`package.json`]: packages.md#nodejs-packagejson-field-definitions
|
||||
[`postMessage()`]: worker_threads.md#portpostmessagevalue-transferlist
|
||||
[`postMessageToThread()`]: worker_threads.md#workerpostmessagetothreadthreadid-value-transferlist-timeout
|
||||
[`process.on('exit')`]: process.md#event-exit
|
||||
[`process.send()`]: process.md#processsendmessage-sendhandle-options-callback
|
||||
[`process.setUncaughtExceptionCaptureCallback()`]: process.md#processsetuncaughtexceptioncapturecallbackfn
|
||||
|
@ -327,6 +327,18 @@ possible to record such errors in an error log, either periodically (which is
|
||||
likely best for long-running application) or upon process exit (which is likely
|
||||
most convenient for scripts).
|
||||
|
||||
### Event: `'workerMessage'`
|
||||
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
* `value` {any} A value transmitted using [`postMessageToThread()`][].
|
||||
* `source` {number} The transmitting worker thread ID or `0` for the main thread.
|
||||
|
||||
The `'workerMessage'` event is emitted for any incoming message send by the other
|
||||
party by using [`postMessageToThread()`][].
|
||||
|
||||
### Event: `'uncaughtException'`
|
||||
|
||||
<!-- YAML
|
||||
@ -4073,6 +4085,7 @@ cases:
|
||||
[`net.Server`]: net.md#class-netserver
|
||||
[`net.Socket`]: net.md#class-netsocket
|
||||
[`os.constants.dlopen`]: os.md#dlopen-constants
|
||||
[`postMessageToThread()`]: worker_threads.md#workerpostmessagetothreadthreadid-value-transferlist-timeout
|
||||
[`process.argv`]: #processargv
|
||||
[`process.config`]: #processconfig
|
||||
[`process.execPath`]: #processexecpath
|
||||
|
@ -252,6 +252,118 @@ if (isMainThread) {
|
||||
}
|
||||
```
|
||||
|
||||
## `worker.postMessageToThread(threadId, value[, transferList][, timeout])`
|
||||
|
||||
<!-- YAML
|
||||
added: REPLACEME
|
||||
-->
|
||||
|
||||
> Stability: 1.1 - Active development
|
||||
|
||||
* `destination` {number} The target thread ID. If the thread ID is invalid, a
|
||||
[`ERR_WORKER_MESSAGING_FAILED`][] error will be thrown. If the target thread ID is the current thread ID,
|
||||
a [`ERR_WORKER_MESSAGING_SAME_THREAD`][] error will be thrown.
|
||||
* `value` {any} The value to send.
|
||||
* `transferList` {Object\[]} If one or more `MessagePort`-like objects are passed in `value`,
|
||||
a `transferList` is required for those items or [`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`][] is thrown.
|
||||
See [`port.postMessage()`][] for more information.
|
||||
* `timeout` {number} Time to wait for the message to be delivered in milliseconds.
|
||||
By default it's `undefined`, which means wait forever. If the operation times out,
|
||||
a [`ERR_WORKER_MESSAGING_TIMEOUT`][] error is thrown.
|
||||
* Returns: {Promise} A promise which is fulfilled if the message was successfully processed by destination thread.
|
||||
|
||||
Sends a value to another worker, identified by its thread ID.
|
||||
|
||||
If the target thread has no listener for the `workerMessage` event, then the operation will throw
|
||||
a [`ERR_WORKER_MESSAGING_FAILED`][] error.
|
||||
|
||||
If the target thread threw an error while processing the `workerMessage` event, then the operation will throw
|
||||
a [`ERR_WORKER_MESSAGING_ERRORED`][] error.
|
||||
|
||||
This method should be used when the target thread is not the direct
|
||||
parent or child of the current thread.
|
||||
If the two threads are parent-children, use the [`require('node:worker_threads').parentPort.postMessage()`][]
|
||||
and the [`worker.postMessage()`][] to let the threads communicate.
|
||||
|
||||
The example below shows the use of of `postMessageToThread`: it creates 10 nested threads,
|
||||
the last one will try to communicate with the main thread.
|
||||
|
||||
```mjs
|
||||
import { fileURLToPath } from 'node:url';
|
||||
import { once } from 'node:events';
|
||||
import process from 'node:process';
|
||||
import {
|
||||
isMainThread,
|
||||
postMessageToThread,
|
||||
threadId,
|
||||
workerData,
|
||||
Worker,
|
||||
} from 'node:worker_threads';
|
||||
|
||||
const channel = new BroadcastChannel('sync');
|
||||
const level = workerData?.level ?? 0;
|
||||
|
||||
if (level < 10) {
|
||||
const worker = new Worker(fileURLToPath(import.meta.url), {
|
||||
workerData: { level: level + 1 },
|
||||
});
|
||||
}
|
||||
|
||||
if (level === 0) {
|
||||
process.on('workerMessage', (value, source) => {
|
||||
console.log(`${source} -> ${threadId}:`, value);
|
||||
postMessageToThread(source, { message: 'pong' });
|
||||
});
|
||||
} else if (level === 10) {
|
||||
process.on('workerMessage', (value, source) => {
|
||||
console.log(`${source} -> ${threadId}:`, value);
|
||||
channel.postMessage('done');
|
||||
channel.close();
|
||||
});
|
||||
|
||||
await postMessageToThread(0, { message: 'ping' });
|
||||
}
|
||||
|
||||
channel.onmessage = channel.close;
|
||||
```
|
||||
|
||||
```cjs
|
||||
const { once } = require('node:events');
|
||||
const {
|
||||
isMainThread,
|
||||
postMessageToThread,
|
||||
threadId,
|
||||
workerData,
|
||||
Worker,
|
||||
} = require('node:worker_threads');
|
||||
|
||||
const channel = new BroadcastChannel('sync');
|
||||
const level = workerData?.level ?? 0;
|
||||
|
||||
if (level < 10) {
|
||||
const worker = new Worker(__filename, {
|
||||
workerData: { level: level + 1 },
|
||||
});
|
||||
}
|
||||
|
||||
if (level === 0) {
|
||||
process.on('workerMessage', (value, source) => {
|
||||
console.log(`${source} -> ${threadId}:`, value);
|
||||
postMessageToThread(source, { message: 'pong' });
|
||||
});
|
||||
} else if (level === 10) {
|
||||
process.on('workerMessage', (value, source) => {
|
||||
console.log(`${source} -> ${threadId}:`, value);
|
||||
channel.postMessage('done');
|
||||
channel.close();
|
||||
});
|
||||
|
||||
postMessageToThread(0, { message: 'ping' });
|
||||
}
|
||||
|
||||
channel.onmessage = channel.close;
|
||||
```
|
||||
|
||||
## `worker.receiveMessageOnPort(port)`
|
||||
|
||||
<!-- YAML
|
||||
@ -1399,6 +1511,10 @@ thread spawned will spawn another until the application crashes.
|
||||
[`Buffer.allocUnsafe()`]: buffer.md#static-method-bufferallocunsafesize
|
||||
[`Buffer`]: buffer.md
|
||||
[`ERR_MISSING_MESSAGE_PORT_IN_TRANSFER_LIST`]: errors.md#err_missing_message_port_in_transfer_list
|
||||
[`ERR_WORKER_MESSAGING_ERRORED`]: errors.md#err_worker_messaging_errored
|
||||
[`ERR_WORKER_MESSAGING_FAILED`]: errors.md#err_worker_messaging_failed
|
||||
[`ERR_WORKER_MESSAGING_SAME_THREAD`]: errors.md#err_worker_messaging_same_thread
|
||||
[`ERR_WORKER_MESSAGING_TIMEOUT`]: errors.md#err_worker_messaging_timeout
|
||||
[`ERR_WORKER_NOT_RUNNING`]: errors.md#err_worker_not_running
|
||||
[`EventTarget`]: https://developer.mozilla.org/en-US/docs/Web/API/EventTarget
|
||||
[`FileHandle`]: fs.md#class-filehandle
|
||||
|
@ -1866,6 +1866,10 @@ E('ERR_WORKER_INIT_FAILED', 'Worker initialization failure: %s', Error);
|
||||
E('ERR_WORKER_INVALID_EXEC_ARGV', (errors, msg = 'invalid execArgv flags') =>
|
||||
`Initiated Worker with ${msg}: ${ArrayPrototypeJoin(errors, ', ')}`,
|
||||
Error);
|
||||
E('ERR_WORKER_MESSAGING_ERRORED', 'The destination thread threw an error while processing the message', Error);
|
||||
E('ERR_WORKER_MESSAGING_FAILED', 'Cannot find the destination thread or listener', Error);
|
||||
E('ERR_WORKER_MESSAGING_SAME_THREAD', 'Cannot sent a message to the same thread', Error);
|
||||
E('ERR_WORKER_MESSAGING_TIMEOUT', 'Sending a message to another thread timed out', Error);
|
||||
E('ERR_WORKER_NOT_RUNNING', 'Worker instance not running', Error);
|
||||
E('ERR_WORKER_OUT_OF_MEMORY',
|
||||
'Worker terminated due to reaching memory limit: %s', Error);
|
||||
|
@ -44,6 +44,8 @@ const {
|
||||
kStdioWantsMoreDataCallback,
|
||||
} = workerIo;
|
||||
|
||||
const { setupMainThreadPort } = require('internal/worker/messaging');
|
||||
|
||||
const {
|
||||
onGlobalUncaughtException,
|
||||
} = require('internal/process/execution');
|
||||
@ -96,6 +98,7 @@ port.on('message', (message) => {
|
||||
hasStdin,
|
||||
publicPort,
|
||||
workerData,
|
||||
mainThreadPort,
|
||||
} = message;
|
||||
|
||||
if (doEval !== 'internal') {
|
||||
@ -109,6 +112,7 @@ port.on('message', (message) => {
|
||||
}
|
||||
|
||||
require('internal/worker').assignEnvironmentData(environmentData);
|
||||
setupMainThreadPort(mainThreadPort);
|
||||
|
||||
if (SharedArrayBuffer !== undefined) {
|
||||
// The counter is only passed to the workers created by the main thread,
|
||||
|
@ -56,6 +56,7 @@ const {
|
||||
ReadableWorkerStdio,
|
||||
WritableWorkerStdio,
|
||||
} = workerIo;
|
||||
const { createMainThreadPort, destroyMainThreadPort } = require('internal/worker/messaging');
|
||||
const { deserializeError } = require('internal/error_serdes');
|
||||
const { fileURLToPath, isURL, pathToFileURL } = require('internal/url');
|
||||
const { kEmptyObject } = require('internal/util');
|
||||
@ -250,14 +251,18 @@ class Worker extends EventEmitter {
|
||||
|
||||
this[kParentSideStdio] = { stdin, stdout, stderr };
|
||||
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
const transferList = [port2];
|
||||
const mainThreadPortToWorker = createMainThreadPort(this.threadId);
|
||||
const {
|
||||
port1: publicPortToParent,
|
||||
port2: publicPortToWorker,
|
||||
} = new MessageChannel();
|
||||
const transferList = [mainThreadPortToWorker, publicPortToWorker];
|
||||
// If transferList is provided.
|
||||
if (options.transferList)
|
||||
ArrayPrototypePush(transferList,
|
||||
...new SafeArrayIterator(options.transferList));
|
||||
|
||||
this[kPublicPort] = port1;
|
||||
this[kPublicPort] = publicPortToParent;
|
||||
ArrayPrototypeForEach(['message', 'messageerror'], (event) => {
|
||||
this[kPublicPort].on(event, (message) => this.emit(event, message));
|
||||
});
|
||||
@ -271,8 +276,9 @@ class Worker extends EventEmitter {
|
||||
cwdCounter: cwdCounter || workerIo.sharedCwdCounter,
|
||||
workerData: options.workerData,
|
||||
environmentData,
|
||||
publicPort: port2,
|
||||
hasStdin: !!options.stdin,
|
||||
publicPort: publicPortToWorker,
|
||||
mainThreadPort: mainThreadPortToWorker,
|
||||
}, transferList);
|
||||
// Use this to cache the Worker's loopStart value once available.
|
||||
this[kLoopStartTime] = -1;
|
||||
@ -295,6 +301,7 @@ class Worker extends EventEmitter {
|
||||
debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
|
||||
drainMessagePort(this[kPublicPort]);
|
||||
drainMessagePort(this[kPort]);
|
||||
destroyMainThreadPort(this.threadId);
|
||||
this.removeAllListeners('message');
|
||||
this.removeAllListeners('messageerrors');
|
||||
this[kPublicPort].unref();
|
||||
|
242
lib/internal/worker/messaging.js
Normal file
242
lib/internal/worker/messaging.js
Normal file
@ -0,0 +1,242 @@
|
||||
'use strict';
|
||||
|
||||
const {
|
||||
AtomicsNotify,
|
||||
AtomicsStore,
|
||||
AtomicsWaitAsync,
|
||||
Int32Array,
|
||||
SafeMap,
|
||||
globalThis,
|
||||
} = primordials;
|
||||
|
||||
const {
|
||||
SharedArrayBuffer,
|
||||
} = globalThis;
|
||||
|
||||
const {
|
||||
isMainThread,
|
||||
threadId: currentThreadId,
|
||||
} = internalBinding('worker');
|
||||
|
||||
const {
|
||||
codes: {
|
||||
ERR_WORKER_MESSAGING_ERRORED,
|
||||
ERR_WORKER_MESSAGING_FAILED,
|
||||
ERR_WORKER_MESSAGING_SAME_THREAD,
|
||||
ERR_WORKER_MESSAGING_TIMEOUT,
|
||||
},
|
||||
} = require('internal/errors');
|
||||
|
||||
const { MessageChannel } = require('internal/worker/io');
|
||||
|
||||
const { validateNumber } = require('internal/validators');
|
||||
|
||||
const messageTypes = {
|
||||
REGISTER_MAIN_THREAD_PORT: 'registerMainThreadPort',
|
||||
UNREGISTER_MAIN_THREAD_PORT: 'unregisterMainThreadPort',
|
||||
SEND_MESSAGE_TO_WORKER: 'sendMessageToWorker',
|
||||
RECEIVE_MESSAGE_FROM_WORKER: 'receiveMessageFromWorker',
|
||||
};
|
||||
|
||||
// This is only populated by main thread and always empty in other threads
|
||||
const threadsPorts = new SafeMap();
|
||||
|
||||
// This is only populated in child threads and always undefined in main thread
|
||||
let mainThreadPort;
|
||||
|
||||
// SharedArrayBuffer must always be Int32, so it's * 4.
|
||||
// We need one for the operation status (performing / performed) and one for the result (success / failure).
|
||||
const WORKER_MESSAGING_SHARED_DATA = 2 * 4;
|
||||
const WORKER_MESSAGING_STATUS_INDEX = 0;
|
||||
const WORKER_MESSAGING_RESULT_INDEX = 1;
|
||||
|
||||
// Response codes
|
||||
const WORKER_MESSAGING_RESULT_DELIVERED = 0;
|
||||
const WORKER_MESSAGING_RESULT_NO_LISTENERS = 1;
|
||||
const WORKER_MESSAGING_RESULT_LISTENER_ERROR = 2;
|
||||
|
||||
// This event handler is always executed on the main thread only
|
||||
function handleMessageFromThread(message) {
|
||||
switch (message.type) {
|
||||
case messageTypes.REGISTER_MAIN_THREAD_PORT:
|
||||
{
|
||||
const { threadId, port } = message;
|
||||
|
||||
// Register the port
|
||||
threadsPorts.set(threadId, port);
|
||||
|
||||
// Handle messages on this port
|
||||
// When a new thread wants to register a children
|
||||
// this take care of doing that.
|
||||
// This way any thread can be linked to the main one.
|
||||
port.on('message', handleMessageFromThread);
|
||||
|
||||
// Never block the thread on this port
|
||||
port.unref();
|
||||
}
|
||||
|
||||
break;
|
||||
case messageTypes.UNREGISTER_MAIN_THREAD_PORT:
|
||||
threadsPorts.get(message.threadId).close();
|
||||
threadsPorts.delete(message.threadId);
|
||||
break;
|
||||
case messageTypes.SEND_MESSAGE_TO_WORKER:
|
||||
{
|
||||
// Send the message to the target thread
|
||||
const { source, destination, value, transferList, memory } = message;
|
||||
sendMessageToWorker(source, destination, value, transferList, memory);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
function handleMessageFromMainThread(message) {
|
||||
switch (message.type) {
|
||||
case messageTypes.RECEIVE_MESSAGE_FROM_WORKER:
|
||||
receiveMessageFromWorker(message.source, message.value, message.memory);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
function sendMessageToWorker(source, destination, value, transferList, memory) {
|
||||
// We are on the main thread, we can directly process the message
|
||||
if (destination === 0) {
|
||||
receiveMessageFromWorker(source, value, memory);
|
||||
return;
|
||||
}
|
||||
|
||||
// Search the port to the target thread
|
||||
const port = threadsPorts.get(destination);
|
||||
|
||||
if (!port) {
|
||||
const status = new Int32Array(memory);
|
||||
AtomicsStore(status, WORKER_MESSAGING_RESULT_INDEX, WORKER_MESSAGING_RESULT_NO_LISTENERS);
|
||||
AtomicsStore(status, WORKER_MESSAGING_STATUS_INDEX, 1);
|
||||
AtomicsNotify(status, WORKER_MESSAGING_STATUS_INDEX, 1);
|
||||
return;
|
||||
}
|
||||
|
||||
port.postMessage(
|
||||
{
|
||||
type: messageTypes.RECEIVE_MESSAGE_FROM_WORKER,
|
||||
source,
|
||||
destination,
|
||||
value,
|
||||
memory,
|
||||
},
|
||||
transferList,
|
||||
);
|
||||
}
|
||||
|
||||
function receiveMessageFromWorker(source, value, memory) {
|
||||
let response = WORKER_MESSAGING_RESULT_NO_LISTENERS;
|
||||
|
||||
try {
|
||||
if (process.emit('workerMessage', value, source)) {
|
||||
response = WORKER_MESSAGING_RESULT_DELIVERED;
|
||||
}
|
||||
} catch {
|
||||
response = WORKER_MESSAGING_RESULT_LISTENER_ERROR;
|
||||
}
|
||||
|
||||
// Populate the result
|
||||
const status = new Int32Array(memory);
|
||||
AtomicsStore(status, WORKER_MESSAGING_RESULT_INDEX, response);
|
||||
AtomicsStore(status, WORKER_MESSAGING_STATUS_INDEX, 1);
|
||||
AtomicsNotify(status, WORKER_MESSAGING_STATUS_INDEX, 1);
|
||||
}
|
||||
|
||||
function createMainThreadPort(threadId) {
|
||||
// Create a channel that links the new thread to the main thread
|
||||
const {
|
||||
port1: mainThreadPortToMain,
|
||||
port2: mainThreadPortToThread,
|
||||
} = new MessageChannel();
|
||||
|
||||
const registrationMessage = {
|
||||
type: messageTypes.REGISTER_MAIN_THREAD_PORT,
|
||||
threadId,
|
||||
port: mainThreadPortToMain,
|
||||
};
|
||||
|
||||
if (isMainThread) {
|
||||
handleMessageFromThread(registrationMessage);
|
||||
} else {
|
||||
mainThreadPort.postMessage(registrationMessage, [mainThreadPortToMain]);
|
||||
}
|
||||
|
||||
return mainThreadPortToThread;
|
||||
}
|
||||
|
||||
function destroyMainThreadPort(threadId) {
|
||||
const unregistrationMessage = {
|
||||
type: messageTypes.UNREGISTER_MAIN_THREAD_PORT,
|
||||
threadId,
|
||||
};
|
||||
|
||||
if (isMainThread) {
|
||||
handleMessageFromThread(unregistrationMessage);
|
||||
} else {
|
||||
mainThreadPort.postMessage(unregistrationMessage);
|
||||
}
|
||||
}
|
||||
|
||||
function setupMainThreadPort(port) {
|
||||
mainThreadPort = port;
|
||||
mainThreadPort.on('message', handleMessageFromMainThread);
|
||||
|
||||
// Never block the process on this port
|
||||
mainThreadPort.unref();
|
||||
}
|
||||
|
||||
async function postMessageToThread(threadId, value, transferList, timeout) {
|
||||
if (typeof transferList === 'number' && typeof timeout === 'undefined') {
|
||||
timeout = transferList;
|
||||
transferList = [];
|
||||
}
|
||||
|
||||
if (typeof timeout !== 'undefined') {
|
||||
validateNumber(timeout, 'timeout', 0);
|
||||
}
|
||||
|
||||
if (threadId === currentThreadId) {
|
||||
throw new ERR_WORKER_MESSAGING_SAME_THREAD();
|
||||
}
|
||||
|
||||
const memory = new SharedArrayBuffer(WORKER_MESSAGING_SHARED_DATA);
|
||||
const status = new Int32Array(memory);
|
||||
const promise = AtomicsWaitAsync(status, WORKER_MESSAGING_STATUS_INDEX, 0, timeout).value;
|
||||
|
||||
const message = {
|
||||
type: messageTypes.SEND_MESSAGE_TO_WORKER,
|
||||
source: currentThreadId,
|
||||
destination: threadId,
|
||||
value,
|
||||
memory,
|
||||
transferList,
|
||||
};
|
||||
|
||||
if (isMainThread) {
|
||||
handleMessageFromThread(message);
|
||||
} else {
|
||||
mainThreadPort.postMessage(message, transferList);
|
||||
}
|
||||
|
||||
// Wait for the response
|
||||
const response = await promise;
|
||||
|
||||
if (response === 'timed-out') {
|
||||
throw new ERR_WORKER_MESSAGING_TIMEOUT();
|
||||
} else if (status[WORKER_MESSAGING_RESULT_INDEX] === WORKER_MESSAGING_RESULT_NO_LISTENERS) {
|
||||
throw new ERR_WORKER_MESSAGING_FAILED();
|
||||
} else if (status[WORKER_MESSAGING_RESULT_INDEX] === WORKER_MESSAGING_RESULT_LISTENER_ERROR) {
|
||||
throw new ERR_WORKER_MESSAGING_ERRORED();
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
createMainThreadPort,
|
||||
destroyMainThreadPort,
|
||||
setupMainThreadPort,
|
||||
postMessageToThread,
|
||||
};
|
@ -18,6 +18,10 @@ const {
|
||||
BroadcastChannel,
|
||||
} = require('internal/worker/io');
|
||||
|
||||
const {
|
||||
postMessageToThread,
|
||||
} = require('internal/worker/messaging');
|
||||
|
||||
const {
|
||||
markAsUntransferable,
|
||||
isMarkedAsUntransferable,
|
||||
@ -32,6 +36,7 @@ module.exports = {
|
||||
moveMessagePortToContext,
|
||||
receiveMessageOnPort,
|
||||
resourceLimits,
|
||||
postMessageToThread,
|
||||
threadId,
|
||||
SHARE_ENV,
|
||||
Worker,
|
||||
|
@ -141,6 +141,7 @@ if (common.isMainThread) {
|
||||
'NativeModule internal/streams/writable',
|
||||
'NativeModule internal/worker',
|
||||
'NativeModule internal/worker/io',
|
||||
'NativeModule internal/worker/messaging',
|
||||
'NativeModule stream',
|
||||
'NativeModule stream/promises',
|
||||
'NativeModule string_decoder',
|
||||
|
@ -1,6 +1,5 @@
|
||||
'use strict';
|
||||
|
||||
// TODO@PI: Run all tests
|
||||
const common = require('../common');
|
||||
const assert = require('assert');
|
||||
const { createServer, request } = require('http');
|
||||
|
34
test/parallel/test-worker-messaging-errors-handler.js
Normal file
34
test/parallel/test-worker-messaging-errors-handler.js
Normal file
@ -0,0 +1,34 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const {
|
||||
parentPort,
|
||||
postMessageToThread,
|
||||
Worker,
|
||||
workerData,
|
||||
} = require('node:worker_threads');
|
||||
const { rejects } = require('node:assert');
|
||||
|
||||
async function test() {
|
||||
const worker = new Worker(__filename, { workerData: { children: true } });
|
||||
|
||||
await rejects(common.mustCall(function() {
|
||||
return postMessageToThread(worker.threadId);
|
||||
}), {
|
||||
name: 'Error',
|
||||
code: 'ERR_WORKER_MESSAGING_ERRORED',
|
||||
});
|
||||
|
||||
worker.postMessage('success');
|
||||
}
|
||||
|
||||
if (!workerData?.children) {
|
||||
test();
|
||||
} else {
|
||||
process.on('workerMessage', () => {
|
||||
throw new Error('KABOOM');
|
||||
});
|
||||
|
||||
parentPort.postMessage('ready');
|
||||
parentPort.once('message', common.mustCall());
|
||||
}
|
48
test/parallel/test-worker-messaging-errors-invalid.js
Normal file
48
test/parallel/test-worker-messaging-errors-invalid.js
Normal file
@ -0,0 +1,48 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const { once } = require('node:events');
|
||||
const {
|
||||
parentPort,
|
||||
postMessageToThread,
|
||||
threadId,
|
||||
Worker,
|
||||
workerData,
|
||||
} = require('node:worker_threads');
|
||||
const { rejects } = require('node:assert');
|
||||
|
||||
async function test() {
|
||||
await rejects(common.mustCall(function() {
|
||||
return postMessageToThread(threadId);
|
||||
}), {
|
||||
name: 'Error',
|
||||
code: 'ERR_WORKER_MESSAGING_SAME_THREAD',
|
||||
});
|
||||
|
||||
await rejects(common.mustCall(function() {
|
||||
return postMessageToThread(Date.now());
|
||||
}), {
|
||||
name: 'Error',
|
||||
code: 'ERR_WORKER_MESSAGING_FAILED',
|
||||
});
|
||||
|
||||
// The delivery to the first worker will fail as there is no listener for `workerMessage`
|
||||
const worker = new Worker(__filename, { workerData: { children: true } });
|
||||
await once(worker, 'message');
|
||||
|
||||
await rejects(common.mustCall(function() {
|
||||
return postMessageToThread(worker.threadId);
|
||||
}), {
|
||||
name: 'Error',
|
||||
code: 'ERR_WORKER_MESSAGING_FAILED',
|
||||
});
|
||||
|
||||
worker.postMessage('success');
|
||||
}
|
||||
|
||||
if (!workerData?.children) {
|
||||
test();
|
||||
} else {
|
||||
parentPort.postMessage('ready');
|
||||
parentPort.once('message', common.mustCall());
|
||||
}
|
38
test/parallel/test-worker-messaging-errors-timeout.js
Normal file
38
test/parallel/test-worker-messaging-errors-timeout.js
Normal file
@ -0,0 +1,38 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const {
|
||||
postMessageToThread,
|
||||
workerData,
|
||||
Worker,
|
||||
} = require('node:worker_threads');
|
||||
const { rejects } = require('node:assert');
|
||||
|
||||
const memory = new SharedArrayBuffer(4);
|
||||
|
||||
async function test() {
|
||||
const worker = new Worker(__filename, { workerData: { memory, children: true } });
|
||||
const array = new Int32Array(memory);
|
||||
|
||||
await rejects(common.mustCall(function() {
|
||||
return postMessageToThread(worker.threadId, 0, common.platformTimeout(500));
|
||||
}), {
|
||||
name: 'Error',
|
||||
code: 'ERR_WORKER_MESSAGING_TIMEOUT',
|
||||
});
|
||||
|
||||
Atomics.store(array, 0, 1);
|
||||
Atomics.notify(array, 0);
|
||||
}
|
||||
|
||||
if (!workerData?.children) {
|
||||
test();
|
||||
} else {
|
||||
process.on('beforeExit', common.mustCall());
|
||||
|
||||
const array = new Int32Array(workerData.memory);
|
||||
|
||||
// Starve this thread waiting for the status to be unlocked.
|
||||
// This happens in the main thread AFTER the timeout.
|
||||
Atomics.wait(array, 0, 0);
|
||||
}
|
112
test/parallel/test-worker-messaging.js
Normal file
112
test/parallel/test-worker-messaging.js
Normal file
@ -0,0 +1,112 @@
|
||||
'use strict';
|
||||
|
||||
const common = require('../common');
|
||||
const Countdown = require('../common/countdown');
|
||||
const {
|
||||
parentPort,
|
||||
postMessageToThread,
|
||||
threadId,
|
||||
workerData,
|
||||
Worker,
|
||||
} = require('node:worker_threads');
|
||||
const { strictEqual, deepStrictEqual } = require('node:assert');
|
||||
const { once } = require('node:events');
|
||||
|
||||
// Spawn threads on three levels: 1 main thread, two children, four grand childrens. 7 threads total, max id = 6
|
||||
const MAX_LEVEL = 2;
|
||||
const MAX_THREAD = 6;
|
||||
|
||||
// This is to allow the test to run in --worker mode
|
||||
const mainThread = workerData?.mainThread ?? threadId;
|
||||
const level = workerData?.level ?? 0;
|
||||
|
||||
const channel = new BroadcastChannel('nodejs:test-worker-connection');
|
||||
let completed;
|
||||
|
||||
if (level === 0) {
|
||||
completed = new Countdown(MAX_THREAD + 1, () => {
|
||||
channel.postMessage('exit');
|
||||
channel.close();
|
||||
});
|
||||
}
|
||||
|
||||
async function createChildren() {
|
||||
const worker = new Worker(__filename, { workerData: { mainThread, level: level + 1 } });
|
||||
await once(worker, 'message');
|
||||
}
|
||||
|
||||
async function ping() {
|
||||
let target;
|
||||
do {
|
||||
target = mainThread + Math.floor(Math.random() * MAX_THREAD);
|
||||
} while (target === threadId);
|
||||
|
||||
const { port1, port2 } = new MessageChannel();
|
||||
await postMessageToThread(target, { level, port: port2 }, [port2]);
|
||||
|
||||
port1.on('message', common.mustCall(function(message) {
|
||||
deepStrictEqual(message, { message: 'pong', source: target, destination: threadId });
|
||||
port1.close();
|
||||
|
||||
if (level === 0) {
|
||||
completed.dec();
|
||||
} else {
|
||||
channel.postMessage('end');
|
||||
}
|
||||
}));
|
||||
|
||||
port1.postMessage({ message: 'ping', source: threadId, destination: target });
|
||||
}
|
||||
|
||||
// Do not use mustCall here as the thread might not receive any connection request
|
||||
process.on('workerMessage', ({ port, level }, source) => {
|
||||
// Let's verify the source hierarchy
|
||||
// Given we do depth first, the level is 1 for thread 1 and 4, 2 for other threads
|
||||
if (source !== mainThread) {
|
||||
const currentThread = source - mainThread;
|
||||
strictEqual(level, (currentThread === 1 || currentThread === 4) ? 1 : 2);
|
||||
} else {
|
||||
strictEqual(level, 0);
|
||||
}
|
||||
|
||||
// Verify communication
|
||||
port.on('message', common.mustCall(function(message) {
|
||||
deepStrictEqual(message, { message: 'ping', source, destination: threadId });
|
||||
port.postMessage({ message: 'pong', source: threadId, destination: source });
|
||||
port.close();
|
||||
}));
|
||||
});
|
||||
|
||||
async function test() {
|
||||
if (level < MAX_LEVEL) {
|
||||
await createChildren();
|
||||
await createChildren();
|
||||
}
|
||||
|
||||
channel.onmessage = function(message) {
|
||||
switch (message.data) {
|
||||
case 'start':
|
||||
ping();
|
||||
break;
|
||||
case 'end':
|
||||
if (level === 0) {
|
||||
completed.dec();
|
||||
}
|
||||
break;
|
||||
case 'exit':
|
||||
channel.close();
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
if (level > 0) {
|
||||
const currentThread = threadId - mainThread;
|
||||
strictEqual(level, (currentThread === 1 || currentThread === 4) ? 1 : 2);
|
||||
parentPort.postMessage({ type: 'ready', threadId });
|
||||
} else {
|
||||
channel.postMessage('start');
|
||||
ping();
|
||||
}
|
||||
}
|
||||
|
||||
test();
|
Loading…
Reference in New Issue
Block a user