feat(unstable): Ability to ref/unref "Child" in "Deno.spawnChild()" API (#15151)

Co-authored-by: Bartek Iwańczuk <biwanczuk@gmail.com>
Co-authored-by: Colin Ihrig <cjihrig@gmail.com>
This commit is contained in:
Leo Kettmeir 2022-07-18 22:24:35 +02:00 committed by GitHub
parent 9eb70bdb5f
commit 2bebdc9116
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 98 additions and 7 deletions

View File

@ -1176,6 +1176,9 @@ declare namespace Deno {
output(): Promise<SpawnOutput>;
/** Kills the process with given Signal. Defaults to SIGTERM. */
kill(signo?: Signal): void;
ref(): void;
unref(): void;
}
/**

View File

@ -533,7 +533,7 @@ Deno.test(
Deno.test(
{ permissions: { run: true, read: true } },
function spawnEnv() {
function spawnSyncEnv() {
const { stdout } = Deno.spawnSync(Deno.execPath(), {
args: [
"eval",
@ -712,3 +712,48 @@ Deno.test(function spawnSyncStdinPipedFails() {
"Piped stdin is not supported for this function, use 'Deno.spawnChild()' instead",
);
});
Deno.test(
{ permissions: { write: true, run: true, read: true } },
async function spawnChildUnref() {
const enc = new TextEncoder();
const cwd = await Deno.makeTempDir({ prefix: "deno_command_test" });
const programFile = "unref.ts";
const program = `
const child = await Deno.spawnChild(Deno.execPath(), {
cwd: Deno.args[0],
args: ["run", "-A", "--unstable", Deno.args[1]],
});
console.log("spawned pid", child.pid);
child.unref();
`;
const childProgramFile = "unref_child.ts";
const childProgram = `
setInterval(() => {
console.log("hello from interval");
}, 100);
`;
Deno.writeFileSync(`${cwd}/${programFile}`, enc.encode(program));
Deno.writeFileSync(`${cwd}/${childProgramFile}`, enc.encode(childProgram));
// In this subprocess we are spawning another subprocess which has
// an infite interval set. Following call would never resolve unless
// child process gets unrefed.
const { success, stdout } = await Deno.spawn(Deno.execPath(), {
cwd,
args: ["run", "-A", "--unstable", programFile, cwd, childProgramFile],
});
assert(success);
const stdoutText = new TextDecoder().decode(stdout);
const pidStr = stdoutText.split(" ").at(-1);
assert(pidStr);
const pid = Number.parseInt(pidStr, 10);
await Deno.remove(cwd, { recursive: true });
// Child process should have been killed when parent process exits.
assertThrows(() => {
Deno.kill(pid, "SIGTERM");
}, Deno.errors.NotFound);
},
);

View File

@ -647,13 +647,28 @@
const DEFAULT_CHUNK_SIZE = 64 * 1024; // 64 KiB
function readableStreamForRid(rid) {
/**
* @callback unrefCallback
* @param {Promise} promise
* @returns {undefined}
*/
/**
* @param {number} rid
* @param {unrefCallback=} unrefCallback
* @returns {ReadableStream<Uint8Array>}
*/
function readableStreamForRid(rid, unrefCallback) {
const stream = new ReadableStream({
type: "bytes",
async pull(controller) {
const v = controller.byobRequest.view;
try {
const bytesRead = await core.read(rid, v);
const promise = core.read(rid, v);
unrefCallback?.(promise);
const bytesRead = await promise;
if (bytesRead === 0) {
core.tryClose(rid);
controller.close();

View File

@ -13,10 +13,13 @@
TypeError,
Uint8Array,
PromiseAll,
SymbolFor,
} = window.__bootstrap.primordials;
const { readableStreamForRid, writableStreamForRid } =
window.__bootstrap.streamUtils;
const promiseIdSymbol = SymbolFor("Deno.core.internalPromiseId");
function spawnChild(command, {
args = [],
cwd = undefined,
@ -71,6 +74,7 @@
class Child {
#rid;
#waitPromiseId;
#pid;
get pid() {
@ -85,6 +89,8 @@
return this.#stdin;
}
#stdoutPromiseId;
#stdoutRid;
#stdout = null;
get stdout() {
if (this.#stdout == null) {
@ -93,6 +99,8 @@
return this.#stdout;
}
#stderrPromiseId;
#stderrRid;
#stderr = null;
get stderr() {
if (this.#stderr == null) {
@ -121,17 +129,25 @@
}
if (stdoutRid !== null) {
this.#stdout = readableStreamForRid(stdoutRid);
this.#stdoutRid = stdoutRid;
this.#stdout = readableStreamForRid(stdoutRid, (promise) => {
this.#stdoutPromiseId = promise[promiseIdSymbol];
});
}
if (stderrRid !== null) {
this.#stderr = readableStreamForRid(stderrRid);
this.#stderrRid = stderrRid;
this.#stderr = readableStreamForRid(stderrRid, (promise) => {
this.#stderrPromiseId = promise[promiseIdSymbol];
});
}
const onAbort = () => this.kill("SIGTERM");
signal?.[add](onAbort);
this.#status = core.opAsync("op_spawn_wait", this.#rid).then((res) => {
const waitPromise = core.opAsync("op_spawn_wait", this.#rid);
this.#waitPromiseId = waitPromise[promiseIdSymbol];
this.#status = waitPromise.then((res) => {
this.#rid = null;
signal?.[remove](onAbort);
return res;
@ -186,6 +202,18 @@
}
core.opSync("op_kill", this.#pid, signo);
}
ref() {
core.refOp(this.#waitPromiseId);
if (this.#stdoutPromiseId) core.refOp(this.#stdoutPromiseId);
if (this.#stderrPromiseId) core.refOp(this.#stderrPromiseId);
}
unref() {
core.unrefOp(this.#waitPromiseId);
if (this.#stdoutPromiseId) core.unrefOp(this.#stdoutPromiseId);
if (this.#stderrPromiseId) core.unrefOp(this.#stderrPromiseId);
}
}
function spawn(command, options) {

View File

@ -76,7 +76,7 @@ pub struct ChildStatus {
signal: Option<String>,
}
impl TryFrom<std::process::ExitStatus> for ChildStatus {
impl TryFrom<ExitStatus> for ChildStatus {
type Error = AnyError;
fn try_from(status: ExitStatus) -> Result<Self, Self::Error> {