fix(node): Create additional pipes for child processes (#25016)

Linux/macos only currently.

Part of https://github.com/denoland/deno/issues/23524 (fixes it on
platforms other than windows).
Part of #16899  (fixes it on platforms other than windows).

After this PR, playwright is functional on mac/linux.
This commit is contained in:
Nathan Whitaker 2024-08-15 09:38:46 -07:00 committed by GitHub
parent 7ca95fc999
commit 8749d651fb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
17 changed files with 785 additions and 304 deletions

4
Cargo.lock generated
View File

@ -1658,13 +1658,17 @@ dependencies = [
"deno_core",
"filetime",
"fs3",
"libc",
"log",
"once_cell",
"os_pipe",
"parking_lot 0.12.3",
"pin-project",
"rand",
"tokio",
"uuid",
"winapi",
"windows-sys 0.52.0",
]
[[package]]

View File

@ -20,12 +20,16 @@ filetime.workspace = true
fs3.workspace = true
log.workspace = true
once_cell.workspace = true
pin-project.workspace = true
tokio.workspace = true
uuid.workspace = true
[target.'cfg(not(windows))'.dependencies]
os_pipe.workspace = true
libc.workspace = true
[target.'cfg(windows)'.dependencies]
winapi = { workspace = true, features = ["winbase", "processenv", "errhandlingapi"] }
rand.workspace = true
parking_lot.workspace = true
windows-sys.workspace = true

433
ext/io/bi_pipe.rs Normal file
View File

