child_process: queue pending messages

It fixes the problem of the child process not receiving messages.

Fixes: https://github.com/nodejs/node/issues/41134

PR-URL: https://github.com/nodejs/node/pull/41221
Reviewed-By: Adrian Estrada <edsadr@gmail.com>
Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
Reviewed-By: Minwoo Jung <nodecorelab@gmail.com>
This commit is contained in:
Erick Wendel 2021-12-17 12:04:29 -03:00 committed by Rich Trott
parent 0465373d77
commit de12141dd0
2 changed files with 49 additions and 1 deletions

View File

@ -81,6 +81,7 @@ let HTTPParser;
const MAX_HANDLE_RETRANSMISSIONS = 3;
const kChannelHandle = Symbol('kChannelHandle');
const kIsUsedAsStdio = Symbol('kIsUsedAsStdio');
const kPendingMessages = Symbol('kPendingMessages');
// This object contain function to convert TCP objects to native handle objects
// and back again.
@ -526,6 +527,7 @@ class Control extends EventEmitter {
constructor(channel) {
super();
this.#channel = channel;
this[kPendingMessages] = [];
}
// The methods keeping track of the counter are being used to track the
@ -699,6 +701,24 @@ function setupChannel(target, channel, serializationMode) {
});
});
target.on('newListener', function() {
process.nextTick(() => {
if (!target.channel || !target.listenerCount('message'))
return;
const messages = target.channel[kPendingMessages];
const { length } = messages;
if (!length) return;
for (let i = 0; i < length; i++) {
ReflectApply(target.emit, target, messages[i]);
}
target.channel[kPendingMessages] = [];
});
});
target.send = function(message, handle, options, callback) {
if (typeof handle === 'function') {
callback = handle;
@ -912,7 +932,15 @@ function setupChannel(target, channel, serializationMode) {
};
function emit(event, message, handle) {
target.emit(event, message, handle);
if ('internalMessage' === event || target.listenerCount('message')) {
target.emit(event, message, handle);
return;
}
ArrayPrototypePush(
target.channel[kPendingMessages],
[event, message, handle]
);
}
function handleMessage(message, handle, internal) {

View File

@ -0,0 +1,20 @@
import '../common/index.mjs';
import assert from 'assert';
import { fork } from 'child_process';
import { once } from 'events';
import { fileURLToPath } from 'url';
if (process.argv[2] !== 'child') {
const filename = fileURLToPath(import.meta.url);
const cp = fork(filename, ['child']);
const message = 'Hello World';
cp.send(message);
const [received] = await once(cp, 'message');
assert.deepStrictEqual(received, message);
cp.disconnect();
await once(cp, 'exit');
} else {
process.on('message', (msg) => process.send(msg));
}