fix(ext/node): emit online event after worker thread is initialized (#25243)

Fixes #23281. Part of #20613.

We were emitting the `online` event in the constructor, so the caller
could never receive it (since there was no time for them to add a
listener). Instead, emit the event where it's intended – after the
worker is initialized.

---

After this parcel no longer freezes, but still will fail due to other
bugs (which will be fixed in other PRs)
This commit is contained in:
Nathan Whitaker 2024-08-27 20:05:32 -07:00 committed by Luca Casonato
parent e3c4651676
commit cb0c23a7f1
No known key found for this signature in database
GPG Key ID: 01A83EB62563811F
2 changed files with 65 additions and 3 deletions

View File

@ -52,6 +52,16 @@ function debugWT(...args) {
}
}
interface WorkerOnlineMsg {
type: "WORKER_ONLINE";
}
function isWorkerOnlineMsg(data: unknown): data is WorkerOnlineMsg {
return typeof data === "object" && data !== null &&
ObjectHasOwn(data, "type") &&
(data as { "type": unknown })["type"] === "WORKER_ONLINE";
}
export interface WorkerOptions {
// only for typings
argv?: unknown[];
@ -81,6 +91,7 @@ class NodeWorker extends EventEmitter {
#refCount = 1;
#messagePromise = undefined;
#controlPromise = undefined;
#workerOnline = false;
// "RUNNING" | "CLOSED" | "TERMINATED"
// "TERMINATED" means that any controls or messages received will be
// discarded. "CLOSED" means that we have received a control
@ -141,6 +152,7 @@ class NodeWorker extends EventEmitter {
workerData: options?.workerData,
environmentData: environmentData,
env: env_,
isWorkerThread: true,
}, options?.transferList ?? []);
const id = op_create_worker(
{
@ -159,8 +171,6 @@ class NodeWorker extends EventEmitter {
this.threadId = id;
this.#pollControl();
this.#pollMessages();
// https://nodejs.org/api/worker_threads.html#event-online
this.emit("online");
}
[privateWorkerRef](ref) {
@ -243,7 +253,17 @@ class NodeWorker extends EventEmitter {
this.emit("messageerror", err);
return;
}
this.emit("message", message);
if (
// only emit "online" event once, and since the message
// has to come before user messages, we are safe to assume
// it came from us
!this.#workerOnline && isWorkerOnlineMsg(message)
) {
this.#workerOnline = true;
this.emit("online");
} else {
this.emit("message", message);
}
}
};
@ -358,10 +378,12 @@ internals.__initWorkerThreads = (
parentPort = globalThis as ParentPort;
threadId = workerId;
let isWorkerThread = false;
if (maybeWorkerMetadata) {
const { 0: metadata, 1: _ } = maybeWorkerMetadata;
workerData = metadata.workerData;
environmentData = metadata.environmentData;
isWorkerThread = metadata.isWorkerThread;
const env = metadata.env;
if (env) {
process.env = env;
@ -425,6 +447,15 @@ internals.__initWorkerThreads = (
parentPort.ref = () => {
parentPort[unrefPollForMessages] = false;
};
if (isWorkerThread) {
// Notify the host that the worker is online
parentPort.postMessage(
{
type: "WORKER_ONLINE",
} satisfies WorkerOnlineMsg,
);
}
}
};

View File

@ -590,3 +590,34 @@ Deno.test({
channel.port2.close();
},
});
Deno.test({
name: "[node/worker_threads] Emits online event",
async fn() {
const worker = new workerThreads.Worker(
`
import { parentPort } from "node:worker_threads";
const p = Promise.withResolvers();
let ok = false;
parentPort.on("message", () => {
ok = true;
p.resolve();
});
await Promise.race([p.promise, new Promise(resolve => setTimeout(resolve, 20000))]);
if (ok) {
parentPort.postMessage("ok");
} else {
parentPort.postMessage("timed out");
}
`,
{
eval: true,
},
);
worker.on("online", () => {
worker.postMessage("ok");
});
assertEquals((await once(worker, "message"))[0], "ok");
worker.terminate();
},
});