cluster improvements: Worker class and isolate internal messages

Fixes #2388
This commit is contained in:
Andreas Madsen 2011-12-20 10:42:48 +01:00 committed by Ryan Dahl
parent e21643d618
commit 5f08c3cfa1
11 changed files with 948 additions and 185 deletions

View File

@ -229,6 +229,12 @@ And then the child script, `'sub.js'` might look like this:
In the child the `process` object will have a `send()` method, and `process`
will emit objects each time it receives a message on its channel.
There is a special case when seinding a `{cmd: 'NODE_foo'}` message. All messages
containging a `NODE_` prefix in its `cmd` property will not be emitted in
the `message` event, since this are internal messages used by node core.
Messages contain the prefix are emitted in the `internalMessage` event, you
should by all means avoid using this feature, it may change without warranty.
By default the spawned Node process will have the stdout, stderr associated
with the parent's. To change this behavior set the `silent` property in the
`options` object to `true`.

View File

@ -21,8 +21,9 @@ all share server ports.
console.log('worker ' + worker.pid + ' died');
});
} else {
// Worker processes have a http server.
http.Server(function(req, res) {
// Workers can share any TCP connection
// In this case its a HTTP server
http.createServer(function(req, res) {
res.writeHead(200);
res.end("hello world\n");
}).listen(8000);
@ -34,56 +35,61 @@ Running node will now share port 8000 between the workers:
Worker 2438 online
Worker 2437 online
The difference between `cluster.fork()` and `child_process.fork()` is simply
that cluster allows TCP servers to be shared between workers. `cluster.fork`
is implemented on top of `child_process.fork`. The message passing API that
is available with `child_process.fork` is available with `cluster` as well.
As an example, here is a cluster which keeps count of the number of requests
in the master process via message passing:
var cluster = require('cluster');
var http = require('http');
var numReqs = 0;
if (cluster.isMaster) {
// Fork workers.
for (var i = 0; i < 2; i++) {
var worker = cluster.fork();
worker.on('message', function(msg) {
if (msg.cmd && msg.cmd == 'notifyRequest') {
numReqs++;
}
});
}
setInterval(function() {
console.log("numReqs =", numReqs);
}, 1000);
} else {
// Worker processes have a http server.
http.Server(function(req, res) {
res.writeHead(200);
res.end("hello world\n");
// Send message to master process
process.send({ cmd: 'notifyRequest' });
}).listen(8000);
}
### cluster.fork([env])
Spawn a new worker process. This can only be called from the master process.
The function takes an optional `env` object. The propertyies in this object
will be added to the process environment in the worker.
### cluster.isMaster
This boolean flag is true if the process is a master. This is determined
by the `process.env.NODE_UNIQUE_ID`. If `process.env.NODE_UNIQUE_ID` is
undefined `isMaster` is `true`.
### cluster.isWorker
Boolean flags to determine if the current process is a master or a worker
process in a cluster. A process `isMaster` if `process.env.NODE_WORKER_ID`
is undefined.
This boolean flag is true if the process is a worker forked from a master.
If the `process.env.NODE_UNIQUE_ID` is set to a value different efined
`isWorker` is `true`.
### Event: 'fork'
When a new worker is forked the cluster module will emit a 'fork' event.
This can be used to log worker activity, and create you own timeout.
var timeouts = [];
var errorMsg = function () {
console.error("Something must be wrong with the connection ...");
});
cluster.on('fork', function (worker) {
timeouts[worker.uniqueID] = setTimeout(errorMsg, 2000);
});
cluster.on('listening', function (worker) {
clearTimeout(timeouts[worker.uniqueID]);
});
cluster.on('death', function (worker) {
clearTimeout(timeouts[worker.uniqueID]);
errorMsg();
});
### Event: 'online'
After forking a new worker, the worker should respond with a online message.
When the master receives a online message it will emit such event.
The difference between 'fork' and 'online' is that fork is emitted when the
master tries to fork a worker, and 'online' is emitted when the worker is being
executed.
cluster.on('online', function (worker) {
console.log("Yay, the worker responded after it was forked");
});
### Event: 'listening'
When calling `listen()` from a worker, a 'listening' event is automatically assigned
to the server instance. When the server is listening a message is send to the master
where the 'listening' event is emitted.
cluster.on('listening', function (worker) {
console.log("We are now connected");
});
### Event: 'death'
@ -95,5 +101,155 @@ This can be used to restart the worker by calling `fork()` again.
cluster.fork();
});
Different techniques can be used to restart the worker depending on the
application.
### cluster.fork([env])
Spawn a new worker process. This can only be called from the master process.
The function takes an optional `env` object. The properties in this object
will be added to the process environment in the worker.
### cluster.workers
In the cluster all living worker objects are stored in this object by there
`uniqueID` as the key. This makes it easy to loop thouge all liveing workers.
// Go througe all workers
function eachWorker(callback) {
for (var uniqueID in cluster.workers) {
callback(cluster.workers[uniqueID]);
}
}
eachWorker(function (worker) {
worker.send('big announcement to all workers');
});
Should you wich to reference a worker over a communication channel this unsing
there `uniqueID` this is also the easies way to find the worker.
socket.on('data', function (uniqueID) {
var worker = cluster.workers[uniqueID];
});
## Worker
This object contains all public information and method about a worker.
In the master it can be obtainedusing `cluster.workers`. In a worker
it can be obtained ained using `cluster.worker`.
### Worker.uniqueID
Each new worker is given its own unique id, this id i stored in the `uniqueID`.
### Worker.process
All workers are created using `child_process.fork()`, the returned object from this
function is stored in process.
### Worker.send(message, [sendHandle])
This function is equal to the send methods provided by `child_process.fork()`.
In the master you should use this function to send a message to a specific worker.
However in a worker you can also use `process.send(message)`, since this is the same
function.
This example will echo back all messages from the master:
if (cluster.isMaster) {
var worker = cluster.fork();
worker.send('hi there');
} else if (cluster.isWorker) {
process.on('message', function (msg) {
process.send(msg);
});
}
### Worker.destroy()
This function will kill the worker, and inform the master to not spawn a new worker.
To know the difference between suicide and accidently death a suicide boolean is set to true.
cluster.on('death', function (worker) {
if (worker.suicide === true) {
console.log('Oh, it was just suicide' no need to worry').
}
});
// destroy worker
worker.destroy();
### Worker.suicide
This property is a boolean. It is set when a worker dies, until then it is `undefined`.
It is true if the worker was killed using the `.destroy()` method, and false otherwise.
### Event: message
This event is the same as the one provided by `child_process.fork()`.
In the master you should use this event, however in a worker you can also use
`process.on('message')`
As an example, here is a cluster that keeps count of the number of requests
in the master process using the message system:
var cluster = require('cluster');
var http = require('http');
if (cluster.isMaster) {
// Keep track of http requests
var numReqs = 0;
setInterval(function() {
console.log("numReqs =", numReqs);
}, 1000);
// Count requestes
var messageHandler = function (msg) {
if (msg.cmd && msg.cmd == 'notifyRequest') {
numReqs += 1;
}
};
// Start workers and listen for messages containing notifyRequest
cluster.autoFork();
Object.keys(cluster.workers).forEach(function (uniqueID) {
cluster.workers[uniqueID].on('message', messageHandler);
});
} else {
// Worker processes have a http server.
http.Server(function(req, res) {
res.writeHead(200);
res.end("hello world\n");
// notify master about the request
process.send({ cmd: 'notifyRequest' });
}).listen(8000);
}
### Event: online
Same as the `cluster.on('online')` event, but emits only when the state change
on the specified worker.
cluster.fork().on('online', function (worker) {
// Worker is online
};
### Event: listening
Same as the `cluster.on('listening')` event, but emits only when the state change
on the specified worker.
cluster.fork().on('listening', function (worker) {
// Worker is listening
};
### Event: death
Same as the `cluster.on('death')` event, but emits only when the state change
on the specified worker.
cluster.fork().on('death', function (worker) {
// Worker has died
};

View File

@ -95,11 +95,25 @@ function setupChannel(target, channel) {
jsonBuffer += pool.toString('ascii', offset, offset + length);
var i, start = 0;
//Linebreak is used as a message end sign
while ((i = jsonBuffer.indexOf('\n', start)) >= 0) {
var json = jsonBuffer.slice(start, i);
var message = JSON.parse(json);
target.emit('message', message, recvHandle);
//Filter out internal messages
//if cmd property begin with "_NODE"
if (message !== null &&
typeof message === 'object' &&
typeof message.cmd === 'string' &&
message.cmd.indexOf('NODE_') === 0) {
target.emit('inernalMessage', message, recvHandle);
}
//Non-internal message
else {
target.emit('message', message, recvHandle);
}
start = i + 1;
}
jsonBuffer = jsonBuffer.slice(start);

View File

@ -23,22 +23,24 @@ var assert = require('assert');
var fork = require('child_process').fork;
var net = require('net');
var EventEmitter = require('events').EventEmitter;
var util = require('util');
function isObject(o) {
return (typeof o === 'object' && o !== null);
}
function extendObject(origin, add) {
// Don't do anything if add isn't an object
if (!add) return origin;
var keys = Object.keys(add),
i = keys.length;
while(i--) {
while (i--) {
origin[keys[i]] = add[keys[i]];
}
return origin;
}
var cluster = module.exports = new EventEmitter();
var debug;
if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
debug = function(x) {
@ -50,23 +52,42 @@ if (process.env.NODE_DEBUG && /cluster/.test(process.env.NODE_DEBUG)) {
debug = function() { };
}
// cluster object:
function cluster() {}
util.inherits(cluster, EventEmitter);
var cluster = module.exports = new cluster();
// Used in the master:
var masterStarted = false;
var ids = 0;
var workers = [];
var servers = {};
var serverHandlers = {};
var workerFilename;
var workerArgs;
// Used in the worker:
var workerId = 0;
var serverLisenters = {};
var queryIds = 0;
var queryCallbacks = {};
cluster.isWorker = 'NODE_WORKER_ID' in process.env;
// Define isWorker and isMaster
cluster.isWorker = 'NODE_UNIQUE_ID' in process.env;
cluster.isMaster = ! cluster.isWorker;
// The worker object is only used in a worker
cluster.worker = cluster.isWorker ? {} : null;
// The workers array is oly used in the naster
cluster.workers = cluster.isMaster ? {} : null;
// Simple function there call a function on each worker
function eachWorker(cb) {
// Go througe all workers
for (var id in cluster.workers) {
if (cluster.workers.hasOwnProperty(id)) {
cb(cluster.workers[id]);
}
}
}
// Call this from the master process. It will start child workers.
//
// options.workerFilename
@ -90,155 +111,375 @@ function startMaster() {
workerArgs = process.argv.slice(2);
process.on('uncaughtException', function(e) {
// Quickly try to kill all the workers.
// TODO: be session leader - will cause auto SIGHUP to the children.
eachWorker(function(worker) {
debug('kill worker ' + worker.pid);
worker.kill();
});
console.error('Exception in cluster master process: ' +
e.message + '\n' + e.stack);
quickDestroyCluster();
process.exit(1);
});
}
// Check if a message is internal only
var INTERNAL_PREFIX = 'NODE_CLUTER_';
function isInternalMessage(message) {
return (isObject(message) &&
typeof message.cmd === 'string' &&
message.cmd.indexOf(INTERNAL_PREFIX) === 0);
}
function handleWorkerMessage(worker, message) {
// This can only be called from the master.
assert(cluster.isMaster);
// Modyfi message object to be internal
function internalMessage(inMessage) {
var outMessage = extendObject({}, inMessage);
debug('recv ' + JSON.stringify(message));
// Add internal prefix to cmd
outMessage.cmd = INTERNAL_PREFIX + (outMessage.cmd || '');
switch (message.cmd) {
case 'online':
debug('Worker ' + worker.pid + ' online');
worker.online = true;
break;
return outMessage;
}
case 'queryServer':
var key = message.address + ':' +
message.port + ':' +
message.addressType;
var response = { _queryId: message._queryId };
// Handle callback messges
function handleResponse(outMessage, outHandle, inMessage, inHandle, worker) {
if (!(key in servers)) {
// Create a new server.
debug('create new server ' + key);
servers[key] = net._createServerHandle(message.address,
message.port,
message.addressType);
}
worker.send(response, servers[key]);
break;
// The message there will be send
var message = internalMessage(outMessage);
default:
// Ignore.
break;
// callback id - will be undefined if not set
message._queryEcho = inMessage._requestEcho;
// Call callback if a query echo is received
if (inMessage.hasOwnProperty('_queryEcho')) {
queryCallbacks[inMessage._queryEcho](inMessage.content, inHandle);
delete queryCallbacks[inMessage._queryEcho];
}
// Send if outWrap do contain something useful
if (!(outMessage === undefined && message._queryEcho === undefined)) {
sendInternalMessage(worker, message, outHandle);
}
}
// Handle messages from both master and workers
var messageHandingObject = {};
function handleMessage(inMessage, inHandle, worker) {
function eachWorker(cb) {
// This can only be called from the master.
assert(cluster.isMaster);
//Remove internal prefix
var message = extendObject({}, inMessage);
message.cmd = inMessage.cmd.substr(INTERNAL_PREFIX.length);
for (var id in workers) {
if (workers[id]) {
cb(workers[id]);
var respondUsed = false;
var respond = function(outMessage, outHandler) {
respondUsed = true;
handleResponse(outMessage, outHandler, inMessage, inHandle, worker);
};
// Run handler if it exist
if (messageHandingObject[message.cmd]) {
messageHandingObject[message.cmd](message, worker, respond);
}
// Send respond if it wasn't done
if (respondUsed === false) {
respond();
}
}
// Messages to the master will be handled using this methods
if (cluster.isMaster) {
// Handle online messages from workers
messageHandingObject.online = function(message, worker) {
worker.state = 'online';
debug('Worker ' + worker.process.pid + ' online');
worker.emit('online', worker);
cluster.emit('online', worker);
};
// Handle queryServer messages form workers
messageHandingObject.queryServer = function(message, worker, send) {
// This sequence of infomation is unique to the connection but not
// to the worker
var args = [message.address, message.port, message.addressType];
var key = args.join(':');
var handler;
if (serverHandlers.hasOwnProperty(key)) {
handler = serverHandlers[key];
} else {
handler = serverHandlers[key] = net._createServerHandle.apply(net, args);
}
// echo callback with the fd handler associated with it
send({}, handler);
};
// Handle listening messages from workers
messageHandingObject.listening = function(message, worker) {
worker.state = 'listening';
// Emit listining, now that we know the worker is listning
worker.emit('listening', worker, {
address: message.address,
port: message.port,
addressType: message.addressType
});
cluster.emit('listening', worker, {
address: message.address,
port: message.port,
addressType: message.addressType
});
};
// Handle suicide messages from workers
messageHandingObject.suicide = function(message, worker) {
worker.suicide = true;
};
}
// Messages to a worker will be handled using this methods
else if (cluster.isWorker) {
// TODO: the disconnect step will use this
}
function toDecInt(value) {
value = parseInt(value, 10);
return isNaN(value) ? null : value;
}
// Create a worker object, there works both for master and worker
function Worker(customEnv) {
if (!(this instanceof Worker)) return new Worker();
var self = this;
var env = process.env;
// Assign uniqueID, default null
this.uniqueID = cluster.isMaster ? ++ids : toDecInt(env.NODE_UNIQUE_ID);
// Assign state
this.state = 'none';
// Create or get process
if (cluster.isMaster) {
// Create env object
// first: copy and add uniqueID
var envCopy = extendObject({}, env);
envCopy['NODE_UNIQUE_ID'] = this.uniqueID;
// second: extend envCopy with the env argument
if (isObject(customEnv)) {
envCopy = extendObject(envCopy, customEnv);
}
// fork worker
this.process = fork(workerFilename, workerArgs, {
'env': envCopy
});
} else {
this.process = process;
}
if (cluster.isMaster) {
// Save worker in the cluster.workers array
cluster.workers[this.uniqueID] = this;
// Emit a fork event, on next tick
// There is no worker.fork event since this has no real purpose
process.nextTick(function() {
cluster.emit('fork', self);
});
}
// Internal message: handle message
this.process.on('inernalMessage', function(message, handle) {
debug('recived: ', message);
// relay to handleMessage
handleMessage(message, handle, self);
return;
});
// Non-internal message: relay to Worker object
this.process.on('message', function(message, handle) {
self.emit('message', message, handle);
});
// Handle exit
self.process.on('exit', function() {
debug('worker id=' + self.uniqueID + ' died');
// Prepare worker to die and emit events
prepareDeath(self, 'dead', 'death');
});
}
util.inherits(Worker, EventEmitter);
cluster.Worker = Worker;
function prepareDeath(worker, state, eventName) {
// set state to disconnect
worker.state = state;
// Make suicide a boolean
worker.suicide = !!worker.suicide;
// Remove from workers in the master
if (cluster.isMaster) {
delete cluster.workers[worker.uniqueID];
}
// Emit events
worker.emit(eventName, worker);
cluster.emit(eventName, worker);
}
// Send internal message
function sendInternalMessage(worker, message/*, handler, callback*/) {
// Exist callback
var callback = arguments[arguments.length - 1];
if (typeof callback !== 'function') {
callback = undefined;
}
// exist handler
var handler = arguments[2] !== callback ? arguments[2] : undefined;
if (!isInternalMessage(message)) {
message = internalMessage(message);
}
// Store callback for later
if (callback) {
message._requestEcho = worker.uniqueID + ':' + (++queryIds);
queryCallbacks[message._requestEcho] = callback;
}
worker.send(message, handler);
}
// Send message to worker or master
Worker.prototype.send = function() {
//You could also just use process.send in a worker
this.process.send.apply(this.process, arguments);
};
function closeWorkerChannel(worker, callback) {
//Apparently the .close method is async, but do not have a callback
worker.process._channel.close();
worker.process._channel = null;
process.nextTick(callback);
}
// Kill the worker without restarting
Worker.prototype.destroy = function() {
var self = this;
this.suicide = true;
if (cluster.isMaster) {
// Stop channel
// this way the worker won't need to propagate suicide state to master
closeWorkerChannel(this, function() {
// Then kill worker
self.process.kill();
});
} else {
// Channel is open
if (this.process._channel !== null) {
// Inform master that is is suicide and then kill
sendInternalMessage(this, {cmd: 'suicide'}, function() {
// Kill worker
process.exit(0);
});
// When master do a quickDestroy the channel is not necesarily closed
// at the point this function runs. For that reason we need to keep
// checking that the channel is still open, until a actually callback
// from the master is resicved. Also we can't do a timeout and then
// just kill, since we don't know if the quickDestroy function was called.
setInterval(function() {
if (self.process._channel === null) {
process.exit(0);
}
}, 200);
} else {
process.exit(0);
}
}
}
};
// Fork a new worker
cluster.fork = function(env) {
// This can only be called from the master.
assert(cluster.isMaster);
// Lazily start the master process stuff.
// Make sure that the master has been initalized
startMaster();
var id = ++ids;
//Create env object
var envCopy = extendObject({}, process.env);
envCopy['NODE_WORKER_ID'] = id;
if (isObject(env)) {
envCopy = extendObject(envCopy, env);
}
//fork worker
var worker = fork(workerFilename, workerArgs, {
'env': envCopy
});
workers[id] = worker;
worker.on('message', function(message) {
handleWorkerMessage(worker, message);
});
worker.on('exit', function() {
debug('worker id=' + id + ' died');
delete workers[id];
cluster.emit('death', worker);
});
return worker;
return (new cluster.Worker(env));
};
// Internal function. Called from src/node.js when worker process starts.
cluster._startWorker = function() {
assert(cluster.isWorker);
workerId = parseInt(process.env.NODE_WORKER_ID, 10);
queryMaster({ cmd: 'online' });
// Make callbacks from queryMaster()
process.on('message', function(msg, handle) {
debug('recv ' + JSON.stringify(msg));
if (msg._queryId && msg._queryId in queryCallbacks) {
var cb = queryCallbacks[msg._queryId];
if (typeof cb == 'function') {
cb(msg, handle);
}
delete queryCallbacks[msg._queryId];
}
// Sync way to quickly kill all cluster workers
// However the workers may not die instantly
function quickDestroyCluster() {
eachWorker(function(worker) {
worker.process.kill();
});
};
function queryMaster(msg, cb) {
assert(cluster.isWorker);
debug('send ' + JSON.stringify(msg));
// Grab some random queryId
msg._queryId = (++queryIds);
msg._workerId = workerId;
// Store callback for later. Callback called in _startWorker.
if (cb) {
queryCallbacks[msg._queryId] = cb;
}
// Send message to master.
process.send(msg);
}
// Internal function. Called from src/node.js when worker process starts.
cluster._setupWorker = function() {
// Get worker class
var worker = cluster.worker = new Worker();
// Internal function. Called by lib/net.js when attempting to bind a
// server.
cluster._getServer = function(address, port, addressType, cb) {
// Tell master that the worker is online
worker.state = 'online';
sendInternalMessage(worker, { cmd: 'online' });
};
// Internal function. Called by lib/net.js when attempting to bind a server.
cluster._getServer = function(tcpSelf, address, port, addressType, cb) {
// This can only be called from a worker.
assert(cluster.isWorker);
queryMaster({
// Store tcp instance for later use
var key = [address, port, addressType].join(':');
serverLisenters[key] = tcpSelf;
// Send a listening message to the master
tcpSelf.once('listening', function() {
cluster.worker.state = 'listening';
sendInternalMessage(cluster.worker, {
cmd: 'listening',
address: address,
port: port,
addressType: addressType
});
});
// Request the fd handler from the master process
var message = {
cmd: 'queryServer',
address: address,
port: port,
addressType: addressType
}, function(msg, handle) {
};
// The callback will be stored until the master has responed
sendInternalMessage(cluster.worker, message, function(msg, handle) {
cb(handle);
});
};

View File

@ -750,8 +750,8 @@ Server.prototype._listen2 = function(address, port, addressType) {
function listen(self, address, port, addressType) {
if (process.env.NODE_WORKER_ID) {
require('cluster')._getServer(address, port, addressType, function(handle) {
if (process.env.NODE_UNIQUE_ID) {
require('cluster')._getServer(self, address, port, addressType, function(handle) {
self._handle = handle;
self._listen2(address, port, addressType);
});

View File

@ -86,9 +86,9 @@
// If this is a worker in cluster mode, start up the communiction
// channel.
if (process.env.NODE_WORKER_ID) {
if (process.env.NODE_UNIQUE_ID) {
var cluster = NativeModule.require('cluster');
cluster._startWorker();
cluster._setupWorker();
}
var Module = NativeModule.require('module');

View File

@ -0,0 +1,58 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var common = require('../common');
var assert = require('assert');
//messages
var PREFIX = 'NODE_';
var normal = {cmd: 'foo' + PREFIX};
var internal = {cmd: PREFIX + 'bar'};
if (process.argv[2] === 'child') {
//send non-internal message containing PREFIX at a non prefix position
process.send(normal);
//send inernal message
process.send(internal);
process.exit(0);
} else {
var fork = require('child_process').fork;
var child = fork(process.argv[1], ['child']);
var gotNormal;
child.once('message', function(data) {
gotNormal = data;
});
var gotInternal;
child.once('inernalMessage', function(data) {
gotInternal = data;
});
process.on('exit', function() {
assert.deepEqual(gotNormal, normal);
assert.deepEqual(gotInternal, internal);
});
}

View File

@ -0,0 +1,161 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var common = require('../common');
var assert = require('assert');
var cluster = require('cluster');
function forEach(obj, fn) {
Object.keys(obj).forEach(function(name, index) {
fn(obj[name], name, index);
});
}
if (cluster.isWorker) {
var http = require('http');
http.Server(function() {
}).listen(common.PORT, '127.0.0.1');
}
else if (cluster.isMaster) {
assert.equal('NODE_UNIQUE_ID' in process.env, false,
'cluster.isMaster should not be true when NODE_UNIQUE_ID is set');
var checks = {
cluster: {
events: {
fork: false,
online: false,
listening: false,
death: false
},
equal: {
fork: false,
online: false,
listening: false,
death: false
}
},
worker: {
events: {
online: false,
listening: false,
death: false
},
equal: {
online: false,
listening: false,
death: false
},
states: {
none: false,
online: false,
listening: false,
dead: false
}
}
};
var worker;
var stateNames = Object.keys(checks.worker.states);
//Check events, states, and emit arguments
forEach(checks.cluster.events, function(bool, name, index) {
//Listen on event
cluster.on(name, function(/* worker */) {
//Set event
checks.cluster.events[name] = true;
//Check argument
checks.cluster.equal[name] = worker === arguments[0];
//Check state
var state = stateNames[index];
checks.worker.states[state] = (state === worker.state);
});
});
//Kill worker when listening
cluster.on('listening', function() {
worker.destroy();
});
//Kill process when worker is killed
cluster.on('death', function() {
process.exit(0);
});
//Create worker
worker = cluster.fork();
assert.ok(worker instanceof cluster.Worker,
'the worker is not a instance of the Worker constructor');
//Check event
forEach(checks.worker.events, function(bool, name, index) {
worker.on(name, function() {
//Set event
checks.worker.events[name] = true;
//Check argument
checks.worker.equal[name] = worker === arguments[0];
});
});
//Check all values
process.once('exit', function() {
//Check cluster events
forEach(checks.cluster.events, function(check, name) {
assert.ok(check, 'The cluster event "' + name + '" on the cluster ' +
'object did not fire');
});
//Check cluster event arguments
forEach(checks.cluster.equal, function(check, name) {
assert.ok(check, 'The cluster event "' + name + '" did not emit ' +
'with corrent argument');
});
//Check worker states
forEach(checks.worker.states, function(check, name) {
assert.ok(check, 'The worker state "' + name + '" was not set to true');
});
//Check worker events
forEach(checks.worker.events, function(check, name) {
assert.ok(check, 'The worker event "' + name + '" on the worker object ' +
'did not fire');
});
//Check worker event arguments
forEach(checks.worker.equal, function(check, name) {
assert.ok(check, 'The worker event "' + name + '" did not emit with ' +
'corrent argument');
});
});
}

View File

@ -25,8 +25,7 @@ var assert = require('assert');
var cluster = require('cluster');
if (cluster.isWorker) {
process.send({
testcase: true,
cluster.worker.send({
prop: process.env['cluster_test_prop'],
overwrite: process.env['cluster_test_overwrite']
});
@ -38,23 +37,21 @@ if (cluster.isWorker) {
overwrite: false
};
//To check that the cluster extend on the process.env we will overwrite a
//property
// To check that the cluster extend on the process.env we will overwrite a
// property
process.env['cluster_test_overwrite'] = 'old';
//Fork worker
// Fork worker
var worker = cluster.fork({
'cluster_test_prop': 'custom',
'cluster_test_overwrite': 'new'
});
//Checks worker env
// Checks worker env
worker.on('message', function(data) {
if (data.testcase) {
checks.using = (data.prop === 'custom');
checks.overwrite = (data.overwrite === 'new');
process.exit(0);
}
checks.using = (data.prop === 'custom');
checks.overwrite = (data.overwrite === 'new');
process.exit(0);
});
process.once('exit', function() {

View File

@ -72,7 +72,7 @@ if (isTestRunner) {
// Cluster stuff.
if (cluster.isMaster) {
var worker = cluster.fork();
process.send({ workerPID: worker.pid });
process.send({ workerPID: worker.process.pid });
// should kill the worker too
throw new Error('kill master');
} else {

View File

@ -0,0 +1,130 @@
// Copyright Joyent, Inc. and other Node contributors.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the
// "Software"), to deal in the Software without restriction, including
// without limitation the rights to use, copy, modify, merge, publish,
// distribute, sublicense, and/or sell copies of the Software, and to permit
// persons to whom the Software is furnished to do so, subject to the
// following conditions:
//
// The above copyright notice and this permission notice shall be included
// in all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN
// NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE
// USE OR OTHER DEALINGS IN THE SOFTWARE.
var common = require('../common');
var assert = require('assert');
var cluster = require('cluster');
var net = require('net');
function forEach(obj, fn) {
Object.keys(obj).forEach(function(name, index) {
fn(obj[name], name, index);
});
}
if (cluster.isWorker) {
// Create a tcp server
// this will be used as cluster-shared-server
// and as an alternativ IPC channel
var server = net.Server();
server.on('connection', function(socket) {
// Tell master using TCP socket that a message is received
process.on('message', function(message) {
socket.write(JSON.stringify({
code: 'received message',
echo: message
}));
});
process.send('message from worker');
});
server.listen(common.PORT, '127.0.0.1');
}
else if (cluster.isMaster) {
var checks = {
master: {
'receive': false,
'correct': false
},
worker: {
'receive': false,
'correct': false
}
};
var client;
var check = function(type, result) {
checks[type].receive = true;
checks[type].correct = result;
var missing = false;
forEach(checks, function(type) {
if (type.receive === false) missing = true;
});
if (missing === false) {
client.end();
}
};
// Spawn worker
var worker = cluster.fork();
// When a IPC message is resicved form the worker
worker.on('message', function(message) {
check('master', message === 'message from worker');
});
// When a TCP connection is made with the worker connect to it
worker.on('listening', function() {
client = net.connect(common.PORT, function() {
//Send message to worker
worker.send('message from master');
});
client.on('data', function(data) {
// All data is JSON
data = JSON.parse(data.toString());
if (data.code === 'received message') {
check('worker', data.echo === 'message from master');
} else {
throw new Error('worng TCP message recived: ' + data);
}
});
// When the connection ends kill worker and shutdown process
client.on('end', function() {
worker.destroy();
});
worker.on('death', function() {
process.exit(0);
});
});
process.once('exit', function() {
forEach(checks, function(check, type) {
assert.ok(check.receive, 'The ' + type + ' did not receive any message');
assert.ok(check.correct,
'The ' + type + ' did not get the correct message');
});
});
}