new cluster api

This commit is contained in:
Ryan Dahl 2011-11-04 15:11:19 -07:00
parent 44a5452a4f
commit 86528489ec
4 changed files with 95 additions and 68 deletions

View File

@ -9,10 +9,17 @@ which share server ports.
var cluster = require('cluster');
var http = require('http');
var numCPUs = require('os').cpus().length;
if (cluster.isMaster) {
// Start the master process, fork workers.
cluster.startMaster({ workers: 2 });
// Fork workers.
for (var i = 0; i < numCPUs; i++) {
cluster.fork();
}
cluster.on('death', function(worker) {
console.log('worker ' + worker.pid + ' died');
});
} else {
// Worker processes have a http server.
http.Server(function(req, res) {
@ -27,37 +34,38 @@ Running node will now share port 8000 between the workers:
Worker 2438 online
Worker 2437 online
### exports.startMaster([options])
### cluster.fork()
Spawns the initial worker processes, one per CPU by default.
Spawn a new worker process. This can only be called from the master process.
The following options are supported:
### cluster.isMaster
### cluster.isWorker
- `workerFilename`: script to execute in the worker process, defaults to
`process.argv[1]`
- `args`: worker program arguments, defaulting to `process.argv.slice(2)`
- `workers`: the number of workers, defaulting to `os.cpus().length`
Boolean flags to determine if the current process is a master or a worker
process in a cluster. A process `isMaster` if `process.env.NODE_WORKER_ID`
is undefined.
### exports.spawnWorker([options])
### cluster.eachWorker(cb)
Spawn a new worker process. This is called within `cluster.startMaster()`,
however it is useful to implement worker resuscitation as described below
in the "Common patterns" section.
Synchronously iterates over all of the workers.
The `options` available are identical to `cluster.startMaster()`.
cluster.eachWorker(function(worker) {
console.log("worker pid=" + worker.pid);
});
## Common patterns
### cluster.workerCount()
## Worker resuscitation
Returns the number of workers.
The following is an example of how you may implement worker resuscitation,
spawning a new worker process when another exits.
### Event: 'death'
if (cluster.isMaster) {
cluster.startMaster();
process.on('SIGCHLD', function(){
console.log('worker killed');
cluster.spawnWorker();
});
}
When any of the workers die the cluster module will emit the 'death' event.
This can be used to restart the worker by calling `fork()` again.
cluster.on('death', function(worker) {
console.log('worker ' + worker.pid + ' died. restart...');
cluster.fork();
});
Different techniques can be used to restart the worker depending on the
application.

View File

@ -22,7 +22,9 @@
var assert = require('assert');
var fork = require('child_process').fork;
var net = require('net');
var amMaster; // Used for asserts
var EventEmitter = require('events').EventEmitter;
var cluster = module.exports = new EventEmitter();
var debug;
@ -38,17 +40,20 @@ if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
// Used in the master:
var masterStarted = false;
var ids = 0;
var workers = [];
var servers = {};
var workerFilename;
var workerArgs;
// Used in the worker:
var workerId = 0;
var queryIds = 0;
var queryCallbacks = {};
exports.isWorker = 'NODE_WORKER_ID' in process.env;
exports.isMaster = ! exports.isWorker;
cluster.isWorker = 'NODE_WORKER_ID' in process.env;
cluster.isMaster = ! cluster.isWorker;
// Call this from the master process. It will start child workers.
//
@ -62,38 +67,23 @@ exports.isMaster = ! exports.isWorker;
//
// options.workers
// The number of workers to start. Defaults to os.cpus().length.
exports.startMaster = function(options) {
amMaster = true;
function startMaster() {
// This can only be called from the master.
assert(cluster.isMaster);
if (!options) {
options = {};
}
if (masterStarted) return;
masterStarted = true;
if (!options.workerFilename) {
options.workerFilename = process.argv[1];
}
if (!options.args) {
options.args = process.argv.slice(2);
}
if (!options.workers) {
options.workers = require('os').cpus().length;
}
for (var i = 0; i < options.workers; i++) {
forkWorker(options.workerFilename, options.args);
}
workerFilename = process.argv[1];
workerArgs = process.argv.slice(2);
process.on('uncaughtException', function(e) {
// Quickly try to kill all the workers.
// TODO: be session leader - will cause auto SIGHUP to the children.
for (var id in workers) {
if (workers[id]) {
debug("kill worker " + id);
workers[id].kill();
}
}
cluster.eachWorker(function(worker) {
debug("kill worker " + worker.pid);
worker.kill();
})
console.error("Exception in cluster master process: " +
e.message + '\n' + e.stack);
@ -104,7 +94,8 @@ exports.startMaster = function(options) {
function handleWorkerMessage(worker, message) {
assert.ok(amMaster);
// This can only be called from the master.
assert(cluster.isMaster);
debug("recv " + JSON.stringify(message));
@ -137,7 +128,34 @@ function handleWorkerMessage(worker, message) {
}
function forkWorker(workerFilename, args) {
cluster.eachWorker = function(cb) {
// This can only be called from the master.
assert(cluster.isMaster);
for (var id in workers) {
if (workers[id]) {
cb(workers[id]);
}
}
};
cluster.workerCount = function() {
var c = 0;
cluster.eachWorker(function() {
c++;
});
return c;
};
cluster.fork = function() {
// This can only be called from the master.
assert(cluster.isMaster);
// Lazily start the master process stuff.
startMaster();
var id = ++ids;
var envCopy = {};
@ -147,9 +165,7 @@ function forkWorker(workerFilename, args) {
envCopy['NODE_WORKER_ID'] = id;
var worker = fork(workerFilename, args, {
env: envCopy
});
var worker = fork(workerFilename, workerArgs, { env: envCopy });
worker.on('message', function(message) {
handleWorkerMessage(worker, message);
@ -158,15 +174,16 @@ function forkWorker(workerFilename, args) {
worker.on('exit', function() {
debug('worker id=' + id + ' died');
delete workers[id];
cluster.emit('death', worker);
});
return worker;
}
exports.startWorker = function() {
assert.ok(!amMaster);
amMaster = false;
// Internal function. Called from src/node.js when worker process starts.
cluster._startWorker = function() {
assert(cluster.isWorker);
workerId = parseInt(process.env.NODE_WORKER_ID);
queryMaster({ cmd: 'online' });
@ -186,7 +203,7 @@ exports.startWorker = function() {
function queryMaster(msg, cb) {
assert.ok(!amMaster);
assert(cluster.isWorker);
debug('send ' + JSON.stringify(msg));
@ -194,7 +211,7 @@ function queryMaster(msg, cb) {
msg._queryId = (++queryIds);
msg._workerId = workerId;
// Store callback for later. Callback called in startWorker.
// Store callback for later. Callback called in _startWorker.
if (cb) {
queryCallbacks[msg._queryId] = cb;
}
@ -204,8 +221,10 @@ function queryMaster(msg, cb) {
}
exports.getServer = function(address, port, addressType, cb) {
assert.ok(!amMaster);
// Internal function. Called by lib/net.js when attempting to bind a
// server.
cluster._getServer = function(address, port, addressType, cb) {
assert(cluster.isWorker);
queryMaster({
cmd: "queryServer",

View File

@ -714,7 +714,7 @@ Server.prototype._listen2 = function(address, port, addressType) {
function listen(self, address, port, addressType) {
if (process.env.NODE_WORKER_ID) {
require('cluster').getServer(address, port, addressType, function(handle) {
require('cluster')._getServer(address, port, addressType, function(handle) {
self._handle = handle;
self._listen2(address, port, addressType);
});

View File

@ -88,7 +88,7 @@
// channel.
if (process.env.NODE_WORKER_ID) {
var cluster = NativeModule.require('cluster');
cluster.startWorker();
cluster._startWorker();
}
var Module = NativeModule.require('module');