@ -0,0 +1,433 @@
// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license.
use std::rc::Rc;
use deno_core::error::AnyError;
use deno_core::AsyncRefCell;
use deno_core::AsyncResult;
use deno_core::CancelHandle;
use deno_core::CancelTryFuture;
use deno_core::RcRef;
use tokio::io::AsyncReadExt;
use tokio::io::AsyncWriteExt;
#[cfg(unix)]
pub type RawBiPipeHandle = std::os::fd::RawFd;
#[cfg(windows)]
pub type RawBiPipeHandle = std::os::windows::io::RawHandle;
/// One end of a bidirectional pipe. This implements the
/// `Resource` trait.
pub struct BiPipeResource {
read_half: AsyncRefCell<BiPipeRead>,
write_half: AsyncRefCell<BiPipeWrite>,
cancel: CancelHandle,
raw_handle: RawBiPipeHandle,
}
#[cfg(windows)]
// workaround because `RawHandle` doesn't impl `AsRawHandle`
mod as_raw_handle {
use super::RawBiPipeHandle;
pub(super) struct RawHandleWrap(pub(super) RawBiPipeHandle);
impl std::os::windows::io::AsRawHandle for RawHandleWrap {
fn as_raw_handle(&self) -> std::os::windows::prelude::RawHandle {
self.0
}
}
}
impl deno_core::Resource for BiPipeResource {
fn close(self: Rc<Self>) {
self.cancel.cancel();
}
fn backing_handle(self: Rc<Self>) -> Option<deno_core::ResourceHandle> {
#[cfg(unix)]
{
Some(deno_core::ResourceHandle::from_fd_like(&self.raw_handle))
}
#[cfg(windows)]
{
Some(deno_core::ResourceHandle::from_fd_like(
&as_raw_handle::RawHandleWrap(self.raw_handle),
))
}
}
deno_core::impl_readable_byob!();
deno_core::impl_writable!();
}
impl BiPipeResource {
pub fn from_raw_handle(raw: RawBiPipeHandle) -> Result<Self, std::io::Error> {
let pipe = BiPipe::from_raw(raw)?;
let (read, write) = pipe.split();
Ok(Self {
raw_handle: raw,
read_half: AsyncRefCell::new(read),
write_half: AsyncRefCell::new(write),
cancel: Default::default(),
})
}
pub async fn read(
self: Rc<Self>,
data: &mut [u8],
) -> Result<usize, AnyError> {
let mut rd = RcRef::map(&self, |r| &r.read_half).borrow_mut().await;
let cancel_handle = RcRef::map(&self, |r| &r.cancel);
Ok(rd.read(data).try_or_cancel(cancel_handle).await?)
}
pub async fn write(self: Rc<Self>, data: &[u8]) -> Result<usize, AnyError> {
let mut wr = RcRef::map(self, |r| &r.write_half).borrow_mut().await;
let nwritten = wr.write(data).await?;
wr.flush().await?;
Ok(nwritten)
}
}
/// One end of a bidirectional pipe
#[pin_project::pin_project]
pub struct BiPipe {
#[pin]
read_end: BiPipeRead,
#[pin]
write_end: BiPipeWrite,
}
impl BiPipe {
pub fn from_raw(raw: RawBiPipeHandle) -> Result<Self, std::io::Error> {
let (read_end, write_end) = from_raw(raw)?;
Ok(Self {
read_end,
write_end,
})
}
pub fn split(self) -> (BiPipeRead, BiPipeWrite) {
(self.read_end, self.write_end)
}
pub fn unsplit(read_end: BiPipeRead, write_end: BiPipeWrite) -> Self {
Self {
read_end,
write_end,
}
}
}
#[pin_project::pin_project]
pub struct BiPipeRead {
#[cfg(unix)]
#[pin]
inner: tokio::net::unix::OwnedReadHalf,
#[cfg(windows)]
#[pin]
inner: tokio::io::ReadHalf<tokio::net::windows::named_pipe::NamedPipeClient>,
}
#[cfg(unix)]
impl From<tokio::net::unix::OwnedReadHalf> for BiPipeRead {
fn from(value: tokio::net::unix::OwnedReadHalf) -> Self {
Self { inner: value }
}
}
#[cfg(windows)]
impl From<tokio::io::ReadHalf<tokio::net::windows::named_pipe::NamedPipeClient>>
for BiPipeRead
{
fn from(
value: tokio::io::ReadHalf<
tokio::net::windows::named_pipe::NamedPipeClient,
>,
) -> Self {
Self { inner: value }
}
}
#[pin_project::pin_project]
pub struct BiPipeWrite {
#[cfg(unix)]
#[pin]
inner: tokio::net::unix::OwnedWriteHalf,
#[cfg(windows)]
#[pin]
inner: tokio::io::WriteHalf<tokio::net::windows::named_pipe::NamedPipeClient>,
}
#[cfg(unix)]
impl From<tokio::net::unix::OwnedWriteHalf> for BiPipeWrite {
fn from(value: tokio::net::unix::OwnedWriteHalf) -> Self {
Self { inner: value }
}
}
#[cfg(windows)]
impl
From<tokio::io::WriteHalf<tokio::net::windows::named_pipe::NamedPipeClient>>
for BiPipeWrite
{
fn from(
value: tokio::io::WriteHalf<
tokio::net::windows::named_pipe::NamedPipeClient,
>,
) -> Self {
Self { inner: value }
}
}
#[cfg(unix)]
fn from_raw(
stream: RawBiPipeHandle,
) -> Result<(BiPipeRead, BiPipeWrite), std::io::Error> {
use std::os::fd::FromRawFd;
// Safety: The fd is part of a pair of connected sockets
let unix_stream = tokio::net::UnixStream::from_std(unsafe {
std::os::unix::net::UnixStream::from_raw_fd(stream)
})?;
let (read, write) = unix_stream.into_split();
Ok((BiPipeRead { inner: read }, BiPipeWrite { inner: write }))
}
#[cfg(windows)]
fn from_raw(
handle: RawBiPipeHandle,
) -> Result<(BiPipeRead, BiPipeWrite), std::io::Error> {
// Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the
// fd handle map will be the same.
let pipe = unsafe {
tokio::net::windows::named_pipe::NamedPipeClient::from_raw_handle(
handle as _,
)?
};
let (read, write) = tokio::io::split(pipe);
Ok((BiPipeRead { inner: read }, BiPipeWrite { inner: write }))
}
impl tokio::io::AsyncRead for BiPipeRead {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
self.project().inner.poll_read(cx, buf)
}
}
impl tokio::io::AsyncRead for BiPipe {
fn poll_read(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut tokio::io::ReadBuf<'_>,
) -> std::task::Poll<std::io::Result<()>> {
self.project().read_end.poll_read(cx, buf)
}
}
// implement `AsyncWrite` for `$name`, delegating
// the impl to `$field`. `$name` must have a `project` method
// with a projected `$field` (e.g. with `pin_project::pin_project`)
macro_rules! impl_async_write {
(for $name: ident -> self.$field: ident) => {
impl tokio::io::AsyncWrite for $name {
fn poll_write_vectored(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
bufs: &[std::io::IoSlice<'_>],
) -> std::task::Poll<Result<usize, std::io::Error>> {
self.project().$field.poll_write_vectored(cx, bufs)
}
fn is_write_vectored(&self) -> bool {
self.$field.is_write_vectored()
}
fn poll_write(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &[u8],
) -> std::task::Poll<Result<usize, std::io::Error>> {
self.project().$field.poll_write(cx, buf)
}
fn poll_flush(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
self.project().$field.poll_flush(cx)
}
fn poll_shutdown(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), std::io::Error>> {
self.project().$field.poll_shutdown(cx)
}
}
};
}
impl_async_write!(for BiPipeWrite -> self.inner);
impl_async_write!(for BiPipe -> self.write_end);
/// Creates both sides of a bidirectional pipe, returning the raw
/// handles to the underlying OS resources.
pub fn bi_pipe_pair_raw() -> Result<(RawBiPipeHandle, RawBiPipeHandle), AnyError>
{
#[cfg(unix)]
{
// SockFlag is broken on macOS
// https://github.com/nix-rust/nix/issues/861
let mut fds = [-1, -1];
#[cfg(not(target_os = "macos"))]
let flags = libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
#[cfg(target_os = "macos")]
let flags = 0;
// SAFETY: libc call, fds are correct size+align
let ret = unsafe {
libc::socketpair(
libc::AF_UNIX,
libc::SOCK_STREAM | flags,
0,
fds.as_mut_ptr(),
)
};
if ret != 0 {
return Err(std::io::Error::last_os_error().into());
}
if cfg!(target_os = "macos") {
let fcntl = |fd: i32, flag: libc::c_int| -> Result<(), std::io::Error> {
// SAFETY: libc call, fd is valid
let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) };
if flags == -1 {
return Err(fail(fds));
}
// SAFETY: libc call, fd is valid
let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags | flag) };
if ret == -1 {
return Err(fail(fds));
}
Ok(())
};
fn fail(fds: [i32; 2]) -> std::io::Error {
// SAFETY: libc call, fds are valid
unsafe {
libc::close(fds[0]);
libc::close(fds[1]);
}
std::io::Error::last_os_error()
}
// SOCK_NONBLOCK is not supported on macOS.
(fcntl)(fds[0], libc::O_NONBLOCK)?;
(fcntl)(fds[1], libc::O_NONBLOCK)?;
// SOCK_CLOEXEC is not supported on macOS.
(fcntl)(fds[0], libc::FD_CLOEXEC)?;
(fcntl)(fds[1], libc::FD_CLOEXEC)?;
}
let fd1 = fds[0];
let fd2 = fds[1];
Ok((fd1, fd2))
}
#[cfg(windows)]
{
// TODO(nathanwhit): more granular unsafe blocks
// SAFETY: win32 calls
unsafe {
use windows_sys::Win32::Foundation::CloseHandle;
use windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED;
use windows_sys::Win32::Foundation::ERROR_PIPE_CONNECTED;
use windows_sys::Win32::Foundation::GENERIC_READ;
use windows_sys::Win32::Foundation::GENERIC_WRITE;
use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE;
use windows_sys::Win32::Security::SECURITY_ATTRIBUTES;
use windows_sys::Win32::Storage::FileSystem::CreateFileW;
use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_FIRST_PIPE_INSTANCE;
use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED;
use windows_sys::Win32::Storage::FileSystem::OPEN_EXISTING;
use windows_sys::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX;
use windows_sys::Win32::System::Pipes::ConnectNamedPipe;
use windows_sys::Win32::System::Pipes::CreateNamedPipeW;
use windows_sys::Win32::System::Pipes::PIPE_READMODE_BYTE;
use windows_sys::Win32::System::Pipes::PIPE_TYPE_BYTE;
use std::io;
use std::os::windows::ffi::OsStrExt;
use std::path::Path;
use std::ptr;
let (path, hd1) = loop {
let name = format!("\\\\.\\pipe\\{}", uuid::Uuid::new_v4());
let mut path = Path::new(&name)
.as_os_str()
.encode_wide()
.collect::<Vec<_>>();
path.push(0);
let hd1 = CreateNamedPipeW(
path.as_ptr(),
PIPE_ACCESS_DUPLEX
| FILE_FLAG_FIRST_PIPE_INSTANCE
| FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
1,
65536,
65536,
0,
std::ptr::null_mut(),
);
if hd1 == INVALID_HANDLE_VALUE {
let err = io::Error::last_os_error();
/* If the pipe name is already in use, try again. */
if err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32) {
continue;
}
return Err(err.into());
}
break (path, hd1);
};
/* Create child pipe handle. */
let s = SECURITY_ATTRIBUTES {
nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as u32,
lpSecurityDescriptor: ptr::null_mut(),
bInheritHandle: 1,
};
let hd2 = CreateFileW(
path.as_ptr(),
GENERIC_READ | GENERIC_WRITE,
0,
&s,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
0,
);
if hd2 == INVALID_HANDLE_VALUE {
return Err(io::Error::last_os_error().into());
}
// Will not block because we have create the pair.
if ConnectNamedPipe(hd1, ptr::null_mut()) == 0 {
let err = std::io::Error::last_os_error();
if err.raw_os_error() != Some(ERROR_PIPE_CONNECTED as i32) {
CloseHandle(hd2);
return Err(err.into());
}
}
Ok((hd1 as _, hd2 as _))
}
}
}

