fix: revert "feat(node/cluster): cluster module for Node compat (#2271)" (#3111)

This commit is contained in:
Yoshiya Hinosawa 2023-01-15 22:21:22 +09:00 committed by GitHub
parent a11c957053
commit 13d469a191
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 100 additions and 2343 deletions

View File

@ -124,15 +124,15 @@ $ deno run --allow-read --allow-net --allow-write node/_tools/setup.ts -n
To run the tests you have set up, do the following:
```shellsession
$ deno test -A --unstable node/_tools/test.ts
```zsh
$ deno test --allow-read --allow-run node/_tools/test.ts
```
If you want to run specific Node.js test files, you can use the following
command
```shellsession
$ deno test -A --unstable node/_tools/test.ts -- <pattern-to-match>
$ deno test -A node/_tools/test.ts -- <pattern-to-match>
```
For example, if you want to run only
@ -140,14 +140,14 @@ For example, if you want to run only
use:
```shellsession
$ deno test -A --unstable node/_tools/test.ts -- test-event-emitter-check-listener-leaks.js
$ deno test -A node/_tools/test.ts -- test-event-emitter-check-listener-leaks.js
```
If you want to run all test files which contains `event-emitter` in filename,
then you can use:
```shellsession
$ deno test -A --unstable node/_tools/test.ts -- event-emitter
$ deno test -A node/_tools/test.ts -- event-emitter
```
The test should be passing with the latest deno, so if the test fails, try the

4
node/_events.d.ts vendored
View File

@ -289,10 +289,6 @@ interface StaticEventEmitterOptions {
* @since v0.1.26
*/
export class EventEmitter {
_events: any;
_eventsCount: any;
_maxListeners: any;
/**
* Alias for `emitter.on(eventName, listener)`.
* @since v0.1.26

View File

@ -8,7 +8,7 @@ Deno.test(
async () => {
const file = await Deno.makeTempFile();
try {
await Deno.chmod(file, 0o600);
Deno.chmod(file, 0o600);
await fs.promises.access(file, fs.constants.R_OK);
await fs.promises.access(file, fs.constants.W_OK);
await assertRejects(async () => {
@ -40,7 +40,7 @@ Deno.test(
() => {
const file = Deno.makeTempFileSync();
try {
Deno.chmodSync(file, 0o600);
Deno.chmod(file, 0o600);
fs.accessSync(file, fs.constants.R_OK);
fs.accessSync(file, fs.constants.W_OK);
assertThrows(() => {

View File

@ -3,7 +3,7 @@
NOTE: This file should not be manually edited. Please edit `config.json` and run
`deno task node:setup` instead.
Total: 2804
Total: 2811
- [abort/test-abort-backtrace.js](https://github.com/nodejs/node/tree/v18.12.1/test/abort/test-abort-backtrace.js)
- [abort/test-abort-fatal-error.js](https://github.com/nodejs/node/tree/v18.12.1/test/abort/test-abort-fatal-error.js)
@ -363,6 +363,8 @@ Total: 2804
- [parallel/test-cluster-bind-privileged-port.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-bind-privileged-port.js)
- [parallel/test-cluster-bind-twice.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-bind-twice.js)
- [parallel/test-cluster-call-and-destroy.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-call-and-destroy.js)
- [parallel/test-cluster-child-index-dgram.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-child-index-dgram.js)
- [parallel/test-cluster-child-index-net.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-child-index-net.js)
- [parallel/test-cluster-concurrent-disconnect.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-concurrent-disconnect.js)
- [parallel/test-cluster-cwd.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-cwd.js)
- [parallel/test-cluster-dgram-1.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-dgram-1.js)
@ -372,8 +374,11 @@ Total: 2804
- [parallel/test-cluster-dgram-reuse.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-dgram-reuse.js)
- [parallel/test-cluster-disconnect-before-exit.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-before-exit.js)
- [parallel/test-cluster-disconnect-exitedAfterDisconnect-race.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-exitedAfterDisconnect-race.js)
- [parallel/test-cluster-disconnect-idle-worker.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-idle-worker.js)
- [parallel/test-cluster-disconnect-leak.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-leak.js)
- [parallel/test-cluster-disconnect-race.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-race.js)
- [parallel/test-cluster-disconnect-unshared-tcp.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-unshared-tcp.js)
- [parallel/test-cluster-disconnect-unshared-udp.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-unshared-udp.js)
- [parallel/test-cluster-disconnect-with-no-workers.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect-with-no-workers.js)
- [parallel/test-cluster-disconnect.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-disconnect.js)
- [parallel/test-cluster-eaccess.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-eaccess.js)
@ -398,6 +403,7 @@ Total: 2804
- [parallel/test-cluster-primary-error.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-primary-error.js)
- [parallel/test-cluster-primary-kill.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-primary-kill.js)
- [parallel/test-cluster-process-disconnect.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-process-disconnect.js)
- [parallel/test-cluster-rr-domain-listen.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-rr-domain-listen.js)
- [parallel/test-cluster-rr-ref.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-rr-ref.js)
- [parallel/test-cluster-send-deadlock.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-send-deadlock.js)
- [parallel/test-cluster-send-handle-twice.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-send-handle-twice.js)
@ -424,6 +430,7 @@ Total: 2804
- [parallel/test-cluster-worker-handle-close.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-worker-handle-close.js)
- [parallel/test-cluster-worker-init.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-worker-init.js)
- [parallel/test-cluster-worker-isconnected.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-worker-isconnected.js)
- [parallel/test-cluster-worker-isdead.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-worker-isdead.js)
- [parallel/test-cluster-worker-kill-signal.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-worker-kill-signal.js)
- [parallel/test-cluster-worker-kill.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-worker-kill.js)
- [parallel/test-cluster-worker-no-exit.js](https://github.com/nodejs/node/tree/v18.12.1/test/parallel/test-cluster-worker-no-exit.js)

View File

@ -202,13 +202,6 @@
"test-child-process-spawnsync-validation-errors.js",
"test-child-process-spawnsync.js",
"test-client-request-destroy.js",
"test-cluster-child-index-dgram.js",
"test-cluster-child-index-net.js",
"test-cluster-disconnect-idle-worker.js",
"test-cluster-disconnect-unshared-tcp.js",
"test-cluster-disconnect-unshared-udp.js",
"test-cluster-rr-domain-listen.js",
"test-cluster-worker-isdead.js",
"test-console-async-write-error.js",
"test-console-group.js",
"test-console-instance.js",

View File

@ -1,17 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
/**
* This module is used as an entry point for test files utilizing the cluster
* module which forks processes and cannot use `.ts` files due to
* incompatibility with Deno's Node module resolution.
* See https://github.com/denoland/deno/blob/main/cli/node/mod.rs#L725
*
* The idea is to emulate a CommonJS environment without having to modify
* the test files in any way
*
* Running with all permissions and unstable is recommended
*
* Usage: `deno run -A --unstable require.mjs my_commonjs_file.js`
*/
import "./require.ts";

View File

@ -88,7 +88,7 @@ for await (const path of testPaths) {
console.log(`Error: "${path}" failed`);
console.log(
"You can repeat only this test with the command:",
magenta(`deno test -A --unstable node/_tools/test.ts -- ${path}`),
magenta(`deno test -A node/_tools/test.ts -- ${path}`),
);
fail(decodedStderr);
}

View File

@ -1,47 +0,0 @@
// deno-fmt-ignore-file
// deno-lint-ignore-file
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
// Taken from Node 18.12.1
// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually
'use strict';
const common = require('../common');
const Countdown = require('../common/countdown');
if (common.isWindows)
common.skip('dgram clustering is currently not supported on Windows.');
const cluster = require('cluster');
const dgram = require('dgram');
// Test an edge case when using `cluster` and `dgram.Socket.bind()`
// the port of `0`.
const kPort = 0;
function child() {
const kTime = 2;
const countdown = new Countdown(kTime * 2, () => {
process.exit(0);
});
for (let i = 0; i < kTime; i += 1) {
const socket = new dgram.Socket('udp4');
socket.bind(kPort, common.mustCall(() => {
// `process.nextTick()` or `socket2.close()` would throw
// ERR_SOCKET_DGRAM_NOT_RUNNING
process.nextTick(() => {
socket.close(countdown.dec());
const socket2 = new dgram.Socket('udp4');
socket2.bind(kPort, common.mustCall(() => {
process.nextTick(() => {
socket2.close(countdown.dec());
});
}));
});
}));
}
}
if (cluster.isMaster)
cluster.fork(__filename);
else
child();

View File

@ -1,38 +0,0 @@
// deno-fmt-ignore-file
// deno-lint-ignore-file
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
// Taken from Node 18.12.1
// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually
'use strict';
const common = require('../common');
const Countdown = require('../common/countdown');
const cluster = require('cluster');
const net = require('net');
// Test an edge case when using `cluster` and `net.Server.listen()` to
// the port of `0`.
const kPort = 0;
function child() {
const kTime = 2;
const countdown = new Countdown(kTime * 2, () => {
process.exit(0);
});
for (let i = 0; i < kTime; i += 1) {
const server = net.createServer();
server.listen(kPort, common.mustCall(() => {
server.close(countdown.dec());
const server2 = net.createServer();
server2.listen(kPort, common.mustCall(() => {
server2.close(countdown.dec());
}));
}));
}
}
if (cluster.isMaster)
cluster.fork(__filename);
else
child();

View File

@ -1,41 +0,0 @@
// deno-fmt-ignore-file
// deno-lint-ignore-file
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
// Taken from Node 18.12.1
// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict';
const common = require('../common');
const assert = require('assert');
const cluster = require('cluster');
const fork = cluster.fork;
if (cluster.isPrimary) {
fork(); // It is intentionally called `fork` instead of
fork(); // `cluster.fork` to test that `this` is not used
cluster.disconnect(common.mustCall(() => {
assert.deepStrictEqual(Object.keys(cluster.workers), []);
}));
}

View File

@ -1,51 +0,0 @@
// deno-fmt-ignore-file
// deno-lint-ignore-file
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
// Taken from Node 18.12.1
// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict';
require('../common');
process.env.NODE_CLUSTER_SCHED_POLICY = 'none';
const cluster = require('cluster');
const net = require('net');
if (cluster.isPrimary) {
const unbound = cluster.fork().on('online', bind);
function bind() {
cluster.fork({ BOUND: 'y' }).on('listening', disconnect);
}
function disconnect() {
unbound.disconnect();
unbound.on('disconnect', cluster.disconnect);
}
} else if (process.env.BOUND === 'y') {
const source = net.createServer();
source.listen(0);
}

View File

@ -1,54 +0,0 @@
// deno-fmt-ignore-file
// deno-lint-ignore-file
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
// Taken from Node 18.12.1
// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict';
const common = require('../common');
if (common.isWindows)
common.skip('on windows, because clustered dgram is ENOTSUP');
const cluster = require('cluster');
const dgram = require('dgram');
if (cluster.isPrimary) {
const unbound = cluster.fork().on('online', bind);
function bind() {
cluster.fork({ BOUND: 'y' }).on('listening', disconnect);
}
function disconnect() {
unbound.disconnect();
unbound.on('disconnect', cluster.disconnect);
}
} else if (process.env.BOUND === 'y') {
const source = dgram.createSocket('udp4');
source.bind(0);
}

View File

@ -1,58 +0,0 @@
// deno-fmt-ignore-file
// deno-lint-ignore-file
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
// Taken from Node 18.12.1
// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
'use strict';
require('../common');
const cluster = require('cluster');
const domain = require('domain');
// RR is the default for v0.11.9+ so the following line is redundant:
// cluster.schedulingPolicy = cluster.SCHED_RR;
if (cluster.isWorker) {
const d = domain.create();
d.run(() => {});
const http = require('http');
http.Server(() => {}).listen(0, '127.0.0.1');
} else if (cluster.isPrimary) {
// Kill worker when listening
cluster.on('listening', function() {
worker.kill();
});
// Kill process when worker is killed
cluster.on('exit', function() {
process.exit(0);
});
// Create worker
const worker = cluster.fork();
}

View File

@ -1,39 +0,0 @@
// deno-fmt-ignore-file
// deno-lint-ignore-file
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
// Taken from Node 18.12.1
// This file is automatically generated by "node/_tools/setup.ts". Do not modify this file manually
'use strict';
require('../common');
const cluster = require('cluster');
const assert = require('assert');
if (cluster.isPrimary) {
const worker = cluster.fork();
let workerDead = worker.isDead();
assert.ok(!workerDead,
`isDead() returned ${workerDead}. isDead() should return ` +
'false right after the worker has been created.');
worker.on('exit', function() {
workerDead = worker.isDead();
assert.ok(workerDead,
`isDead() returned ${workerDead}. After an event has been ` +
'emitted, isDead should return true');
});
worker.on('message', function(msg) {
if (msg === 'readyToDie') {
worker.kill();
}
});
} else if (cluster.isWorker) {
const workerDead = cluster.worker.isDead();
assert.ok(!workerDead,
`isDead() returned ${workerDead}. isDead() should return ` +
'false when called from within a worker');
process.send('readyToDie');
}

View File

@ -41,11 +41,7 @@ import { convertToValidSignal, kEmptyObject } from "./internal/util.mjs";
const MAX_BUFFER = 1024 * 1024;
export interface ForkOptions extends ChildProcessOptions {
execPath?: string | undefined;
execArgv?: string[] | undefined;
silent?: boolean | undefined;
}
type ForkOptions = ChildProcessOptions;
/**
* Spawns a new Node.js process + fork.
@ -54,15 +50,9 @@ export interface ForkOptions extends ChildProcessOptions {
* @param option
* @returns
*/
export function fork(modulePath: string, options?: ForkOptions): ChildProcess;
export function fork(
modulePath: string,
args?: ReadonlyArray<string>,
options?: ForkOptions,
): ChildProcess;
export function fork(
modulePath: string,
_args?: ReadonlyArray<string> | ForkOptions,
_args?: string[],
_options?: ForkOptions,
) {
validateString(modulePath, "modulePath");
@ -149,10 +139,9 @@ export function fork(
options.shell = false;
Object.assign(options.env ??= {}, {
DENO_DONT_USE_INTERNAL_NODE_COMPAT_STATE: (
// deno-lint-ignore no-explicit-any
Deno as any
).core.ops.op_npm_process_state(),
// deno-lint-ignore no-explicit-any
DENO_DONT_USE_INTERNAL_NODE_COMPAT_STATE: (Deno as any).core.ops
.op_npm_process_state(),
});
return spawn(options.execPath, args, options);

View File

@ -1,53 +1,69 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
import { cluster } from "./internal/cluster/cluster.ts";
import { initRoundRobinHandle } from "./internal/cluster/round_robin_handle.ts";
import { initSharedHandle } from "./internal/cluster/shared_handle.ts";
import { _createServerHandle, createServer } from "./net.ts";
import { notImplemented } from "./_utils.ts";
// Lazily initializes the cluster *Handle classes.
// This trick is necessary for avoiding circular dependencies between
// net and cluster modules.
initRoundRobinHandle(createServer);
initSharedHandle(_createServerHandle);
/** A Worker object contains all public information and method about a worker.
* In the primary it can be obtained using cluster.workers. In a worker it can
* be obtained using cluster.worker.
*/
export class Worker {
constructor() {
notImplemented("cluster.Worker.prototype.constructor");
}
}
/** Calls .disconnect() on each worker in cluster.workers. */
export function disconnected() {
notImplemented("cluster.disconnected");
}
/** Spawn a new worker process. */
export function fork() {
notImplemented("cluster.fork");
}
/** True if the process is a primary. This is determined by
* the process.env.NODE_UNIQUE_ID. If process.env.NODE_UNIQUE_ID is undefined,
* then isPrimary is true. */
export const isPrimary = undefined;
/** True if the process is not a primary (it is the negation of
* cluster.isPrimary). */
export const isWorker = undefined;
/** Deprecated alias for cluster.isPrimary. details. */
export const isMaster = isPrimary;
/** The scheduling policy, either cluster.SCHED_RR for round-robin or
* cluster.SCHED_NONE to leave it to the operating system. This is a global
* setting and effectively frozen once either the first worker is spawned, or
* .setupPrimary() is called, whichever comes first. */
export const schedulingPolicy = undefined;
/** The settings object */
export const settings = undefined;
/** Deprecated alias for .setupPrimary(). */
export function setupMaster() {
notImplemented("cluster.setupMaster");
}
/** setupPrimary is used to change the default 'fork' behavior. Once called,
* the settings will be present in cluster.settings. */
export function setupPrimary() {
notImplemented("cluster.setupPrimary");
}
/** A reference to the current worker object. Not available in the primary
* process. */
export const worker = undefined;
/** A hash that stores the active worker objects, keyed by id field. Makes it
* easy to loop through all the workers. It is only available in the primary
* process. */
export const workers = undefined;
const {
SCHED_NONE,
SCHED_RR,
export default {
Worker,
_events,
_eventsCount,
_maxListeners,
disconnect,
disconnected,
fork,
isMaster,
isPrimary,
isWorker,
isMaster,
schedulingPolicy,
settings,
setupMaster,
setupPrimary,
workers,
} = cluster;
export {
_events,
_eventsCount,
_maxListeners,
disconnect,
fork,
isMaster,
isPrimary,
isWorker,
SCHED_NONE,
SCHED_RR,
schedulingPolicy,
settings,
setupMaster,
setupPrimary,
Worker,
worker,
workers,
};
export default cluster;

View File

@ -42,8 +42,6 @@ import { kEmptyObject } from "./util.mjs";
import { getValidatedPath } from "./fs/utils.mjs";
import process from "../process.ts";
export const kChannelHandle = Symbol("kChannelHandle");
type NodeStdio = "pipe" | "overlapped" | "ignore" | "inherit" | "ipc";
type DenoStdio = "inherit" | "piped" | "null";
@ -133,7 +131,11 @@ export class ChildProcess extends EventEmitter {
#process!: Deno.ChildProcess;
#spawned = deferred<void>();
constructor(command: string, args?: string[], options?: ChildProcessOptions) {
constructor(
command: string,
args?: string[],
options?: ChildProcessOptions,
) {
super();
const {
@ -150,7 +152,11 @@ export class ChildProcess extends EventEmitter {
stderr = "pipe",
_channel, // TODO(kt3k): handle this correctly
] = normalizeStdioOption(stdio);
const [cmd, cmdArgs] = buildCommand(command, args || [], shell);
const [cmd, cmdArgs] = buildCommand(
command,
args || [],
shell,
);
this.spawnfile = cmd;
this.spawnargs = [cmd, ...cmdArgs];
@ -263,21 +269,10 @@ export class ChildProcess extends EventEmitter {
this.#process.unref();
}
get connected() {
warnNotImplemented("ChildProcess.prototype.connected");
return false;
}
disconnect() {
warnNotImplemented("ChildProcess.prototype.disconnect");
}
/** https://nodejs.org/api/child_process.html#subprocesssendmessage-sendhandle-options-callback */
send() {
warnNotImplemented("ChildProcess.prototype.send");
}
async #_waitForChildStreamsToClose() {
const promises = [] as Array<Promise<void>>;
if (this.stdin && !this.stdin.destroyed) {
@ -308,22 +303,6 @@ export class ChildProcess extends EventEmitter {
}
}
export function setupChannel(
// deno-lint-ignore no-explicit-any
_target: any,
// deno-lint-ignore no-explicit-any
_channel: any,
// deno-lint-ignore no-explicit-any
_serializationMode: any,
) {
notImplemented("child_process.setupChannel");
}
// deno-lint-ignore no-explicit-any
export function getValidStdio(_stdio: any, _sync: any) {
notImplemented("child_process.getValidStdio");
}
const supportedNodeStdioTypes: NodeStdio[] = ["pipe", "ignore", "inherit"];
function toDenoStdio(
pipe: NodeStdio | number | Stream | null | undefined,
@ -378,9 +357,7 @@ export interface ChildProcessOptions {
/**
* Environment variables passed to the child process.
*/
env?:
& InstanceType<ObjectConstructor>
& Record<string, string | number | boolean>;
env?: Record<string, string | number | boolean>;
/**
* This option defines child process's stdio configuration.
@ -1006,9 +983,6 @@ function toDenoArgs(args: string[]): string[] {
export default {
ChildProcess,
kChannelHandle,
setupChannel,
getValidStdio,
stdioStringToArray,
spawnSync,
};

View File

@ -1,337 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license.
import assert from "../assert.mjs";
import path from "../../path.ts";
import EventEmitter from "../../events.ts";
import { ownerSymbol } from "../async_hooks.ts";
import Worker from "./worker.ts";
import { internal, sendHelper } from "./utils.ts";
import process from "../../process.ts";
import type {
Cluster as ICluster,
Message,
Worker as IWorker,
WorkerClass,
} from "./types.ts";
const cluster: ICluster = new EventEmitter() as ICluster;
const handles = new Map();
const indexes = new Map();
const noop = Function.prototype;
(cluster.isWorker as boolean) = true;
(cluster.isMaster as boolean) = false; // Deprecated alias. Must be same as isPrimary.
(cluster.isPrimary as boolean) = false;
(cluster.worker as null) = null;
(cluster.Worker as WorkerClass) = Worker;
cluster._setupWorker = function () {
const worker = new Worker({
id: +process.env.NODE_UNIQUE_ID | 0,
process,
state: "online",
});
(cluster.worker as IWorker) = worker;
process.once("disconnect", () => {
worker.emit("disconnect");
if (!worker.exitedAfterDisconnect) {
// Unexpected disconnect, primary exited, or some such nastiness, so
// worker exits immediately.
process.exit(0);
}
});
process.on("internalMessage", internal(worker, onmessage));
send({ act: "online" });
// deno-lint-ignore no-explicit-any
function onmessage(message: Message, handle: any) {
if (message.act === "newconn") {
onconnection(message, handle);
} else if (message.act === "disconnect") {
Reflect.apply(_disconnect, worker, [true]);
}
}
};
// `obj` is a net#Server or a dgram#Socket object.
cluster._getServer = function (
// deno-lint-ignore no-explicit-any
obj: any,
options: {
address?: string | null;
port?: number | null;
addressType?: string | number | null;
fd?: number | null;
flags?: number | null;
},
// deno-lint-ignore no-explicit-any
cb: (err: number, handle: any | null) => void,
) {
let address = options.address;
// Resolve unix socket paths to absolute paths
if (
options.port! < 0 &&
typeof address === "string" &&
process.platform !== "win32"
) {
address = path.resolve(address);
}
const indexesKey = [
address,
options.port,
options.addressType,
options.fd,
].join(":");
let indexSet = indexes.get(indexesKey);
if (indexSet === undefined) {
indexSet = { nextIndex: 0, set: new Set() };
indexes.set(indexesKey, indexSet);
}
const index = indexSet.nextIndex++;
indexSet.set.add(index);
const message = {
act: "queryServer",
index,
data: null,
...options,
};
message.address = address;
// Set custom data on handle (i.e. tls tickets key)
// deno-lint-ignore no-explicit-any
if ((obj as any)._getServerData) {
// deno-lint-ignore no-explicit-any
message.data = (obj as any)._getServerData();
}
// deno-lint-ignore no-explicit-any
send(message, (reply: Record<string, unknown> | null, handle: any) => {
// deno-lint-ignore no-explicit-any
if (typeof (obj as any)._setServerData === "function") {
// deno-lint-ignore no-explicit-any
(obj as any)._setServerData(reply!.data);
}
if (handle) {
// Shared listen socket
shared(reply!, { handle, indexesKey, index }, cb);
} else {
// Round-robin.
rr(reply!, { indexesKey, index }, cb);
}
});
obj.once("listening", () => {
cluster.worker!.state = "listening";
const address = obj.address();
message.act = "listening";
message.port = address?.port || options.port;
send(message);
});
};
function removeIndexesKey(indexesKey: string, index: number) {
const indexSet = indexes.get(indexesKey);
if (!indexSet) {
return;
}
indexSet.set.delete(index);
if (indexSet.set.size === 0) {
indexes.delete(indexesKey);
}
}
// Shared listen socket.
function shared(
message: Message,
{
handle,
indexesKey,
index,
}: // deno-lint-ignore no-explicit-any
{ handle: any; indexesKey: string; index: number },
// deno-lint-ignore no-explicit-any
cb: (errno: number, handle: any) => void,
) {
const key = message.key;
// Monkey-patch the close() method so we can keep track of when it's
// closed. Avoids resource leaks when the handle is short-lived.
const close = handle.close;
handle.close = function () {
send({ act: "close", key });
handles.delete(key);
removeIndexesKey(indexesKey, index);
return Reflect.apply(close, handle, arguments);
};
assert(handles.has(key) === false);
handles.set(key, handle);
cb(message.errno, handle);
}
// Round-robin. Master distributes handles across workers.
function rr(
message: Message,
{ indexesKey, index }: { indexesKey: string; index: number },
// deno-lint-ignore no-explicit-any
cb: (errno: number, handle: any | null) => void,
) {
if (message.errno) {
return cb(message.errno, null);
}
let key = message.key;
function listen(_backlog: number) {
return 0;
}
function close() {
// lib/net.js treats server._handle.close() as effectively synchronous.
// That means there is a time window between the call to close() and
// the ack by the primary process in which we can still receive handles.
// onconnection() below handles that by sending those handles back to
// the primary.
if (key === undefined) {
return;
}
send({ act: "close", key });
handles.delete(key);
removeIndexesKey(indexesKey, index);
key = undefined;
}
function getsockname(out: Record<string, never>): number {
if (key) {
Object.assign(out, message.sockname);
}
return 0;
}
// Faux handle. Mimics a TCPWrap with just enough fidelity to get away
// with it. Fools net.Server into thinking that it's backed by a real
// handle. Use a noop function for ref() and unref() because the control
// channel is going to keep the worker alive anyway.
// deno-lint-ignore no-explicit-any
const handle: any = {
close,
listen,
ref: noop as () => void,
unref: noop as () => void,
};
if (message.sockname) {
handle.getsockname = getsockname; // TCP handles only.
}
assert(handles.has(key) === false);
handles.set(key, handle);
cb(0, handle);
}
// Round-robin connection.
// deno-lint-ignore no-explicit-any
function onconnection(message: Message, handle: any) {
const key = message.key;
const server = handles.get(key);
const accepted = server !== undefined;
send({ ack: message.seq, accepted });
if (accepted) {
server.onconnection(0, handle);
}
}
function send(message: Message, cb?: unknown) {
return sendHelper(process, message, null, cb);
}
function _disconnect(this: IWorker, primaryInitiated: boolean) {
this.exitedAfterDisconnect = true;
let waitingCount = 1;
function checkWaitingCount() {
waitingCount--;
if (waitingCount === 0) {
// If disconnect is worker initiated, wait for ack to be sure
// exitedAfterDisconnect is properly set in the primary, otherwise, if
// it's primary initiated there's no need to send the
// exitedAfterDisconnect message
if (primaryInitiated) {
// TODO(cmorten): remove type cast once process interface is completed.
// deno-lint-ignore no-explicit-any
(process as any).disconnect();
} else {
send({ act: "exitedAfterDisconnect" }, () =>
// TODO(cmorten): remove type cast once process interface is completed.
// deno-lint-ignore no-explicit-any
(process as any).disconnect());
}
}
}
handles.forEach((handle) => {
waitingCount++;
if (handle[ownerSymbol]) {
handle[ownerSymbol].close(checkWaitingCount);
} else {
handle.close(checkWaitingCount);
}
});
handles.clear();
checkWaitingCount();
}
// Extend generic Worker with methods specific to worker processes.
Worker.prototype.disconnect = function () {
if (this.state !== "disconnecting" && this.state !== "destroying") {
this.state = "disconnecting";
Reflect.apply(_disconnect, this, []);
}
return this;
};
Worker.prototype.destroy = function () {
if (this.state === "destroying") {
return;
}
this.exitedAfterDisconnect = true;
if (!this.isConnected()) {
process.exit(0);
} else {
this.state = "destroying";
// TODO(cmorten): remove type cast once process interface is completed.
// deno-lint-ignore no-explicit-any
send({ act: "exitedAfterDisconnect" }, () => (process as any).disconnect());
process.once("disconnect", () => process.exit(0));
}
};
export default cluster;

View File

@ -1,24 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent and Node contributors. All rights reserved. MIT license.
import process from "../../process.ts";
import childCluster from "./child.ts";
import primaryCluster from "./primary.ts";
export const cluster = "NODE_UNIQUE_ID" in process.env
? childCluster
: primaryCluster;
initializeClusterIPC();
// TODO: migrate to process pre-execution module if/when ported.
// See https://github.com/nodejs/node/blob/main/lib/internal/process/pre_execution.js#L507.
function initializeClusterIPC() {
if (process.env.NODE_UNIQUE_ID) {
cluster._setupWorker!();
// Make sure it's not accidentally inherited by child processes.
delete process.env.NODE_UNIQUE_ID;
}
}
export default cluster;

View File

@ -1,438 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license.
import assert from "../assert.mjs";
import { fork } from "../../child_process.ts";
import path from "../../path.ts";
import EventEmitter from "../../events.ts";
import RoundRobinHandle from "./round_robin_handle.ts";
import SharedHandle from "./shared_handle.ts";
import Worker from "./worker.ts";
import { internal, sendHelper } from "./utils.ts";
import { validatePort } from "../validators.mjs";
import process from "../../process.ts";
import { ChildProcess } from "../child_process.ts";
import { notImplemented } from "../../_utils.ts";
import { ObjectAssign } from "../primordials.mjs";
import type {
Cluster as ICluster,
ClusterSettings,
Message,
Worker as IWorker,
WorkerClass,
} from "./types.ts";
const cluster: ICluster = new EventEmitter() as ICluster;
const intercom = new EventEmitter();
const SCHED_NONE = 1;
const SCHED_RR = 2;
const minPort = 1024;
const maxPort = 65535;
const handles = new Map();
(cluster.isWorker as boolean) = false;
(cluster.isMaster as boolean) = true;
(cluster.isPrimary as boolean) = true;
(cluster.Worker as WorkerClass) = Worker;
(cluster.workers as Record<number, IWorker>) = {};
(cluster.settings as ClusterSettings) = {};
(cluster.SCHED_NONE as number) = SCHED_NONE; // Leave it to the operating system.
(cluster.SCHED_RR as number) = SCHED_RR; // Primary distributes connections.
let ids = 0;
let debugPortOffset = 1;
let initialized = false;
const envSchedulingPolicy = process.env.NODE_CLUSTER_SCHED_POLICY;
let schedulingPolicy: number;
if (envSchedulingPolicy === "rr") {
schedulingPolicy = SCHED_RR;
} else if (envSchedulingPolicy === "none") {
schedulingPolicy = SCHED_NONE;
} else if (process.platform === "win32") {
// Round-robin doesn't perform well on
// Windows due to the way IOCP is wired up.
schedulingPolicy = SCHED_NONE;
} else {
schedulingPolicy = SCHED_RR;
}
cluster.schedulingPolicy = schedulingPolicy;
cluster.setupPrimary = function (options?: ClusterSettings) {
const settings = {
args: process.argv.slice(2),
// TODO: remove extension replacement if/when have a better solution for
// resolving TypeScript files through Deno's Node module resolution.
// See https://github.com/denoland/deno/blob/main/cli/node/mod.rs#L725.
exec: process.argv[1].replace(/\.ts$/, ".mjs"),
execArgv: process.execArgv,
silent: false,
...cluster.settings,
...options,
};
// Tell V8 to write profile data for each process to a separate file.
// Without --logfile=v8-%p.log, everything ends up in a single, unusable
// file. (Unusable because what V8 logs are memory addresses and each
// process has its own memory mappings.)
if (
settings.execArgv.some((s: string) => s.startsWith("--prof")) &&
!settings.execArgv.some((s: string) => s.startsWith("--logfile="))
) {
settings.execArgv = [...settings.execArgv, "--logfile=v8-%p.log"];
}
(cluster.settings as ClusterSettings) = settings;
if (initialized === true) {
return process.nextTick(setupSettingsNT, settings);
}
initialized = true;
schedulingPolicy = cluster.schedulingPolicy; // Freeze policy.
assert(
schedulingPolicy === SCHED_NONE || schedulingPolicy === SCHED_RR,
`Bad cluster.schedulingPolicy: ${schedulingPolicy}`,
);
process.nextTick(setupSettingsNT, settings);
process.on("internalMessage", (message: Message) => {
if (message.cmd !== "NODE_DEBUG_ENABLED") {
return;
}
notImplemented("cluster.Cluster.prototype.setupPrimary debugProcess");
});
};
// Deprecated alias must be same as setupPrimary
cluster.setupMaster = cluster.setupPrimary;
function setupSettingsNT(settings: ClusterSettings) {
cluster.emit("setup", settings);
}
function createWorkerProcess(id: number, env?: Record<string, unknown>) {
const workerEnv = ObjectAssign({}, process.env, env, {
NODE_UNIQUE_ID: `${id}`,
}) as
& InstanceType<ObjectConstructor>
& Record<string, string | number | boolean>;
const execArgv = [...(cluster.settings.execArgv as string[])];
const debugArgRegex = /--inspect(?:-brk|-port)?|--debug-port/;
const nodeOptions = process.env.NODE_OPTIONS || "";
if (
execArgv.some((arg) => debugArgRegex.test(arg)) ||
debugArgRegex.test(nodeOptions)
) {
let inspectPort;
if ("inspectPort" in cluster.settings) {
if (typeof cluster.settings.inspectPort === "function") {
inspectPort = cluster.settings.inspectPort();
} else {
inspectPort = cluster.settings.inspectPort;
}
validatePort(inspectPort);
} else {
inspectPort = (process as unknown as { debugPort: number }).debugPort +
debugPortOffset;
if (inspectPort > maxPort) {
inspectPort = inspectPort - maxPort + minPort - 1;
}
debugPortOffset++;
}
execArgv.push(`--inspect-port=${inspectPort}`);
}
return fork(cluster.settings.exec!, cluster.settings.args, {
cwd: cluster.settings.cwd,
env: workerEnv,
serialization: cluster.settings.serialization,
silent: cluster.settings.silent,
windowsHide: cluster.settings.windowsHide,
execArgv: execArgv,
stdio: cluster.settings.stdio,
gid: cluster.settings.gid,
uid: cluster.settings.uid,
});
}
function removeWorker(worker: IWorker) {
assert(worker);
delete cluster.workers![worker.id];
if (Object.keys(cluster.workers!).length === 0) {
assert(handles.size === 0, "Resource leak detected.");
intercom.emit("disconnect");
}
}
function removeHandlesForWorker(worker: IWorker) {
assert(worker);
handles.forEach((handle, key) => {
if (handle.remove(worker)) handles.delete(key);
});
}
cluster.fork = function (env) {
cluster.setupPrimary();
const id = ++ids;
const workerProcess = createWorkerProcess(id, env);
const worker = new Worker({
id: id,
process: workerProcess,
});
worker.on(
"message",
// deno-lint-ignore no-explicit-any
function (this: IWorker, message: Message, handle: any) {
cluster.emit("message", this, message, handle);
},
);
worker.process.once("exit", (exitCode: number, signalCode: number) => {
/*
* Remove the worker from the workers list only
* if it has disconnected, otherwise we might
* still want to access it.
*/
if (!worker.isConnected()) {
removeHandlesForWorker(worker);
removeWorker(worker);
}
worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
worker.state = "dead";
worker.emit("exit", exitCode, signalCode);
cluster.emit("exit", worker, exitCode, signalCode);
});
worker.process.once("disconnect", () => {
/*
* Now is a good time to remove the handles
* associated with this worker because it is
* not connected to the primary anymore.
*/
removeHandlesForWorker(worker);
/*
* Remove the worker from the workers list only
* if its process has exited. Otherwise, we might
* still want to access it.
*/
if (worker.isDead()) {
removeWorker(worker);
}
worker.exitedAfterDisconnect = !!worker.exitedAfterDisconnect;
worker.state = "disconnected";
worker.emit("disconnect");
cluster.emit("disconnect", worker);
});
worker.process.on("internalMessage", internal(worker, onmessage));
process.nextTick(emitForkNT, worker);
cluster.workers![worker.id] = worker;
return worker;
};
function emitForkNT(worker: IWorker) {
cluster.emit("fork", worker);
}
cluster.disconnect = function (cb) {
const workers = Object.keys(cluster.workers!);
if (workers.length === 0) {
process.nextTick(() => intercom.emit("disconnect"));
} else {
for (const worker of Object.values(cluster.workers!)) {
if (worker.isConnected()) {
worker.disconnect();
}
}
}
if (typeof cb === "function") {
intercom.once("disconnect", cb);
}
};
const methodMessageMapping = {
close,
exitedAfterDisconnect,
listening,
online,
queryServer,
};
// deno-lint-ignore no-explicit-any
function onmessage(this: IWorker, message: Message, _handle: any) {
const fn =
methodMessageMapping[message.act as keyof typeof methodMessageMapping];
if (typeof fn === "function") {
fn(this, message);
}
}
function online(worker: IWorker) {
worker.state = "online";
worker.emit("online");
cluster.emit("online", worker);
}
function exitedAfterDisconnect(worker: IWorker, message: Message) {
worker.exitedAfterDisconnect = true;
send(worker, { ack: message.seq });
}
function queryServer(worker: IWorker, message: Message) {
// Stop processing if worker already disconnecting
if (worker.exitedAfterDisconnect) {
return;
}
const key = `${message.address}:${message.port}:${message.addressType}:` +
`${message.fd}:${message.index}`;
let handle = handles.get(key);
if (handle === undefined) {
let address = message.address;
// Find shortest path for unix sockets because of the ~100 byte limit
if (
message.port! < 0 &&
typeof address === "string" &&
process.platform !== "win32"
) {
address = path.relative(process.cwd(), address);
if (message.address!.length < address.length) {
address = message.address;
}
}
// UDP is exempt from round-robin connection balancing for what should
// be obvious reasons: it's connectionless. There is nothing to send to
// the workers except raw datagrams and that's pointless.
if (
schedulingPolicy !== SCHED_RR ||
message.addressType === "udp4" ||
message.addressType === "udp6"
) {
handle = new SharedHandle(key, address!, message);
} else {
handle = new RoundRobinHandle(key, address!, message);
}
handles.set(key, handle);
}
if (!handle.data) {
handle.data = message.data;
}
// Set custom server data
handle.add(
worker,
(
errno: number,
reply: Record<string, unknown> | null,
// deno-lint-ignore no-explicit-any
handle: any,
) => {
const { data } = handles.get(key);
if (errno) {
handles.delete(key); // Gives other workers a chance to retry.
}
send(
worker,
{
errno,
key,
ack: message.seq,
data,
...reply,
},
handle,
);
},
);
}
function listening(worker: IWorker, message: Message) {
const info = {
addressType: message.addressType,
address: message.address,
port: message.port,
fd: message.fd,
};
worker.state = "listening";
worker.emit("listening", info);
cluster.emit("listening", worker, info);
}
// Server in worker is closing, remove from list. The handle may have been
// removed by a prior call to removeHandlesForWorker() so guard against that.
function close(worker: IWorker, message: Message) {
const key = message.key;
const handle = handles.get(key);
if (handle && handle.remove(worker)) {
handles.delete(key);
}
}
function send(
worker: IWorker,
message: Message,
// deno-lint-ignore no-explicit-any
handle?: any,
cb?: unknown,
) {
return sendHelper(worker.process, message, handle, cb);
}
// Extend generic Worker with methods specific to the primary process.
Worker.prototype.disconnect = function (): IWorker {
this.exitedAfterDisconnect = true;
send(this, { act: "disconnect" });
removeHandlesForWorker(this);
removeWorker(this);
return this;
};
Worker.prototype.destroy = function (signo?: string): void {
const proc = this.process;
signo = signo || "SIGTERM";
if (this.isConnected()) {
this.once("disconnect", () => (proc as ChildProcess).kill(signo));
this.disconnect();
return;
}
(proc as ChildProcess).kill(signo);
};
export default cluster;

View File

@ -1,177 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license.
import assert from "../assert.mjs";
import { sendHelper } from "./utils.ts";
import { constants } from "../../internal_binding/tcp_wrap.ts";
import { append, init, isEmpty, peek, remove } from "../linkedlist.mjs";
import type { Message, Worker } from "./types.ts";
// deno-lint-ignore no-var no-explicit-any
var RoundRobinHandle: any;
// Lazily initializes the actual RoundRobinHandle class.
// This trick is necessary for avoiding circular dependencies between
// net and cluster modules.
// deno-lint-ignore no-explicit-any
export function initRoundRobinHandle(createServer: any) {
if (RoundRobinHandle) {
return;
}
RoundRobinHandle = class RoundRobinHandle {
key: string;
all: Map<number, Worker>;
free: Map<number, Worker>;
// deno-lint-ignore no-explicit-any
handle: any = null;
// deno-lint-ignore no-explicit-any
handles: any;
// deno-lint-ignore no-explicit-any
server: any;
constructor(
key: string,
address: string,
{ port, fd, flags, backlog }: Message,
) {
this.key = key;
this.all = new Map();
this.free = new Map();
this.handles = init(Object.create(null));
this.handle = null;
this.server = createServer(assert.fail);
if (fd! >= 0) {
this.server.listen({ fd, backlog });
} else if (port! >= 0) {
this.server.listen({
port,
host: address,
// Currently, net module only supports `ipv6Only` option in `flags`.
ipv6Only: Boolean(flags! & constants.UV_TCP_IPV6ONLY),
backlog,
});
} else {
this.server.listen(address, backlog); // UNIX socket path.
}
this.server.once("listening", () => {
this.handle = this.server!._handle;
// deno-lint-ignore no-explicit-any
this.handle!.onconnection = (err: number, handle?: any) =>
this.distribute(err, handle);
this.server!._handle = null;
this.server = null;
});
}
add(
worker: Worker,
send: (
errno: number | null,
reply: Record<string, unknown> | null,
// deno-lint-ignore no-explicit-any
handle: any,
) => void,
) {
assert(this.all.has(worker.id) === false);
this.all.set(worker.id, worker);
const done = () => {
if (this.handle.getsockname) {
const out = {};
this.handle.getsockname(out);
send(null, { sockname: out }, null);
} else {
send(null, null, null); // UNIX socket.
}
this.handoff(worker); // In case there are connections pending.
};
if (this.server === null) {
return done();
}
// Still busy binding.
this.server.once("listening", done);
// deno-lint-ignore no-explicit-any
this.server.once("error", (err: any) => {
send(err.errno, null, null);
});
}
remove(worker: Worker) {
const existed = this.all.delete(worker.id);
if (!existed) {
return false;
}
this.free.delete(worker.id);
if (this.all.size !== 0) {
return false;
}
while (!isEmpty(this.handles)) {
const handle = peek(this.handles);
handle.close();
remove(handle);
}
this.handle!.close();
this.handle = null;
return true;
}
// deno-lint-ignore no-explicit-any
distribute(_err: number, handle?: any) {
append(this.handles, handle);
const [workerEntry] = this.free; // this.free is a Map
if (Array.isArray(workerEntry)) {
const { 0: workerId, 1: worker } = workerEntry;
this.free.delete(workerId);
this.handoff(worker);
}
}
handoff(worker: Worker) {
if (!this.all.has(worker.id)) {
return; // Worker is closing (or has closed) the server.
}
const handle = peek(this.handles);
if (handle === null) {
this.free.set(worker.id, worker); // Add to ready queue again.
return;
}
remove(handle);
const message = { act: "newconn", key: this.key };
sendHelper(
worker.process,
message,
handle,
(reply: Record<string, never>) => {
if (reply.accepted) {
handle.close();
} else {
this.distribute(0, handle); // Worker is shutting down. Send to another.
}
this.handoff(worker);
},
);
}
};
}
export { RoundRobinHandle as default };

View File

@ -1,91 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license.
import assert from "../assert.mjs";
import { _createSocketHandle } from "../dgram.ts";
import type { Message, Worker } from "./types.ts";
// deno-lint-ignore no-var no-explicit-any
var SharedHandle: any;
// Lazily initializes the actual SharedHandle class.
// This trick is necessary for avoiding circular dependencies between
// net and cluster modules.
// deno-lint-ignore no-explicit-any
export function initSharedHandle(_createServerHandle: any) {
if (SharedHandle) {
return;
}
SharedHandle = class SharedHandle {
key: string;
workers: Map<number, Worker>;
// deno-lint-ignore no-explicit-any
handle: any = null;
errno = 0;
constructor(
key: string,
address: string,
{ port, addressType, fd, flags }: Message,
) {
this.key = key;
this.workers = new Map();
this.handle = null;
this.errno = 0;
let rval;
if (addressType === "udp4" || addressType === "udp6") {
rval = _createSocketHandle(address, port!, addressType, fd!, flags!);
} else {
rval = _createServerHandle(
address,
port!,
addressType as number,
fd,
flags,
);
}
if (typeof rval === "number") {
this.errno = rval;
} else {
this.handle = rval;
}
}
add(
worker: Worker,
send: (
errno: number,
reply: Record<string, unknown> | null,
// deno-lint-ignore no-explicit-any
handle: any,
) => void,
) {
assert(!this.workers.has(worker.id));
this.workers.set(worker.id, worker);
send(this.errno, null, this.handle!);
}
remove(worker: Worker) {
if (!this.workers.has(worker.id)) {
return false;
}
this.workers.delete(worker.id);
if (this.workers.size !== 0) {
return false;
}
this.handle!.close();
this.handle = null;
return true;
}
};
}
export { SharedHandle as default };

View File

@ -1,539 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license.
import EventEmitter from "../../events.ts";
import { ChildProcess } from "../child_process.ts";
import { Process } from "../../process.ts";
import type { ForkOptions } from "../../child_process.ts";
export interface Message {
// deno-lint-ignore no-explicit-any
[key: string]: any;
}
export type Serializable =
| string
| Record<string, unknown>
| number
| boolean
| bigint;
export interface MessageOptions {
keepOpen?: boolean | undefined;
}
export interface ClusterSettings extends ForkOptions {
exec?: string;
args?: string[];
inspectPort?: number | (() => number);
}
export interface Address {
address: string;
port: number;
addressType: number | "udp4" | "udp6"; // 4, 6, -1, "udp4", "udp6"
}
export interface WorkerOptions {
id?: number;
process?: ChildProcess | Process;
state?: string;
}
export interface WorkerClass extends Function {
new (options?: WorkerOptions | null): Worker;
}
export interface Worker extends EventEmitter {
state: string;
/**
* Each new worker is given its own unique id, this id is stored in the`id`.
*
* While a worker is alive, this is the key that indexes it in`cluster.workers`.
*/
id: number;
/**
* All workers are created using `child_process.fork()`, the returned object
* from this function is stored as `.process`. In a worker, the global `process`is stored.
*
* See: `Child Process module`.
*
* Workers will call `process.exit(0)` if the `'disconnect'` event occurs
* on `process` and `.exitedAfterDisconnect` is not `true`. This protects against
* accidental disconnection.
*/
process: ChildProcess | Process;
/**
* Send a message to a worker or primary, optionally with a handle.
*
* In the primary this sends a message to a specific worker. It is identical to `ChildProcess.send()`.
*
* In a worker this sends a message to the primary. It is identical to`process.send()`.
*
* This example will echo back all messages from the primary:
*
* ```js
* if (cluster.isPrimary) {
* const worker = cluster.fork();
* worker.send('hi there');
*
* } else if (cluster.isWorker) {
* process.on('message', (msg) => {
* process.send(msg);
* });
* }
* ```
* @param options The `options` argument, if present, is an object used to parameterize the sending of certain types of handles. `options` supports the following properties:
*/
send(
message: Serializable,
callback?: (error: Error | null) => void,
): boolean;
send(
message: Serializable,
// deno-lint-ignore no-explicit-any
sendHandle: any,
callback?: (error: Error | null) => void,
): boolean;
send(
message: Serializable,
// deno-lint-ignore no-explicit-any
sendHandle: any,
options?: MessageOptions,
callback?: (error: Error | null) => void,
): boolean;
/**
* This function will kill the worker. In the primary, it does this
* by disconnecting the `worker.process`, and once disconnected, killing
* with `signal`. In the worker, it does it by disconnecting the channel,
* and then exiting with code `0`.
*
* Because `kill()` attempts to gracefully disconnect the worker process, it is
* susceptible to waiting indefinitely for the disconnect to complete. For example,
* if the worker enters an infinite loop, a graceful disconnect will never occur.
* If the graceful disconnect behavior is not needed, use `worker.process.kill()`.
*
* Causes `.exitedAfterDisconnect` to be set.
*
* This method is aliased as `worker.destroy()` for backward compatibility.
*
* In a worker, `process.kill()` exists, but it is not this function;
* it is `kill()`.
* @param [signal='SIGTERM'] Name of the kill signal to send to the worker process.
*/
kill(signal?: string): void;
destroy(signal?: string): void;
/**
* In a worker, this function will close all servers, wait for the `'close'` event
* on those servers, and then disconnect the IPC channel.
*
* In the primary, an internal message is sent to the worker causing it to call`.disconnect()` on itself.
*
* Causes `.exitedAfterDisconnect` to be set.
*
* After a server is closed, it will no longer accept new connections,
* but connections may be accepted by any other listening worker. Existing
* connections will be allowed to close as usual. When no more connections exist,
* see `server.close()`, the IPC channel to the worker will close allowing it
* to die gracefully.
*
* The above applies _only_ to server connections, client connections are not
* automatically closed by workers, and disconnect does not wait for them to close
* before exiting.
*
* In a worker, `process.disconnect` exists, but it is not this function;
* it is `disconnect()`.
*
* Because long living server connections may block workers from disconnecting, it
* may be useful to send a message, so application specific actions may be taken to
* close them. It also may be useful to implement a timeout, killing a worker if
* the `'disconnect'` event has not been emitted after some time.
*
* ```js
* if (cluster.isPrimary) {
* const worker = cluster.fork();
* let timeout;
*
* worker.on('listening', (address) => {
* worker.send('shutdown');
* worker.disconnect();
* timeout = setTimeout(() => {
* worker.kill();
* }, 2000);
* });
*
* worker.on('disconnect', () => {
* clearTimeout(timeout);
* });
*
* } else if (cluster.isWorker) {
* const net = require('net');
* const server = net.createServer((socket) => {
* // Connections never end
* });
*
* server.listen(8000);
*
* process.on('message', (msg) => {
* if (msg === 'shutdown') {
* // Initiate graceful close of any connections to server
* }
* });
* }
* ```
* @return A reference to `worker`.
*/
disconnect(): void;
/**
* This function returns `true` if the worker is connected to its primary via its
* IPC channel, `false` otherwise. A worker is connected to its primary after it
* has been created. It is disconnected after the `'disconnect'` event is emitted.
*/
isConnected(): boolean;
/**
* This function returns `true` if the worker's process has terminated (either
* because of exiting or being signaled). Otherwise, it returns `false`.
*
* ```js
* import cluster from 'cluster';
* import http from 'http';
* import { cpus } from 'os';
* import process from 'process';
*
* const numCPUs = cpus().length;
*
* if (cluster.isPrimary) {
* console.log(`Primary ${process.pid} is running`);
*
* // Fork workers.
* for (let i = 0; i < numCPUs; i++) {
* cluster.fork();
* }
*
* cluster.on('fork', (worker) => {
* console.log('worker is dead:', worker.isDead());
* });
*
* cluster.on('exit', (worker, code, signal) => {
* console.log('worker is dead:', worker.isDead());
* });
* } else {
* // Workers can share any TCP connection. In this case, it is an HTTP server.
* http.createServer((req, res) => {
* res.writeHead(200);
* res.end(`Current process\n ${process.pid}`);
* process.kill(process.pid);
* }).listen(8000);
* }
* ```
*/
isDead(): boolean;
/**
* This property is `true` if the worker exited due to `.kill()` or`.disconnect()`. If the worker exited any other way, it is `false`. If the
* worker has not exited, it is `undefined`.
*
* The boolean `worker.exitedAfterDisconnect` allows distinguishing between
* voluntary and accidental exit, the primary may choose not to respawn a worker
* based on this value.
*
* ```js
* cluster.on('exit', (worker, code, signal) => {
* if (worker.exitedAfterDisconnect === true) {
* console.log('Oh, it was just voluntary no need to worry');
* }
* });
*
* // kill worker
* worker.kill();
* ```
*/
exitedAfterDisconnect?: boolean;
/**
* events.EventEmitter
* 1. disconnect
* 2. error
* 3. exit
* 4. listening
* 5. message
* 6. online
*/
addListener(event: string, listener: (...args: unknown[]) => void): this;
addListener(event: "disconnect", listener: () => void): this;
addListener(event: "error", listener: (error: Error) => void): this;
addListener(
event: "exit",
listener: (code: number, signal: string) => void,
): this;
addListener(event: "listening", listener: (address: Address) => void): this;
addListener(
event: "message",
// deno-lint-ignore no-explicit-any
listener: (message: unknown, handle: any) => void,
): this; // the handle is a Socket or Server object, or undefined.
addListener(event: "online", listener: () => void): this;
emit(event: string | symbol, ...args: unknown[]): boolean;
emit(event: "disconnect"): boolean;
emit(event: "error", error: Error): boolean;
emit(event: "exit", code: number, signal: string): boolean;
emit(event: "listening", address: Address): boolean;
// deno-lint-ignore no-explicit-any
emit(event: "message", message: unknown, handle: any): boolean;
emit(event: "online"): boolean;
on(event: string, listener: (...args: unknown[]) => void): this;
on(event: "disconnect", listener: () => void): this;
on(event: "error", listener: (error: Error) => void): this;
on(event: "exit", listener: (code: number, signal: string) => void): this;
on(event: "listening", listener: (address: Address) => void): this;
// deno-lint-ignore no-explicit-any
on(event: "message", listener: (message: unknown, handle: any) => void): this; // the handle is a Socket or Server object, or undefined.
on(event: "online", listener: () => void): this;
once(event: string, listener: (...args: unknown[]) => void): this;
once(event: "disconnect", listener: () => void): this;
once(event: "error", listener: (error: Error) => void): this;
once(event: "exit", listener: (code: number, signal: string) => void): this;
once(event: "listening", listener: (address: Address) => void): this;
once(
event: "message",
// deno-lint-ignore no-explicit-any
listener: (message: unknown, handle: any) => void,
): this; // the handle is a Socket or Server object, or undefined.
once(event: "online", listener: () => void): this;
prependListener(event: string, listener: (...args: unknown[]) => void): this;
prependListener(event: "disconnect", listener: () => void): this;
prependListener(event: "error", listener: (error: Error) => void): this;
prependListener(
event: "exit",
listener: (code: number, signal: string) => void,
): this;
prependListener(
event: "listening",
listener: (address: Address) => void,
): this;
prependListener(
event: "message",
// deno-lint-ignore no-explicit-any
listener: (message: unknown, handle: any) => void,
): this; // the handle is a Socket or Server object, or undefined.
prependListener(event: "online", listener: () => void): this;
prependOnceListener(
event: string,
listener: (...args: unknown[]) => void,
): this;
prependOnceListener(event: "disconnect", listener: () => void): this;
prependOnceListener(event: "error", listener: (error: Error) => void): this;
prependOnceListener(
event: "exit",
listener: (code: number, signal: string) => void,
): this;
prependOnceListener(
event: "listening",
listener: (address: Address) => void,
): this;
prependOnceListener(
event: "message",
// deno-lint-ignore no-explicit-any
listener: (message: unknown, handle: any) => void,
): this; // the handle is a Socket or Server object, or undefined.
prependOnceListener(event: "online", listener: () => void): this;
}
export interface Cluster extends EventEmitter {
readonly isPrimary: boolean;
/** @deprecated use isPrimary. */
readonly isMaster: boolean;
readonly isWorker: boolean;
readonly settings: ClusterSettings;
readonly worker?: Worker | null;
readonly Worker?: WorkerClass;
readonly workers?: Record<number, Worker>;
readonly SCHED_NONE: number;
readonly SCHED_RR: number;
schedulingPolicy: number;
_setupWorker?: () => void;
_getServer?: (
// deno-lint-ignore no-explicit-any
obj: any,
options: {
address?: string | null;
port?: number | null;
addressType?: string | number | null;
fd?: number | null;
flags?: number | null;
},
// deno-lint-ignore no-explicit-any
cb: (err: number, handle: any) => void,
) => void;
disconnect(callback?: () => void): void;
fork(env?: Record<string | number, unknown>): Worker;
/** @deprecated - use setupPrimary. */
setupMaster(settings?: ClusterSettings): void;
/**
* `setupPrimary` is used to change the default 'fork' behavior. Once called, the settings will be present in cluster.settings.
*/
setupPrimary(settings?: ClusterSettings): void;
/**
* events.EventEmitter
* 1. disconnect
* 2. exit
* 3. fork
* 4. listening
* 5. message
* 6. online
* 7. setup
*/
addListener(event: string, listener: (...args: unknown[]) => void): this;
addListener(event: "disconnect", listener: (worker: Worker) => void): this;
addListener(
event: "exit",
listener: (worker: Worker, code: number, signal: string) => void,
): this;
addListener(event: "fork", listener: (worker: Worker) => void): this;
addListener(
event: "listening",
listener: (worker: Worker, address: Address) => void,
): this;
addListener(
event: "message",
// deno-lint-ignore no-explicit-any
listener: (worker: Worker, message: unknown, handle: any) => void,
): this; // the handle is a Socket or Server object, or undefined.
addListener(event: "online", listener: (worker: Worker) => void): this;
addListener(
event: "setup",
listener: (settings: ClusterSettings) => void,
): this;
emit(event: string | symbol, ...args: unknown[]): boolean;
emit(event: "disconnect", worker: Worker): boolean;
emit(event: "exit", worker: Worker, code: number, signal: string): boolean;
emit(event: "fork", worker: Worker): boolean;
emit(event: "listening", worker: Worker, address: Address): boolean;
emit(
event: "message",
worker: Worker,
message: unknown,
// deno-lint-ignore no-explicit-any
handle: any,
): boolean;
emit(event: "online", worker: Worker): boolean;
emit(event: "setup", settings: ClusterSettings): boolean;
on(event: string, listener: (...args: unknown[]) => void): this;
on(event: "disconnect", listener: (worker: Worker) => void): this;
on(
event: "exit",
listener: (worker: Worker, code: number, signal: string) => void,
): this;
on(event: "fork", listener: (worker: Worker) => void): this;
on(
event: "listening",
listener: (worker: Worker, address: Address) => void,
): this;
on(
event: "message",
// deno-lint-ignore no-explicit-any
listener: (worker: Worker, message: unknown, handle: any) => void,
): this; // the handle is a Socket or Server object, or undefined.
on(event: "online", listener: (worker: Worker) => void): this;
on(event: "setup", listener: (settings: ClusterSettings) => void): this;
once(event: string, listener: (...args: unknown[]) => void): this;
once(event: "disconnect", listener: (worker: Worker) => void): this;
once(
event: "exit",
listener: (worker: Worker, code: number, signal: string) => void,
): this;
once(event: "fork", listener: (worker: Worker) => void): this;
once(
event: "listening",
listener: (worker: Worker, address: Address) => void,
): this;
once(
event: "message",
// deno-lint-ignore no-explicit-any
listener: (worker: Worker, message: unknown, handle: any) => void,
): this; // the handle is a Socket or Server object, or undefined.
once(event: "online", listener: (worker: Worker) => void): this;
once(event: "setup", listener: (settings: ClusterSettings) => void): this;
prependListener(event: string, listener: (...args: unknown[]) => void): this;
prependListener(
event: "disconnect",
listener: (worker: Worker) => void,
): this;
prependListener(
event: "exit",
listener: (worker: Worker, code: number, signal: string) => void,
): this;
prependListener(event: "fork", listener: (worker: Worker) => void): this;
prependListener(
event: "listening",
listener: (worker: Worker, address: Address) => void,
): this;
// the handle is a Socket or Server object, or undefined.
prependListener(
event: "message",
// deno-lint-ignore no-explicit-any
listener: (worker: Worker, message: unknown, handle?: any) => void,
): this;
prependListener(event: "online", listener: (worker: Worker) => void): this;
prependListener(
event: "setup",
listener: (settings: ClusterSettings) => void,
): this;
prependOnceListener(
event: string,
listener: (...args: unknown[]) => void,
): this;
prependOnceListener(
event: "disconnect",
listener: (worker: Worker) => void,
): this;
prependOnceListener(
event: "exit",
listener: (worker: Worker, code: number, signal: string) => void,
): this;
prependOnceListener(event: "fork", listener: (worker: Worker) => void): this;
prependOnceListener(
event: "listening",
listener: (worker: Worker, address: Address) => void,
): this;
// the handle is a Socket or Server object, or undefined.
prependOnceListener(
event: "message",
// deno-lint-ignore no-explicit-any
listener: (worker: Worker, message: unknown, handle: any) => void,
): this;
prependOnceListener(
event: "online",
listener: (worker: Worker) => void,
): this;
prependOnceListener(
event: "setup",
listener: (settings: ClusterSettings) => void,
): this;
}

View File

@ -1,72 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license.
import { ChildProcess } from "../child_process.ts";
import { Process } from "../../process.ts";
import type { Message, Worker } from "./types.ts";
const callbacks = new Map();
let seq = 0;
export function sendHelper(
proc: ChildProcess | Process,
message: Message,
// deno-lint-ignore no-explicit-any
handle?: any,
cb?: unknown,
) {
// TODO(cmorten): remove type cast once ChildProcess implements `connected`
// property.
// deno-lint-ignore no-explicit-any
if (!(proc as any).connected) {
return false;
}
// Mark message as internal. See INTERNAL_PREFIX
// in lib/internal/child_process.js
message = { cmd: "NODE_CLUSTER", ...message, seq };
if (typeof cb === "function") {
callbacks.set(seq, cb);
}
seq += 1;
// TODO(cmorten): remove type cast once ChildProcess implements `send`
// method.
// deno-lint-ignore no-explicit-any
return (proc as any).send(message, handle);
}
// Returns an internalMessage listener that hands off normal messages
// to the callback but intercepts and redirects ACK messages.
export function internal(
worker: Worker,
// deno-lint-ignore no-explicit-any
cb: (message: Message, handle: any) => void,
) {
// deno-lint-ignore no-explicit-any
return function onInternalMessage(message: Message, _handle: any) {
if (message.cmd !== "NODE_CLUSTER") {
return;
}
let fn = cb;
if (message.ack !== undefined) {
const callback = callbacks.get(message.ack);
if (callback !== undefined) {
fn = callback;
callbacks.delete(message.ack);
}
}
Reflect.apply(fn, worker, arguments);
};
}
export default {
sendHelper,
internal,
};

View File

@ -1,76 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license.
import { EventEmitter } from "../../events.ts";
import { ChildProcess } from "../child_process.ts";
import { Process } from "../../process.ts";
import type { Worker as IWorker, WorkerOptions } from "./types.ts";
// Common Worker implementation shared between the cluster primary and workers.
export class Worker extends EventEmitter implements IWorker {
state: string;
id: number;
process!: ChildProcess | Process;
exitedAfterDisconnect?: boolean;
destroy(_signal?: string): void {}
disconnect(): void {}
constructor(options?: WorkerOptions | null) {
super();
if (options === null || typeof options !== "object") {
options = {};
}
this.exitedAfterDisconnect = undefined;
this.state = options.state || "none";
this.id = options.id || 0;
if (options.process) {
this.process = options.process;
this.process.on(
"error",
(code, signal) => {
console.log("Worker process error", code);
this.emit("error", code, signal);
},
);
this.process.on(
"message",
(message, handle) => {
console.log("Worker process message", message);
this.emit("message", message, handle);
},
);
}
}
kill(): void {
return Reflect.apply(this.destroy, this, arguments);
}
send(): boolean {
// TODO(cmorten): remove type cast once ChildProcess implements `send`
// method.
// deno-lint-ignore no-explicit-any
return Reflect.apply((this.process as any).send, this.process, arguments);
}
isDead(): boolean {
return (
this.process.exitCode != null ||
(this.process as ChildProcess).signalCode != null
);
}
isConnected(): boolean {
// TODO(cmorten): remove type cast once ChildProcess implements `connected`
// property.
// deno-lint-ignore no-explicit-any
return (this.process as any).connected;
}
}
export default Worker;

View File

@ -1,60 +0,0 @@
// Copyright 2018-2023 the Deno authors. All rights reserved. MIT license.
// Copyright Joyent, Inc. and Node.js contributors. All rights reserved. MIT license.
export function init(list) {
list._idleNext = list;
list._idlePrev = list;
return list;
}
// Show the most idle item.
export function peek(list) {
if (list._idlePrev === list) {
return null;
}
return list._idlePrev;
}
// Remove an item from its list.
export function remove(item) {
if (item._idleNext) {
item._idleNext._idlePrev = item._idlePrev;
}
if (item._idlePrev) {
item._idlePrev._idleNext = item._idleNext;
}
item._idleNext = null;
item._idlePrev = null;
}
// Remove an item from its list and place at the end.
export function append(list, item) {
if (item._idleNext || item._idlePrev) {
remove(item);
}
// Items are linked with _idleNext -> (older) and _idlePrev -> (newer).
// Note: This linkage (next being older) may seem counter-intuitive at first.
item._idleNext = list._idleNext;
item._idlePrev = list;
// The list _idleNext points to tail (newest) and _idlePrev to head (oldest).
list._idleNext._idlePrev = item;
list._idleNext = item;
}
export function isEmpty(list) {
return list._idleNext === list;
}
export default {
init,
peek,
remove,
append,
isEmpty,
};

View File

@ -28,12 +28,6 @@ import http2 from "./http2.ts";
import https from "./https.ts";
import inspector from "./inspector.ts";
import internalCp from "./internal/child_process.ts";
import internalClusterChild from "./internal/cluster/child.ts";
import internalClusterPrimary from "./internal/cluster/primary.ts";
import internalClusterRoundRobinHandle from "./internal/cluster/round_robin_handle.ts";
import internalClusterSharedHandle from "./internal/cluster/shared_handle.ts";
import internalClusterUtils from "./internal/cluster/utils.ts";
import internalClusterWorker from "./internal/cluster/worker.ts";
import internalCryptoCertificate from "./internal/crypto/certificate.ts";
import internalCryptoCipher from "./internal/crypto/cipher.ts";
import internalCryptoDiffiehellman from "./internal/crypto/diffiehellman.ts";
@ -124,12 +118,6 @@ export default {
https,
inspector,
"internal/child_process": internalCp,
"internal/cluster/child": internalClusterChild,
"internal/cluster/primary": internalClusterPrimary,
"internal/cluster/round_robin_handle": internalClusterRoundRobinHandle,
"internal/cluster/shared_handle": internalClusterSharedHandle,
"internal/cluster/utils": internalClusterUtils,
"internal/cluster/worker": internalClusterWorker,
"internal/crypto/certificate": internalCryptoCertificate,
"internal/crypto/cipher": internalCryptoCipher,
"internal/crypto/diffiehellman": internalCryptoDiffiehellman,

View File

@ -144,7 +144,6 @@ Deno.test("requireErrorInEval", async function () {
"run",
"--unstable",
"--allow-read",
"--allow-env",
"./_module/cjs/test_cjs_import.js",
],
cwd,

View File

@ -96,17 +96,8 @@ import { debuglog } from "./internal/util/debuglog.ts";
import type { DuplexOptions } from "./_stream.d.ts";
import type { BufferEncoding } from "./_global.d.ts";
import type { Abortable } from "./_events.d.ts";
import { initRoundRobinHandle } from "./internal/cluster/round_robin_handle.ts";
import { initSharedHandle } from "./internal/cluster/shared_handle.ts";
import { cluster } from "./internal/cluster/cluster.ts";
import { channel } from "./diagnostics_channel.ts";
// Lazily initializes the cluster *Handle classes.
// This trick is necessary for avoiding circular dependencies between
// net and cluster modules.
initRoundRobinHandle(createServer);
initSharedHandle(_createServerHandle);
let debug = debuglog("net", (fn) => {
debug = fn;
});
@ -119,7 +110,7 @@ const kBytesWritten = Symbol("kBytesWritten");
const DEFAULT_IPV4_ADDR = "0.0.0.0";
const DEFAULT_IPV6_ADDR = "::";
export type Handle = TCP | Pipe;
type Handle = TCP | Pipe;
interface HandleOptions {
pauseOnCreate?: boolean;
@ -189,7 +180,7 @@ interface IpcNetConnectOptions extends IpcSocketConnectOptions, SocketOptions {
type NetConnectOptions = TcpNetConnectOptions | IpcNetConnectOptions;
export interface AddressInfo {
interface AddressInfo {
address: string;
family?: string;
port: number;
@ -1669,7 +1660,16 @@ function _listenInCluster(
) {
exclusive = !!exclusive;
if (cluster.isPrimary || exclusive) {
// TODO(cmorten): here we deviate somewhat from the Node implementation which
// makes use of the https://nodejs.org/api/cluster.html module to run servers
// across a "cluster" of Node processes to take advantage of multi-core
// systems.
//
// Though Deno has has a Worker capability from which we could simulate this,
// for now we assert that we are _always_ on the primary process.
const isPrimary = true;
if (isPrimary || exclusive) {
// Will create a new handle
// _listen2 sets up the listened handle, it is still named like this
// to avoid breaking code that wraps this method
@ -1677,35 +1677,6 @@ function _listenInCluster(
return;
}
const serverQuery = {
address,
port,
addressType,
fd,
flags,
backlog,
};
// Get the primary's server handle, and listen on it
// deno-lint-ignore no-explicit-any
cluster._getServer!(server, serverQuery, listenOnPrimaryHandle as any);
function listenOnPrimaryHandle(err: number, handle: TCP) {
err = _checkBindError(err, port!, handle!);
if (err) {
const ex = exceptionWithHostPort(err, "bind", address!, port!);
return server.emit("error", ex);
}
// Reuse primary's server handle
server._handle = handle;
// _listen2 sets up the listened handle, it is still named like this
// to avoid breaking code that wraps this method
server._listen2(address, port, addressType, backlog, fd, flags);
}
}
function _lookupAndListen(
@ -2567,16 +2538,11 @@ export class Server extends EventEmitter {
* @param connectionListener Automatically set as a listener for the `"connection"` event.
* @return A `net.Server`.
*/
export function createServer(connectionListener?: ConnectionListener): Server;
export function createServer(
options?: ServerOptions,
connectionListener?: ConnectionListener,
): Server;
export function createServer(
options?: ServerOptions | ConnectionListener,
connectionListener?: ConnectionListener,
): Server {
return new Server(options as ServerOptions, connectionListener);
return new Server(options, connectionListener);
}
export { isIP, isIPv4, isIPv6 };

View File

@ -340,7 +340,7 @@ function uncaughtExceptionHandler(err: any, origin: string) {
let execPath: string | null = null;
export class Process extends EventEmitter {
class Process extends EventEmitter {
constructor() {
super();
@ -421,18 +421,6 @@ export class Process extends EventEmitter {
_exiting = _exiting;
/** https://nodejs.org/api/process.html#processconnected */
get connected() {
warnNotImplemented("process.connected");
return false;
}
/** https://nodejs.org/api/process.html#processsendmessage-sendhandle-options-callback */
send() {
warnNotImplemented("process.send");
}
/** https://nodejs.org/api/process.html#processexitcode_1 */
exitCode: undefined | number = undefined;