View File

@ -60,12 +60,21 @@ mod pipe;
#[cfg(windows)]
mod winpipe;
mod bi_pipe;
pub use pipe::pipe;
pub use pipe::AsyncPipeRead;
pub use pipe::AsyncPipeWrite;
pub use pipe::PipeRead;
pub use pipe::PipeWrite;
pub use bi_pipe::bi_pipe_pair_raw;
pub use bi_pipe::BiPipe;
pub use bi_pipe::BiPipeRead;
pub use bi_pipe::BiPipeResource;
pub use bi_pipe::BiPipeWrite;
pub use bi_pipe::RawBiPipeHandle;
// Store the stdio fd/handles in global statics in order to keep them
// alive for the duration of the application since the last handle/fd
// being dropped will close the corresponding pipe.

View File

@ -9,10 +9,6 @@ mod impl_ {
use std::future::Future;
use std::io;
use std::mem;
#[cfg(unix)]
use std::os::fd::FromRawFd;
#[cfg(unix)]
use std::os::fd::RawFd;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::atomic::AtomicBool;
@ -43,15 +39,9 @@ mod impl_ {
use tokio::io::AsyncWriteExt;
use tokio::io::ReadBuf;
#[cfg(unix)]
use tokio::net::unix::OwnedReadHalf;
#[cfg(unix)]
use tokio::net::unix::OwnedWriteHalf;
#[cfg(unix)]
use tokio::net::UnixStream;
#[cfg(windows)]
type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient;
use deno_io::BiPipe;
use deno_io::BiPipeRead;
use deno_io::BiPipeWrite;
/// Wrapper around v8 value that implements Serialize.
struct SerializeWrapper<'a, 'b>(
@ -349,10 +339,7 @@ mod impl_ {
pub struct IpcJsonStreamResource {
read_half: AsyncRefCell<IpcJsonStream>,
#[cfg(unix)]
write_half: AsyncRefCell<OwnedWriteHalf>,
#[cfg(windows)]
write_half: AsyncRefCell<tokio::io::WriteHalf<NamedPipeClient>>,
write_half: AsyncRefCell<BiPipeWrite>,
cancel: Rc<CancelHandle>,
queued_bytes: AtomicUsize,
ref_tracker: IpcRefTracker,
@ -364,38 +351,12 @@ mod impl_ {
}
}
#[cfg(unix)]
fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> {
// Safety: The fd is part of a pair of connected sockets create by child process
// implementation.
let unix_stream = UnixStream::from_std(unsafe {
std::os::unix::net::UnixStream::from_raw_fd(stream)
})?;
Ok(unix_stream.into_split())
}
#[cfg(windows)]
fn pipe(
handle: i64,
) -> Result<
(
tokio::io::ReadHalf<NamedPipeClient>,
tokio::io::WriteHalf<NamedPipeClient>,
),
io::Error,
> {
// Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the
// fd handle map will be the same.
let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? };
Ok(tokio::io::split(pipe))
}
impl IpcJsonStreamResource {
pub fn new(
stream: i64,
ref_tracker: IpcRefTracker,
) -> Result<Self, std::io::Error> {
let (read_half, write_half) = pipe(stream as _)?;
let (read_half, write_half) = BiPipe::from_raw(stream as _)?.split();
Ok(Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
@ -406,11 +367,14 @@ mod impl_ {
}
#[cfg(all(unix, test))]
fn from_stream(stream: UnixStream, ref_tracker: IpcRefTracker) -> Self {
fn from_stream(
stream: tokio::net::UnixStream,
ref_tracker: IpcRefTracker,
) -> Self {
let (read_half, write_half) = stream.into_split();
Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())),
write_half: AsyncRefCell::new(write_half.into()),
cancel: Default::default(),
queued_bytes: Default::default(),
ref_tracker,
@ -418,11 +382,14 @@ mod impl_ {
}
#[cfg(all(windows, test))]
fn from_stream(pipe: NamedPipeClient, ref_tracker: IpcRefTracker) -> Self {
fn from_stream(
pipe: tokio::net::windows::named_pipe::NamedPipeClient,
ref_tracker: IpcRefTracker,
) -> Self {
let (read_half, write_half) = tokio::io::split(pipe);
Self {
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)),
write_half: AsyncRefCell::new(write_half),
read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())),
write_half: AsyncRefCell::new(write_half.into()),
cancel: Default::default(),
queued_bytes: Default::default(),
ref_tracker,
@ -492,26 +459,13 @@ mod impl_ {
//
// `\n` is used as a delimiter between messages.
struct IpcJsonStream {
#[cfg(unix)]
pipe: OwnedReadHalf,
#[cfg(windows)]
pipe: tokio::io::ReadHalf<NamedPipeClient>,
pipe: BiPipeRead,
buffer: Vec<u8>,
read_buffer: ReadBuffer,
}
impl IpcJsonStream {
#[cfg(unix)]
fn new(pipe: OwnedReadHalf) -> Self {
Self {
pipe,
buffer: Vec::with_capacity(INITIAL_CAPACITY),
read_buffer: ReadBuffer::new(),
}
}
#[cfg(windows)]
fn new(pipe: tokio::io::ReadHalf<NamedPipeClient>) -> Self {
fn new(pipe: BiPipeRead) -> Self {
Self {
pipe,
buffer: Vec::with_capacity(INITIAL_CAPACITY),

View File

@ -25,7 +25,6 @@ import {
StringPrototypeStartsWith,
StringPrototypeToUpperCase,
} from "ext:deno_node/internal/primordials.mjs";
import { assert } from "ext:deno_node/_util/asserts.ts";
import { EventEmitter } from "node:events";
import { os } from "ext:deno_node/internal_binding/constants.ts";
@ -54,6 +53,10 @@ import { kEmptyObject } from "ext:deno_node/internal/util.mjs";
import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs";
import process from "node:process";
import { StringPrototypeSlice } from "ext:deno_node/internal/primordials.mjs";
import { StreamBase } from "ext:deno_node/internal_binding/stream_wrap.ts";
import { Pipe, socketType } from "ext:deno_node/internal_binding/pipe_wrap.ts";
import console from "node:console";
import { Socket } from "node:net";
export function mapValues<T, O>(
record: Readonly<Record<string, T>>,
@ -118,6 +121,47 @@ function maybeClose(child: ChildProcess) {
}
}
function flushStdio(subprocess: ChildProcess) {
const stdio = subprocess.stdio;
if (stdio == null) return;
for (let i = 0; i < stdio.length; i++) {
const stream = stdio[i];
if (!stream || !stream.readable) {
continue;
}
stream.resume();
}
}
// Wraps a resource in a class that implements
// StreamBase, so it can be used with node streams
class StreamResource implements StreamBase {
#rid: number;
constructor(rid: number) {
this.#rid = rid;
}
close(): void {
core.close(this.#rid);
}
async read(p: Uint8Array): Promise<number | null> {
const readPromise = core.read(this.#rid, p);
core.unrefOpPromise(readPromise);
const nread = await readPromise;
return nread > 0 ? nread : null;
}
ref(): void {
return;
}
unref(): void {
return;
}
write(p: Uint8Array): Promise<number> {
return core.write(this.#rid, p);
}
}
export class ChildProcess extends EventEmitter {
/**
* The exit code of the child process. This property will be `null` until the child process exits.
@ -201,7 +245,7 @@ export class ChildProcess extends EventEmitter {
stdin = "pipe",
stdout = "pipe",
stderr = "pipe",
_channel, // TODO(kt3k): handle this correctly
...extraStdio
] = normalizedStdio;
const [cmd, cmdArgs] = buildCommand(
command,
@ -213,6 +257,15 @@ export class ChildProcess extends EventEmitter {
const ipc = normalizedStdio.indexOf("ipc");
const extraStdioOffset = 3; // stdin, stdout, stderr
const extraStdioNormalized: DenoStdio[] = [];
for (let i = 0; i < extraStdio.length; i++) {
const fd = i + extraStdioOffset;
if (fd === ipc) extraStdioNormalized.push("null");
extraStdioNormalized.push(toDenoStdio(extraStdio[i]));
}
const stringEnv = mapValues(env, (value) => value.toString());
try {
this.#process = new Deno.Command(cmd, {
@ -224,6 +277,7 @@ export class ChildProcess extends EventEmitter {
stderr: toDenoStdio(stderr),
windowsRawArguments: windowsVerbatimArguments,
ipc, // internal
extraStdio: extraStdioNormalized,
}).spawn();
this.pid = this.#process.pid;
@ -259,13 +313,36 @@ export class ChildProcess extends EventEmitter {
maybeClose(this);
});
}
// TODO(nathanwhit): once we impl > 3 stdio pipes make sure we also listen for their
// close events (like above)
this.stdio[0] = this.stdin;
this.stdio[1] = this.stdout;
this.stdio[2] = this.stderr;
if (ipc >= 0) {
this.stdio[ipc] = null;
}
const pipeRids = internals.getExtraPipeRids(this.#process);
for (let i = 0; i < pipeRids.length; i++) {
const rid: number | null = pipeRids[i];
const fd = i + extraStdioOffset;
if (rid) {
this[kClosesNeeded]++;
this.stdio[fd] = new Socket(
{
handle: new Pipe(
socketType.IPC,
new StreamResource(rid),
),
// deno-lint-ignore no-explicit-any
} as any,
);
this.stdio[fd]?.on("close", () => {
maybeClose(this);
});
}
}
nextTick(() => {
this.emit("spawn");
this.#spawned.resolve();
@ -292,9 +369,9 @@ export class ChildProcess extends EventEmitter {
}
}
const pipeFd = internals.getPipeFd(this.#process);
if (typeof pipeFd == "number") {
setupChannel(this, pipeFd);
const pipeRid = internals.getIpcPipeRid(this.#process);
if (typeof pipeRid == "number") {
setupChannel(this, pipeRid);
this[kClosesNeeded]++;
this.on("disconnect", () => {
maybeClose(this);
@ -312,6 +389,7 @@ export class ChildProcess extends EventEmitter {
await this.#_waitForChildStreamsToClose();
this.#closePipes();
maybeClose(this);
nextTick(flushStdio, this);
});
})();
} catch (err) {
@ -395,16 +473,6 @@ export class ChildProcess extends EventEmitter {
assert(this.stdin);
this.stdin.destroy();
}
/// TODO(nathanwhit): for some reason when the child process exits
/// and the child end of the named pipe closes, reads still just return `Pending`
/// instead of returning that 0 bytes were read (to signal the pipe died).
/// For now, just forcibly disconnect, but in theory I think we could miss messages
/// that haven't been read yet.
if (Deno.build.os === "windows") {
if (this.canDisconnect) {
this.disconnect?.();
}
}
}
}

View File

@ -37,7 +37,10 @@ import {
import { LibuvStreamWrap } from "ext:deno_node/internal_binding/stream_wrap.ts";
import { codeMap } from "ext:deno_node/internal_binding/uv.ts";
import { delay } from "ext:deno_node/_util/async.ts";
import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts";
import {
kStreamBaseField,
StreamBase,
} from "ext:deno_node/internal_binding/stream_wrap.ts";
import {
ceilPowOf2,
INITIAL_ACCEPT_BACKOFF_DELAY,
@ -68,7 +71,7 @@ export class Pipe extends ConnectionWrap {
#closed = false;
#acceptBackoffDelay?: number;
constructor(type: number, conn?: Deno.UnixConn) {
constructor(type: number, conn?: Deno.UnixConn | StreamBase) {
let provider: providerType;
let ipc: boolean;
@ -100,8 +103,8 @@ export class Pipe extends ConnectionWrap {
this.ipc = ipc;
if (conn && provider === providerType.PIPEWRAP) {
const localAddr = conn.localAddr as Deno.UnixAddr;
if (conn && provider === providerType.PIPEWRAP && "localAddr" in conn) {
const localAddr = conn.localAddr;
this.#address = localAddr.path;
}
}

View File

@ -44,11 +44,11 @@ import {
} from "ext:deno_node/internal_binding/async_wrap.ts";
import { codeMap } from "ext:deno_node/internal_binding/uv.ts";
interface Reader {
export interface Reader {
read(p: Uint8Array): Promise<number | null>;
}
interface Writer {
export interface Writer {
write(p: Uint8Array): Promise<number>;
}
@ -56,7 +56,12 @@ export interface Closer {
close(): void;
}
type Ref = { ref(): void; unref(): void };
export interface Ref {
ref(): void;
unref(): void;
}
export interface StreamBase extends Reader, Writer, Closer, Ref {}
const enum StreamBaseStateFields {
kReadBytesOrError,

View File

@ -168,7 +168,7 @@ function run({
const illegalConstructorKey = Symbol("illegalConstructorKey");
function spawnChildInner(opFn, command, apiName, {
function spawnChildInner(command, apiName, {
args = [],
cwd = undefined,
clearEnv = false,
@ -181,8 +181,9 @@ function spawnChildInner(opFn, command, apiName, {
signal = undefined,
windowsRawArguments = false,
ipc = -1,
extraStdio = [],
} = { __proto__: null }) {
const child = opFn({
const child = op_spawn_child({
cmd: pathFromURL(command),
args: ArrayPrototypeMap(args, String),
cwd: pathFromURL(cwd),
@ -195,6 +196,7 @@ function spawnChildInner(opFn, command, apiName, {
stderr,
windowsRawArguments,
ipc,
extraStdio,
}, apiName);
return new ChildProcess(illegalConstructorKey, {
...child,
@ -204,7 +206,6 @@ function spawnChildInner(opFn, command, apiName, {
function spawnChild(command, options = { __proto__: null }) {
return spawnChildInner(
op_spawn_child,
command,
"Deno.Command().spawn()",
options,
@ -221,16 +222,19 @@ function collectOutput(readableStream) {
return readableStreamCollectIntoUint8Array(readableStream);
}
const _pipeFd = Symbol("[[pipeFd]]");
const _ipcPipeRid = Symbol("[[ipcPipeRid]]");
const _extraPipeRids = Symbol("[[_extraPipeRids]]");
internals.getPipeFd = (process) => process[_pipeFd];
internals.getIpcPipeRid = (process) => process[_ipcPipeRid];
internals.getExtraPipeRids = (process) => process[_extraPipeRids];
class ChildProcess {
#rid;
#waitPromise;
#waitComplete = false;
[_pipeFd];
[_ipcPipeRid];
[_extraPipeRids];
#pid;
get pid() {
@ -268,7 +272,8 @@ class ChildProcess {
stdinRid,
stdoutRid,
stderrRid,
pipeFd, // internal
ipcPipeRid, // internal
extraPipeRids,
} = null) {
if (key !== illegalConstructorKey) {
throw new TypeError("Illegal constructor.");
@ -276,7 +281,8 @@ class ChildProcess {
this.#rid = rid;
this.#pid = pid;
this[_pipeFd] = pipeFd;
this[_ipcPipeRid] = ipcPipeRid;
this[_extraPipeRids] = extraPipeRids;
if (stdinRid !== null) {
this.#stdin = writableStreamForRid(stdinRid);
@ -380,7 +386,6 @@ function spawn(command, options) {
);
}
return spawnChildInner(
op_spawn_child,
command,
"Deno.Command().output()",
options,
@ -417,6 +422,7 @@ function spawnSync(command, {
stdout,
stderr,
windowsRawArguments,
extraStdio: [],
});
return {
success: result.status.success,

View File

@ -154,6 +154,8 @@ pub struct SpawnArgs {
#[serde(flatten)]
stdio: ChildStdio,
extra_stdio: Vec<Stdio>,
}
#[derive(Deserialize)]
@ -215,7 +217,12 @@ pub struct SpawnOutput {
stderr: Option<ToJsBuffer>,
}
type CreateCommand = (std::process::Command, Option<ResourceId>);
type CreateCommand = (
std::process::Command,
Option<ResourceId>,
Vec<Option<ResourceId>>,
Vec<deno_io::RawBiPipeHandle>,
);
fn create_command(
state: &mut OpState,
@ -277,216 +284,103 @@ fn create_command(
// TODO(bartlomieju):
#[allow(clippy::undocumented_unsafe_blocks)]
unsafe {
let mut extra_pipe_rids = Vec::new();
let mut fds_to_dup = Vec::new();
let mut fds_to_close = Vec::new();
let mut ipc_rid = None;
if let Some(ipc) = args.ipc {
if ipc < 0 {
return Ok((command, None));
if ipc >= 0 {
let (ipc_fd1, ipc_fd2) = deno_io::bi_pipe_pair_raw()?;
fds_to_dup.push((ipc_fd2, ipc));
fds_to_close.push(ipc_fd2);
/* One end returned to parent process (this) */
let pipe_rid =
state
.resource_table
.add(deno_node::IpcJsonStreamResource::new(
ipc_fd1 as _,
deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()),
)?);
/* The other end passed to child process via NODE_CHANNEL_FD */
command.env("NODE_CHANNEL_FD", format!("{}", ipc));
ipc_rid = Some(pipe_rid);
}
// SockFlag is broken on macOS
// https://github.com/nix-rust/nix/issues/861
let mut fds = [-1, -1];
#[cfg(not(target_os = "macos"))]
let flags = libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK;
#[cfg(target_os = "macos")]
let flags = 0;
let ret = libc::socketpair(
libc::AF_UNIX,
libc::SOCK_STREAM | flags,
0,
fds.as_mut_ptr(),
);
if ret != 0 {
return Err(std::io::Error::last_os_error().into());
}
if cfg!(target_os = "macos") {
let fcntl =
|fd: i32, flag: libc::c_int| -> Result<(), std::io::Error> {
let flags = libc::fcntl(fd, libc::F_GETFL, 0);
if flags == -1 {
return Err(fail(fds));
}
let ret = libc::fcntl(fd, libc::F_SETFL, flags | flag);
if ret == -1 {
return Err(fail(fds));
}
Ok(())
};
fn fail(fds: [i32; 2]) -> std::io::Error {
unsafe {
libc::close(fds[0]);
libc::close(fds[1]);
}
std::io::Error::last_os_error()
}
// SOCK_NONBLOCK is not supported on macOS.
(fcntl)(fds[0], libc::O_NONBLOCK)?;
(fcntl)(fds[1], libc::O_NONBLOCK)?;
// SOCK_CLOEXEC is not supported on macOS.
(fcntl)(fds[0], libc::FD_CLOEXEC)?;
(fcntl)(fds[1], libc::FD_CLOEXEC)?;
}
let fd1 = fds[0];
let fd2 = fds[1];
command.pre_exec(move || {
if ipc >= 0 {
let _fd = libc::dup2(fd2, ipc);
libc::close(fd2);
}
libc::setgroups(0, std::ptr::null());
Ok(())
});
/* One end returned to parent process (this) */
let pipe_rid = Some(state.resource_table.add(
deno_node::IpcJsonStreamResource::new(
fd1 as _,
deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()),
)?,
));
/* The other end passed to child process via NODE_CHANNEL_FD */
command.env("NODE_CHANNEL_FD", format!("{}", ipc));
return Ok((command, pipe_rid));
}
Ok((command, None))
for (i, stdio) in args.extra_stdio.into_iter().enumerate() {
// index 0 in `extra_stdio` actually refers to fd 3
// because we handle stdin,stdout,stderr specially
let fd = (i + 3) as i32;
// TODO(nathanwhit): handle inherited, but this relies on the parent process having
// fds open already. since we don't generally support dealing with raw fds,
// we can't properly support this
if matches!(stdio, Stdio::Piped) {
let (fd1, fd2) = deno_io::bi_pipe_pair_raw()?;
fds_to_dup.push((fd2, fd));
fds_to_close.push(fd2);
let rid = state.resource_table.add(
match deno_io::BiPipeResource::from_raw_handle(fd1) {
Ok(v) => v,
Err(e) => {
log::warn!("Failed to open bidirectional pipe for fd {fd}: {e}");
extra_pipe_rids.push(None);
continue;
}
},
);
extra_pipe_rids.push(Some(rid));
} else {
extra_pipe_rids.push(None);
}
}
command.pre_exec(move || {
for &(src, dst) in &fds_to_dup {
if src >= 0 && dst >= 0 {
let _fd = libc::dup2(src, dst);
libc::close(src);
}
}
libc::setgroups(0, std::ptr::null());
Ok(())
});
Ok((command, ipc_rid, extra_pipe_rids, fds_to_close))
}
#[cfg(windows)]
// Safety: We setup a windows named pipe and pass one end to the child process.
unsafe {
use windows_sys::Win32::Foundation::CloseHandle;
use windows_sys::Win32::Foundation::DuplicateHandle;
use windows_sys::Win32::Foundation::DUPLICATE_SAME_ACCESS;
use windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED;
use windows_sys::Win32::Foundation::ERROR_PIPE_CONNECTED;
use windows_sys::Win32::Foundation::GENERIC_READ;
use windows_sys::Win32::Foundation::GENERIC_WRITE;
use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE;
use windows_sys::Win32::Security::SECURITY_ATTRIBUTES;
use windows_sys::Win32::Storage::FileSystem::CreateFileW;
use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_FIRST_PIPE_INSTANCE;
use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED;
use windows_sys::Win32::Storage::FileSystem::OPEN_EXISTING;
use windows_sys::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX;
use windows_sys::Win32::System::Pipes::ConnectNamedPipe;
use windows_sys::Win32::System::Pipes::CreateNamedPipeW;
use windows_sys::Win32::System::Pipes::PIPE_READMODE_BYTE;
use windows_sys::Win32::System::Pipes::PIPE_TYPE_BYTE;
use windows_sys::Win32::System::Threading::GetCurrentProcess;
use std::io;
use std::os::windows::ffi::OsStrExt;
use std::path::Path;
use std::ptr;
{
let mut ipc_rid = None;
let mut handles_to_close = Vec::with_capacity(1);
if let Some(ipc) = args.ipc {
if ipc < 0 {
return Ok((command, None));
if ipc >= 0 {
let (hd1, hd2) = deno_io::bi_pipe_pair_raw()?;
/* One end returned to parent process (this) */
let pipe_rid = Some(state.resource_table.add(
deno_node::IpcJsonStreamResource::new(
hd1 as i64,
deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()),
)?,
));
/* The other end passed to child process via NODE_CHANNEL_FD */
command.env("NODE_CHANNEL_FD", format!("{}", hd2 as i64));
handles_to_close.push(hd2);
ipc_rid = pipe_rid;
}
let (path, hd1) = loop {
let name = format!("\\\\.\\pipe\\{}", uuid::Uuid::new_v4());
let mut path = Path::new(&name)
.as_os_str()
.encode_wide()
.collect::<Vec<_>>();
path.push(0);
let hd1 = CreateNamedPipeW(
path.as_ptr(),
PIPE_ACCESS_DUPLEX
| FILE_FLAG_FIRST_PIPE_INSTANCE
| FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE | PIPE_READMODE_BYTE,
1,
65536,
65536,
0,
std::ptr::null_mut(),
);
if hd1 == INVALID_HANDLE_VALUE {
let err = io::Error::last_os_error();
/* If the pipe name is already in use, try again. */
if err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32) {
continue;
}
return Err(err.into());
}
break (path, hd1);
};
/* Create child pipe handle. */
let s = SECURITY_ATTRIBUTES {
nLength: std::mem::size_of::<SECURITY_ATTRIBUTES>() as u32,
lpSecurityDescriptor: ptr::null_mut(),
bInheritHandle: 1,
};
let mut hd2 = CreateFileW(
path.as_ptr(),
GENERIC_READ | GENERIC_WRITE,
0,
&s,
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
0,
);
if hd2 == INVALID_HANDLE_VALUE {
return Err(io::Error::last_os_error().into());
}
// Will not block because we have create the pair.
if ConnectNamedPipe(hd1, ptr::null_mut()) == 0 {
let err = std::io::Error::last_os_error();
if err.raw_os_error() != Some(ERROR_PIPE_CONNECTED as i32) {
CloseHandle(hd2);
return Err(err.into());
}
}
// Duplicating the handle to allow the child process to use it.
if DuplicateHandle(
GetCurrentProcess(),
hd2,
GetCurrentProcess(),
&mut hd2,
0,
1,
DUPLICATE_SAME_ACCESS,
) == 0
{
return Err(std::io::Error::last_os_error().into());
}
/* One end returned to parent process (this) */
let pipe_fd = Some(state.resource_table.add(
deno_node::IpcJsonStreamResource::new(
hd1 as i64,
deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()),
)?,
));
/* The other end passed to child process via NODE_CHANNEL_FD */
command.env("NODE_CHANNEL_FD", format!("{}", hd2 as i64));
return Ok((command, pipe_fd));
}
}
#[cfg(not(unix))]
return Ok((command, None));
if args.extra_stdio.iter().any(|s| matches!(s, Stdio::Piped)) {
log::warn!(
"Additional stdio pipes beyond stdin/stdout/stderr are not currently supported on windows"
);
}
Ok((command, ipc_rid, vec![], handles_to_close))
}
}
#[derive(Serialize)]
@ -497,13 +391,15 @@ struct Child {
stdin_rid: Option<ResourceId>,
stdout_rid: Option<ResourceId>,
stderr_rid: Option<ResourceId>,
pipe_fd: Option<ResourceId>,
ipc_pipe_rid: Option<ResourceId>,
extra_pipe_rids: Vec<Option<ResourceId>>,
}
fn spawn_child(
state: &mut OpState,
command: std::process::Command,
pipe_fd: Option<ResourceId>,
ipc_pipe_rid: Option<ResourceId>,
extra_pipe_rids: Vec<Option<ResourceId>>,
) -> Result<Child, AnyError> {
let mut command = tokio::process::Command::from(command);
// TODO(@crowlkats): allow detaching processes.
@ -585,10 +481,28 @@ fn spawn_child(
stdin_rid,
stdout_rid,
stderr_rid,
pipe_fd,
ipc_pipe_rid,
extra_pipe_rids,
})
}
fn close_raw_handle(handle: deno_io::RawBiPipeHandle) {
#[cfg(unix)]
{
// SAFETY: libc call
unsafe {
libc::close(handle);
}
}
#[cfg(windows)]
{
// SAFETY: win32 call
unsafe {
windows_sys::Win32::Foundation::CloseHandle(handle as _);
}
}
}
#[op2]
#[serde]
fn op_spawn_child(
@ -596,8 +510,13 @@ fn op_spawn_child(
#[serde] args: SpawnArgs,
#[string] api_name: String,
) -> Result<Child, AnyError> {
let (command, pipe_rid) = create_command(state, args, &api_name)?;
spawn_child(state, command, pipe_rid)
let (command, pipe_rid, extra_pipe_rids, handles_to_close) =
create_command(state, args, &api_name)?;
let child = spawn_child(state, command, pipe_rid, extra_pipe_rids);
for handle in handles_to_close {
close_raw_handle(handle);
}
child
}
#[op2(async)]
@ -626,7 +545,7 @@ fn op_spawn_sync(
) -> Result<SpawnOutput, AnyError> {
let stdout = matches!(args.stdio.stdout, StdioOrRid::Stdio(Stdio::Piped));
let stderr = matches!(args.stdio.stderr, StdioOrRid::Stdio(Stdio::Piped));
let (mut command, _) =
let (mut command, _, _, _) =
create_command(state, args, "Deno.Command().outputSync()")?;
let output = command.output().with_context(|| {
format!(

View File

@ -0,0 +1,17 @@
{
"tempDir": true,
"steps": [
{
"if": "unix",
"cwd": "./test-pipe",
"commandName": "cargo",
"args": "build",
"output": "[WILDCARD]"
},
{
"if": "unix",
"args": "run -A main.ts",
"output": "main.out"
}
]
}

View File

@ -0,0 +1,5 @@
data: hello world
[UNORDERED_START]
child closed
pipe closed
[UNORDERED_END]

View File

@ -0,0 +1,26 @@
import child_process from "node:child_process";
import { Buffer } from "node:buffer";
import console from "node:console";
const child = child_process.spawn("./test-pipe/target/debug/test-pipe", [], {
stdio: ["inherit", "inherit", "inherit", "ignore", "pipe"],
});
const extra = child.stdio[4];
const p = Promise.withResolvers();
child.on("close", () => {
console.log("child closed");
p.resolve();
});
extra.on("data", (d) => {
console.log("data:", d.toString().trim());
});
extra.on("close", () => {
console.log("pipe closed");
});
await p.promise;

View File

@ -0,0 +1,7 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "test-pipe"
version = "0.1.0"

View File

@ -0,0 +1,8 @@
[package]
name = "test-pipe"
version = "0.1.0"
edition = "2021"
[dependencies]
[workspace]

View File

@ -0,0 +1,12 @@
use std::io::prelude::*;
use std::os::fd::FromRawFd;
use std::os::unix::net::UnixStream;
fn main() {
#[cfg(unix)]
{
let mut stream = unsafe { UnixStream::from_raw_fd(4) };
stream.write_all(b"hello world\n").unwrap();
}
}

View File

@ -166,6 +166,7 @@
"ext:deno_node/internal/test/binding.ts": "../ext/node/polyfills/internal/test/binding.ts",
"ext:deno_node/internal/timers.mjs": "../ext/node/polyfills/internal/timers.mjs",
"ext:deno_node/internal/url.ts": "../ext/node/polyfills/internal/url.ts",
"ext:deno_node/internal/stream_base_commons.ts": "../ext/node/polyfills/internal/stream_base_commons.ts",
"ext:deno_node/internal/util.mjs": "../ext/node/polyfills/internal/util.mjs",
"ext:deno_node/internal/util/debuglog.ts": "../ext/node/polyfills/internal/util/debuglog.ts",
"ext:deno_node/internal/util/inspect.mjs": "../ext/node/polyfills/internal/util/inspect.mjs",