From 8749d651fb5e0964cdb8e62be7a59a603cbc3c7c Mon Sep 17 00:00:00 2001 From: Nathan Whitaker <17734409+nathanwhit@users.noreply.github.com> Date: Thu, 15 Aug 2024 09:38:46 -0700 Subject: [PATCH] fix(node): Create additional pipes for child processes (#25016) Linux/macos only currently. Part of https://github.com/denoland/deno/issues/23524 (fixes it on platforms other than windows). Part of #16899 (fixes it on platforms other than windows). After this PR, playwright is functional on mac/linux. --- Cargo.lock | 4 + ext/io/Cargo.toml | 4 + ext/io/bi_pipe.rs | 433 ++++++++++++++++++ ext/io/lib.rs | 9 + ext/node/ops/ipc.rs | 84 +--- ext/node/polyfills/internal/child_process.ts | 102 ++++- .../polyfills/internal_binding/pipe_wrap.ts | 11 +- .../polyfills/internal_binding/stream_wrap.ts | 11 +- runtime/js/40_process.js | 24 +- runtime/ops/process.rs | 331 +++++-------- .../child_process_extra_pipes/__test__.jsonc | 17 + .../node/child_process_extra_pipes/main.out | 5 + .../node/child_process_extra_pipes/main.ts | 26 ++ .../test-pipe/Cargo.lock | 7 + .../test-pipe/Cargo.toml | 8 + .../test-pipe/src/main.rs | 12 + tools/core_import_map.json | 1 + 17 files changed, 785 insertions(+), 304 deletions(-) create mode 100644 ext/io/bi_pipe.rs create mode 100644 tests/specs/node/child_process_extra_pipes/__test__.jsonc create mode 100644 tests/specs/node/child_process_extra_pipes/main.out create mode 100644 tests/specs/node/child_process_extra_pipes/main.ts create mode 100644 tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.lock create mode 100644 tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.toml create mode 100644 tests/specs/node/child_process_extra_pipes/test-pipe/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index a22fa29389..e3cdc49d6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1658,13 +1658,17 @@ dependencies = [ "deno_core", "filetime", "fs3", + "libc", "log", "once_cell", "os_pipe", "parking_lot 0.12.3", + "pin-project", "rand", "tokio", + "uuid", "winapi", + "windows-sys 0.52.0", ] [[package]] diff --git a/ext/io/Cargo.toml b/ext/io/Cargo.toml index 9d02f88c97..414bf0739d 100644 --- a/ext/io/Cargo.toml +++ b/ext/io/Cargo.toml @@ -20,12 +20,16 @@ filetime.workspace = true fs3.workspace = true log.workspace = true once_cell.workspace = true +pin-project.workspace = true tokio.workspace = true +uuid.workspace = true [target.'cfg(not(windows))'.dependencies] os_pipe.workspace = true +libc.workspace = true [target.'cfg(windows)'.dependencies] winapi = { workspace = true, features = ["winbase", "processenv", "errhandlingapi"] } rand.workspace = true parking_lot.workspace = true +windows-sys.workspace = true diff --git a/ext/io/bi_pipe.rs b/ext/io/bi_pipe.rs new file mode 100644 index 0000000000..04fff7b00b --- /dev/null +++ b/ext/io/bi_pipe.rs @@ -0,0 +1,433 @@ +// Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. + +use std::rc::Rc; + +use deno_core::error::AnyError; +use deno_core::AsyncRefCell; +use deno_core::AsyncResult; +use deno_core::CancelHandle; +use deno_core::CancelTryFuture; +use deno_core::RcRef; +use tokio::io::AsyncReadExt; +use tokio::io::AsyncWriteExt; + +#[cfg(unix)] +pub type RawBiPipeHandle = std::os::fd::RawFd; + +#[cfg(windows)] +pub type RawBiPipeHandle = std::os::windows::io::RawHandle; + +/// One end of a bidirectional pipe. This implements the +/// `Resource` trait. +pub struct BiPipeResource { + read_half: AsyncRefCell, + write_half: AsyncRefCell, + cancel: CancelHandle, + raw_handle: RawBiPipeHandle, +} + +#[cfg(windows)] +// workaround because `RawHandle` doesn't impl `AsRawHandle` +mod as_raw_handle { + use super::RawBiPipeHandle; + pub(super) struct RawHandleWrap(pub(super) RawBiPipeHandle); + impl std::os::windows::io::AsRawHandle for RawHandleWrap { + fn as_raw_handle(&self) -> std::os::windows::prelude::RawHandle { + self.0 + } + } +} + +impl deno_core::Resource for BiPipeResource { + fn close(self: Rc) { + self.cancel.cancel(); + } + + fn backing_handle(self: Rc) -> Option { + #[cfg(unix)] + { + Some(deno_core::ResourceHandle::from_fd_like(&self.raw_handle)) + } + #[cfg(windows)] + { + Some(deno_core::ResourceHandle::from_fd_like( + &as_raw_handle::RawHandleWrap(self.raw_handle), + )) + } + } + + deno_core::impl_readable_byob!(); + deno_core::impl_writable!(); +} + +impl BiPipeResource { + pub fn from_raw_handle(raw: RawBiPipeHandle) -> Result { + let pipe = BiPipe::from_raw(raw)?; + let (read, write) = pipe.split(); + Ok(Self { + raw_handle: raw, + read_half: AsyncRefCell::new(read), + write_half: AsyncRefCell::new(write), + cancel: Default::default(), + }) + } + + pub async fn read( + self: Rc, + data: &mut [u8], + ) -> Result { + let mut rd = RcRef::map(&self, |r| &r.read_half).borrow_mut().await; + let cancel_handle = RcRef::map(&self, |r| &r.cancel); + Ok(rd.read(data).try_or_cancel(cancel_handle).await?) + } + + pub async fn write(self: Rc, data: &[u8]) -> Result { + let mut wr = RcRef::map(self, |r| &r.write_half).borrow_mut().await; + let nwritten = wr.write(data).await?; + wr.flush().await?; + Ok(nwritten) + } +} + +/// One end of a bidirectional pipe +#[pin_project::pin_project] +pub struct BiPipe { + #[pin] + read_end: BiPipeRead, + #[pin] + write_end: BiPipeWrite, +} + +impl BiPipe { + pub fn from_raw(raw: RawBiPipeHandle) -> Result { + let (read_end, write_end) = from_raw(raw)?; + Ok(Self { + read_end, + write_end, + }) + } + + pub fn split(self) -> (BiPipeRead, BiPipeWrite) { + (self.read_end, self.write_end) + } + + pub fn unsplit(read_end: BiPipeRead, write_end: BiPipeWrite) -> Self { + Self { + read_end, + write_end, + } + } +} + +#[pin_project::pin_project] +pub struct BiPipeRead { + #[cfg(unix)] + #[pin] + inner: tokio::net::unix::OwnedReadHalf, + #[cfg(windows)] + #[pin] + inner: tokio::io::ReadHalf, +} + +#[cfg(unix)] +impl From for BiPipeRead { + fn from(value: tokio::net::unix::OwnedReadHalf) -> Self { + Self { inner: value } + } +} +#[cfg(windows)] +impl From> + for BiPipeRead +{ + fn from( + value: tokio::io::ReadHalf< + tokio::net::windows::named_pipe::NamedPipeClient, + >, + ) -> Self { + Self { inner: value } + } +} + +#[pin_project::pin_project] +pub struct BiPipeWrite { + #[cfg(unix)] + #[pin] + inner: tokio::net::unix::OwnedWriteHalf, + #[cfg(windows)] + #[pin] + inner: tokio::io::WriteHalf, +} + +#[cfg(unix)] +impl From for BiPipeWrite { + fn from(value: tokio::net::unix::OwnedWriteHalf) -> Self { + Self { inner: value } + } +} + +#[cfg(windows)] +impl + From> + for BiPipeWrite +{ + fn from( + value: tokio::io::WriteHalf< + tokio::net::windows::named_pipe::NamedPipeClient, + >, + ) -> Self { + Self { inner: value } + } +} + +#[cfg(unix)] +fn from_raw( + stream: RawBiPipeHandle, +) -> Result<(BiPipeRead, BiPipeWrite), std::io::Error> { + use std::os::fd::FromRawFd; + // Safety: The fd is part of a pair of connected sockets + let unix_stream = tokio::net::UnixStream::from_std(unsafe { + std::os::unix::net::UnixStream::from_raw_fd(stream) + })?; + let (read, write) = unix_stream.into_split(); + Ok((BiPipeRead { inner: read }, BiPipeWrite { inner: write })) +} + +#[cfg(windows)] +fn from_raw( + handle: RawBiPipeHandle, +) -> Result<(BiPipeRead, BiPipeWrite), std::io::Error> { + // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the + // fd handle map will be the same. + let pipe = unsafe { + tokio::net::windows::named_pipe::NamedPipeClient::from_raw_handle( + handle as _, + )? + }; + let (read, write) = tokio::io::split(pipe); + Ok((BiPipeRead { inner: read }, BiPipeWrite { inner: write })) +} + +impl tokio::io::AsyncRead for BiPipeRead { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + self.project().inner.poll_read(cx, buf) + } +} +impl tokio::io::AsyncRead for BiPipe { + fn poll_read( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &mut tokio::io::ReadBuf<'_>, + ) -> std::task::Poll> { + self.project().read_end.poll_read(cx, buf) + } +} + +// implement `AsyncWrite` for `$name`, delegating +// the impl to `$field`. `$name` must have a `project` method +// with a projected `$field` (e.g. with `pin_project::pin_project`) +macro_rules! impl_async_write { + (for $name: ident -> self.$field: ident) => { + impl tokio::io::AsyncWrite for $name { + fn poll_write_vectored( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + bufs: &[std::io::IoSlice<'_>], + ) -> std::task::Poll> { + self.project().$field.poll_write_vectored(cx, bufs) + } + + fn is_write_vectored(&self) -> bool { + self.$field.is_write_vectored() + } + + fn poll_write( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + buf: &[u8], + ) -> std::task::Poll> { + self.project().$field.poll_write(cx, buf) + } + + fn poll_flush( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.project().$field.poll_flush(cx) + } + + fn poll_shutdown( + self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.project().$field.poll_shutdown(cx) + } + } + }; +} + +impl_async_write!(for BiPipeWrite -> self.inner); +impl_async_write!(for BiPipe -> self.write_end); + +/// Creates both sides of a bidirectional pipe, returning the raw +/// handles to the underlying OS resources. +pub fn bi_pipe_pair_raw() -> Result<(RawBiPipeHandle, RawBiPipeHandle), AnyError> +{ + #[cfg(unix)] + { + // SockFlag is broken on macOS + // https://github.com/nix-rust/nix/issues/861 + let mut fds = [-1, -1]; + #[cfg(not(target_os = "macos"))] + let flags = libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK; + + #[cfg(target_os = "macos")] + let flags = 0; + + // SAFETY: libc call, fds are correct size+align + let ret = unsafe { + libc::socketpair( + libc::AF_UNIX, + libc::SOCK_STREAM | flags, + 0, + fds.as_mut_ptr(), + ) + }; + if ret != 0 { + return Err(std::io::Error::last_os_error().into()); + } + + if cfg!(target_os = "macos") { + let fcntl = |fd: i32, flag: libc::c_int| -> Result<(), std::io::Error> { + // SAFETY: libc call, fd is valid + let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; + + if flags == -1 { + return Err(fail(fds)); + } + // SAFETY: libc call, fd is valid + let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags | flag) }; + if ret == -1 { + return Err(fail(fds)); + } + Ok(()) + }; + + fn fail(fds: [i32; 2]) -> std::io::Error { + // SAFETY: libc call, fds are valid + unsafe { + libc::close(fds[0]); + libc::close(fds[1]); + } + std::io::Error::last_os_error() + } + + // SOCK_NONBLOCK is not supported on macOS. + (fcntl)(fds[0], libc::O_NONBLOCK)?; + (fcntl)(fds[1], libc::O_NONBLOCK)?; + + // SOCK_CLOEXEC is not supported on macOS. + (fcntl)(fds[0], libc::FD_CLOEXEC)?; + (fcntl)(fds[1], libc::FD_CLOEXEC)?; + } + + let fd1 = fds[0]; + let fd2 = fds[1]; + Ok((fd1, fd2)) + } + #[cfg(windows)] + { + // TODO(nathanwhit): more granular unsafe blocks + // SAFETY: win32 calls + unsafe { + use windows_sys::Win32::Foundation::CloseHandle; + use windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED; + use windows_sys::Win32::Foundation::ERROR_PIPE_CONNECTED; + use windows_sys::Win32::Foundation::GENERIC_READ; + use windows_sys::Win32::Foundation::GENERIC_WRITE; + use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE; + use windows_sys::Win32::Security::SECURITY_ATTRIBUTES; + use windows_sys::Win32::Storage::FileSystem::CreateFileW; + use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_FIRST_PIPE_INSTANCE; + use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED; + use windows_sys::Win32::Storage::FileSystem::OPEN_EXISTING; + use windows_sys::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX; + use windows_sys::Win32::System::Pipes::ConnectNamedPipe; + use windows_sys::Win32::System::Pipes::CreateNamedPipeW; + use windows_sys::Win32::System::Pipes::PIPE_READMODE_BYTE; + use windows_sys::Win32::System::Pipes::PIPE_TYPE_BYTE; + + use std::io; + use std::os::windows::ffi::OsStrExt; + use std::path::Path; + use std::ptr; + + let (path, hd1) = loop { + let name = format!("\\\\.\\pipe\\{}", uuid::Uuid::new_v4()); + let mut path = Path::new(&name) + .as_os_str() + .encode_wide() + .collect::>(); + path.push(0); + + let hd1 = CreateNamedPipeW( + path.as_ptr(), + PIPE_ACCESS_DUPLEX + | FILE_FLAG_FIRST_PIPE_INSTANCE + | FILE_FLAG_OVERLAPPED, + PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, + 1, + 65536, + 65536, + 0, + std::ptr::null_mut(), + ); + + if hd1 == INVALID_HANDLE_VALUE { + let err = io::Error::last_os_error(); + /* If the pipe name is already in use, try again. */ + if err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32) { + continue; + } + + return Err(err.into()); + } + + break (path, hd1); + }; + + /* Create child pipe handle. */ + let s = SECURITY_ATTRIBUTES { + nLength: std::mem::size_of::() as u32, + lpSecurityDescriptor: ptr::null_mut(), + bInheritHandle: 1, + }; + let hd2 = CreateFileW( + path.as_ptr(), + GENERIC_READ | GENERIC_WRITE, + 0, + &s, + OPEN_EXISTING, + FILE_FLAG_OVERLAPPED, + 0, + ); + if hd2 == INVALID_HANDLE_VALUE { + return Err(io::Error::last_os_error().into()); + } + + // Will not block because we have create the pair. + if ConnectNamedPipe(hd1, ptr::null_mut()) == 0 { + let err = std::io::Error::last_os_error(); + if err.raw_os_error() != Some(ERROR_PIPE_CONNECTED as i32) { + CloseHandle(hd2); + return Err(err.into()); + } + } + + Ok((hd1 as _, hd2 as _)) + } + } +} diff --git a/ext/io/lib.rs b/ext/io/lib.rs index a2f14e0dbd..47921bcee4 100644 --- a/ext/io/lib.rs +++ b/ext/io/lib.rs @@ -60,12 +60,21 @@ mod pipe; #[cfg(windows)] mod winpipe; +mod bi_pipe; + pub use pipe::pipe; pub use pipe::AsyncPipeRead; pub use pipe::AsyncPipeWrite; pub use pipe::PipeRead; pub use pipe::PipeWrite; +pub use bi_pipe::bi_pipe_pair_raw; +pub use bi_pipe::BiPipe; +pub use bi_pipe::BiPipeRead; +pub use bi_pipe::BiPipeResource; +pub use bi_pipe::BiPipeWrite; +pub use bi_pipe::RawBiPipeHandle; + // Store the stdio fd/handles in global statics in order to keep them // alive for the duration of the application since the last handle/fd // being dropped will close the corresponding pipe. diff --git a/ext/node/ops/ipc.rs b/ext/node/ops/ipc.rs index 7cfab65a45..59b6fece14 100644 --- a/ext/node/ops/ipc.rs +++ b/ext/node/ops/ipc.rs @@ -9,10 +9,6 @@ mod impl_ { use std::future::Future; use std::io; use std::mem; - #[cfg(unix)] - use std::os::fd::FromRawFd; - #[cfg(unix)] - use std::os::fd::RawFd; use std::pin::Pin; use std::rc::Rc; use std::sync::atomic::AtomicBool; @@ -43,15 +39,9 @@ mod impl_ { use tokio::io::AsyncWriteExt; use tokio::io::ReadBuf; - #[cfg(unix)] - use tokio::net::unix::OwnedReadHalf; - #[cfg(unix)] - use tokio::net::unix::OwnedWriteHalf; - #[cfg(unix)] - use tokio::net::UnixStream; - - #[cfg(windows)] - type NamedPipeClient = tokio::net::windows::named_pipe::NamedPipeClient; + use deno_io::BiPipe; + use deno_io::BiPipeRead; + use deno_io::BiPipeWrite; /// Wrapper around v8 value that implements Serialize. struct SerializeWrapper<'a, 'b>( @@ -349,10 +339,7 @@ mod impl_ { pub struct IpcJsonStreamResource { read_half: AsyncRefCell, - #[cfg(unix)] - write_half: AsyncRefCell, - #[cfg(windows)] - write_half: AsyncRefCell>, + write_half: AsyncRefCell, cancel: Rc, queued_bytes: AtomicUsize, ref_tracker: IpcRefTracker, @@ -364,38 +351,12 @@ mod impl_ { } } - #[cfg(unix)] - fn pipe(stream: RawFd) -> Result<(OwnedReadHalf, OwnedWriteHalf), io::Error> { - // Safety: The fd is part of a pair of connected sockets create by child process - // implementation. - let unix_stream = UnixStream::from_std(unsafe { - std::os::unix::net::UnixStream::from_raw_fd(stream) - })?; - Ok(unix_stream.into_split()) - } - - #[cfg(windows)] - fn pipe( - handle: i64, - ) -> Result< - ( - tokio::io::ReadHalf, - tokio::io::WriteHalf, - ), - io::Error, - > { - // Safety: We cannot use `get_osfhandle` because Deno statically links to msvcrt. It is not guaranteed that the - // fd handle map will be the same. - let pipe = unsafe { NamedPipeClient::from_raw_handle(handle as _)? }; - Ok(tokio::io::split(pipe)) - } - impl IpcJsonStreamResource { pub fn new( stream: i64, ref_tracker: IpcRefTracker, ) -> Result { - let (read_half, write_half) = pipe(stream as _)?; + let (read_half, write_half) = BiPipe::from_raw(stream as _)?.split(); Ok(Self { read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), write_half: AsyncRefCell::new(write_half), @@ -406,11 +367,14 @@ mod impl_ { } #[cfg(all(unix, test))] - fn from_stream(stream: UnixStream, ref_tracker: IpcRefTracker) -> Self { + fn from_stream( + stream: tokio::net::UnixStream, + ref_tracker: IpcRefTracker, + ) -> Self { let (read_half, write_half) = stream.into_split(); Self { - read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), - write_half: AsyncRefCell::new(write_half), + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())), + write_half: AsyncRefCell::new(write_half.into()), cancel: Default::default(), queued_bytes: Default::default(), ref_tracker, @@ -418,11 +382,14 @@ mod impl_ { } #[cfg(all(windows, test))] - fn from_stream(pipe: NamedPipeClient, ref_tracker: IpcRefTracker) -> Self { + fn from_stream( + pipe: tokio::net::windows::named_pipe::NamedPipeClient, + ref_tracker: IpcRefTracker, + ) -> Self { let (read_half, write_half) = tokio::io::split(pipe); Self { - read_half: AsyncRefCell::new(IpcJsonStream::new(read_half)), - write_half: AsyncRefCell::new(write_half), + read_half: AsyncRefCell::new(IpcJsonStream::new(read_half.into())), + write_half: AsyncRefCell::new(write_half.into()), cancel: Default::default(), queued_bytes: Default::default(), ref_tracker, @@ -492,26 +459,13 @@ mod impl_ { // // `\n` is used as a delimiter between messages. struct IpcJsonStream { - #[cfg(unix)] - pipe: OwnedReadHalf, - #[cfg(windows)] - pipe: tokio::io::ReadHalf, + pipe: BiPipeRead, buffer: Vec, read_buffer: ReadBuffer, } impl IpcJsonStream { - #[cfg(unix)] - fn new(pipe: OwnedReadHalf) -> Self { - Self { - pipe, - buffer: Vec::with_capacity(INITIAL_CAPACITY), - read_buffer: ReadBuffer::new(), - } - } - - #[cfg(windows)] - fn new(pipe: tokio::io::ReadHalf) -> Self { + fn new(pipe: BiPipeRead) -> Self { Self { pipe, buffer: Vec::with_capacity(INITIAL_CAPACITY), diff --git a/ext/node/polyfills/internal/child_process.ts b/ext/node/polyfills/internal/child_process.ts index 0996806d7f..edc11df736 100644 --- a/ext/node/polyfills/internal/child_process.ts +++ b/ext/node/polyfills/internal/child_process.ts @@ -25,7 +25,6 @@ import { StringPrototypeStartsWith, StringPrototypeToUpperCase, } from "ext:deno_node/internal/primordials.mjs"; - import { assert } from "ext:deno_node/_util/asserts.ts"; import { EventEmitter } from "node:events"; import { os } from "ext:deno_node/internal_binding/constants.ts"; @@ -54,6 +53,10 @@ import { kEmptyObject } from "ext:deno_node/internal/util.mjs"; import { getValidatedPath } from "ext:deno_node/internal/fs/utils.mjs"; import process from "node:process"; import { StringPrototypeSlice } from "ext:deno_node/internal/primordials.mjs"; +import { StreamBase } from "ext:deno_node/internal_binding/stream_wrap.ts"; +import { Pipe, socketType } from "ext:deno_node/internal_binding/pipe_wrap.ts"; +import console from "node:console"; +import { Socket } from "node:net"; export function mapValues( record: Readonly>, @@ -118,6 +121,47 @@ function maybeClose(child: ChildProcess) { } } +function flushStdio(subprocess: ChildProcess) { + const stdio = subprocess.stdio; + + if (stdio == null) return; + + for (let i = 0; i < stdio.length; i++) { + const stream = stdio[i]; + if (!stream || !stream.readable) { + continue; + } + stream.resume(); + } +} + +// Wraps a resource in a class that implements +// StreamBase, so it can be used with node streams +class StreamResource implements StreamBase { + #rid: number; + constructor(rid: number) { + this.#rid = rid; + } + close(): void { + core.close(this.#rid); + } + async read(p: Uint8Array): Promise { + const readPromise = core.read(this.#rid, p); + core.unrefOpPromise(readPromise); + const nread = await readPromise; + return nread > 0 ? nread : null; + } + ref(): void { + return; + } + unref(): void { + return; + } + write(p: Uint8Array): Promise { + return core.write(this.#rid, p); + } +} + export class ChildProcess extends EventEmitter { /** * The exit code of the child process. This property will be `null` until the child process exits. @@ -201,7 +245,7 @@ export class ChildProcess extends EventEmitter { stdin = "pipe", stdout = "pipe", stderr = "pipe", - _channel, // TODO(kt3k): handle this correctly + ...extraStdio ] = normalizedStdio; const [cmd, cmdArgs] = buildCommand( command, @@ -213,6 +257,15 @@ export class ChildProcess extends EventEmitter { const ipc = normalizedStdio.indexOf("ipc"); + const extraStdioOffset = 3; // stdin, stdout, stderr + + const extraStdioNormalized: DenoStdio[] = []; + for (let i = 0; i < extraStdio.length; i++) { + const fd = i + extraStdioOffset; + if (fd === ipc) extraStdioNormalized.push("null"); + extraStdioNormalized.push(toDenoStdio(extraStdio[i])); + } + const stringEnv = mapValues(env, (value) => value.toString()); try { this.#process = new Deno.Command(cmd, { @@ -224,6 +277,7 @@ export class ChildProcess extends EventEmitter { stderr: toDenoStdio(stderr), windowsRawArguments: windowsVerbatimArguments, ipc, // internal + extraStdio: extraStdioNormalized, }).spawn(); this.pid = this.#process.pid; @@ -259,13 +313,36 @@ export class ChildProcess extends EventEmitter { maybeClose(this); }); } - // TODO(nathanwhit): once we impl > 3 stdio pipes make sure we also listen for their - // close events (like above) this.stdio[0] = this.stdin; this.stdio[1] = this.stdout; this.stdio[2] = this.stderr; + if (ipc >= 0) { + this.stdio[ipc] = null; + } + + const pipeRids = internals.getExtraPipeRids(this.#process); + for (let i = 0; i < pipeRids.length; i++) { + const rid: number | null = pipeRids[i]; + const fd = i + extraStdioOffset; + if (rid) { + this[kClosesNeeded]++; + this.stdio[fd] = new Socket( + { + handle: new Pipe( + socketType.IPC, + new StreamResource(rid), + ), + // deno-lint-ignore no-explicit-any + } as any, + ); + this.stdio[fd]?.on("close", () => { + maybeClose(this); + }); + } + } + nextTick(() => { this.emit("spawn"); this.#spawned.resolve(); @@ -292,9 +369,9 @@ export class ChildProcess extends EventEmitter { } } - const pipeFd = internals.getPipeFd(this.#process); - if (typeof pipeFd == "number") { - setupChannel(this, pipeFd); + const pipeRid = internals.getIpcPipeRid(this.#process); + if (typeof pipeRid == "number") { + setupChannel(this, pipeRid); this[kClosesNeeded]++; this.on("disconnect", () => { maybeClose(this); @@ -312,6 +389,7 @@ export class ChildProcess extends EventEmitter { await this.#_waitForChildStreamsToClose(); this.#closePipes(); maybeClose(this); + nextTick(flushStdio, this); }); })(); } catch (err) { @@ -395,16 +473,6 @@ export class ChildProcess extends EventEmitter { assert(this.stdin); this.stdin.destroy(); } - /// TODO(nathanwhit): for some reason when the child process exits - /// and the child end of the named pipe closes, reads still just return `Pending` - /// instead of returning that 0 bytes were read (to signal the pipe died). - /// For now, just forcibly disconnect, but in theory I think we could miss messages - /// that haven't been read yet. - if (Deno.build.os === "windows") { - if (this.canDisconnect) { - this.disconnect?.(); - } - } } } diff --git a/ext/node/polyfills/internal_binding/pipe_wrap.ts b/ext/node/polyfills/internal_binding/pipe_wrap.ts index a657f7018d..f5c3c54396 100644 --- a/ext/node/polyfills/internal_binding/pipe_wrap.ts +++ b/ext/node/polyfills/internal_binding/pipe_wrap.ts @@ -37,7 +37,10 @@ import { import { LibuvStreamWrap } from "ext:deno_node/internal_binding/stream_wrap.ts"; import { codeMap } from "ext:deno_node/internal_binding/uv.ts"; import { delay } from "ext:deno_node/_util/async.ts"; -import { kStreamBaseField } from "ext:deno_node/internal_binding/stream_wrap.ts"; +import { + kStreamBaseField, + StreamBase, +} from "ext:deno_node/internal_binding/stream_wrap.ts"; import { ceilPowOf2, INITIAL_ACCEPT_BACKOFF_DELAY, @@ -68,7 +71,7 @@ export class Pipe extends ConnectionWrap { #closed = false; #acceptBackoffDelay?: number; - constructor(type: number, conn?: Deno.UnixConn) { + constructor(type: number, conn?: Deno.UnixConn | StreamBase) { let provider: providerType; let ipc: boolean; @@ -100,8 +103,8 @@ export class Pipe extends ConnectionWrap { this.ipc = ipc; - if (conn && provider === providerType.PIPEWRAP) { - const localAddr = conn.localAddr as Deno.UnixAddr; + if (conn && provider === providerType.PIPEWRAP && "localAddr" in conn) { + const localAddr = conn.localAddr; this.#address = localAddr.path; } } diff --git a/ext/node/polyfills/internal_binding/stream_wrap.ts b/ext/node/polyfills/internal_binding/stream_wrap.ts index 4915a38cac..dc30bfdfe0 100644 --- a/ext/node/polyfills/internal_binding/stream_wrap.ts +++ b/ext/node/polyfills/internal_binding/stream_wrap.ts @@ -44,11 +44,11 @@ import { } from "ext:deno_node/internal_binding/async_wrap.ts"; import { codeMap } from "ext:deno_node/internal_binding/uv.ts"; -interface Reader { +export interface Reader { read(p: Uint8Array): Promise; } -interface Writer { +export interface Writer { write(p: Uint8Array): Promise; } @@ -56,7 +56,12 @@ export interface Closer { close(): void; } -type Ref = { ref(): void; unref(): void }; +export interface Ref { + ref(): void; + unref(): void; +} + +export interface StreamBase extends Reader, Writer, Closer, Ref {} const enum StreamBaseStateFields { kReadBytesOrError, diff --git a/runtime/js/40_process.js b/runtime/js/40_process.js index 6db04468f0..0f28b9d5cb 100644 --- a/runtime/js/40_process.js +++ b/runtime/js/40_process.js @@ -168,7 +168,7 @@ function run({ const illegalConstructorKey = Symbol("illegalConstructorKey"); -function spawnChildInner(opFn, command, apiName, { +function spawnChildInner(command, apiName, { args = [], cwd = undefined, clearEnv = false, @@ -181,8 +181,9 @@ function spawnChildInner(opFn, command, apiName, { signal = undefined, windowsRawArguments = false, ipc = -1, + extraStdio = [], } = { __proto__: null }) { - const child = opFn({ + const child = op_spawn_child({ cmd: pathFromURL(command), args: ArrayPrototypeMap(args, String), cwd: pathFromURL(cwd), @@ -195,6 +196,7 @@ function spawnChildInner(opFn, command, apiName, { stderr, windowsRawArguments, ipc, + extraStdio, }, apiName); return new ChildProcess(illegalConstructorKey, { ...child, @@ -204,7 +206,6 @@ function spawnChildInner(opFn, command, apiName, { function spawnChild(command, options = { __proto__: null }) { return spawnChildInner( - op_spawn_child, command, "Deno.Command().spawn()", options, @@ -221,16 +222,19 @@ function collectOutput(readableStream) { return readableStreamCollectIntoUint8Array(readableStream); } -const _pipeFd = Symbol("[[pipeFd]]"); +const _ipcPipeRid = Symbol("[[ipcPipeRid]]"); +const _extraPipeRids = Symbol("[[_extraPipeRids]]"); -internals.getPipeFd = (process) => process[_pipeFd]; +internals.getIpcPipeRid = (process) => process[_ipcPipeRid]; +internals.getExtraPipeRids = (process) => process[_extraPipeRids]; class ChildProcess { #rid; #waitPromise; #waitComplete = false; - [_pipeFd]; + [_ipcPipeRid]; + [_extraPipeRids]; #pid; get pid() { @@ -268,7 +272,8 @@ class ChildProcess { stdinRid, stdoutRid, stderrRid, - pipeFd, // internal + ipcPipeRid, // internal + extraPipeRids, } = null) { if (key !== illegalConstructorKey) { throw new TypeError("Illegal constructor."); @@ -276,7 +281,8 @@ class ChildProcess { this.#rid = rid; this.#pid = pid; - this[_pipeFd] = pipeFd; + this[_ipcPipeRid] = ipcPipeRid; + this[_extraPipeRids] = extraPipeRids; if (stdinRid !== null) { this.#stdin = writableStreamForRid(stdinRid); @@ -380,7 +386,6 @@ function spawn(command, options) { ); } return spawnChildInner( - op_spawn_child, command, "Deno.Command().output()", options, @@ -417,6 +422,7 @@ function spawnSync(command, { stdout, stderr, windowsRawArguments, + extraStdio: [], }); return { success: result.status.success, diff --git a/runtime/ops/process.rs b/runtime/ops/process.rs index 69fb5cf29d..9d166a8011 100644 --- a/runtime/ops/process.rs +++ b/runtime/ops/process.rs @@ -154,6 +154,8 @@ pub struct SpawnArgs { #[serde(flatten)] stdio: ChildStdio, + + extra_stdio: Vec, } #[derive(Deserialize)] @@ -215,7 +217,12 @@ pub struct SpawnOutput { stderr: Option, } -type CreateCommand = (std::process::Command, Option); +type CreateCommand = ( + std::process::Command, + Option, + Vec>, + Vec, +); fn create_command( state: &mut OpState, @@ -277,216 +284,103 @@ fn create_command( // TODO(bartlomieju): #[allow(clippy::undocumented_unsafe_blocks)] unsafe { + let mut extra_pipe_rids = Vec::new(); + let mut fds_to_dup = Vec::new(); + let mut fds_to_close = Vec::new(); + let mut ipc_rid = None; if let Some(ipc) = args.ipc { - if ipc < 0 { - return Ok((command, None)); + if ipc >= 0 { + let (ipc_fd1, ipc_fd2) = deno_io::bi_pipe_pair_raw()?; + fds_to_dup.push((ipc_fd2, ipc)); + fds_to_close.push(ipc_fd2); + /* One end returned to parent process (this) */ + let pipe_rid = + state + .resource_table + .add(deno_node::IpcJsonStreamResource::new( + ipc_fd1 as _, + deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()), + )?); + /* The other end passed to child process via NODE_CHANNEL_FD */ + command.env("NODE_CHANNEL_FD", format!("{}", ipc)); + ipc_rid = Some(pipe_rid); } - // SockFlag is broken on macOS - // https://github.com/nix-rust/nix/issues/861 - let mut fds = [-1, -1]; - #[cfg(not(target_os = "macos"))] - let flags = libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK; - - #[cfg(target_os = "macos")] - let flags = 0; - - let ret = libc::socketpair( - libc::AF_UNIX, - libc::SOCK_STREAM | flags, - 0, - fds.as_mut_ptr(), - ); - if ret != 0 { - return Err(std::io::Error::last_os_error().into()); - } - - if cfg!(target_os = "macos") { - let fcntl = - |fd: i32, flag: libc::c_int| -> Result<(), std::io::Error> { - let flags = libc::fcntl(fd, libc::F_GETFL, 0); - - if flags == -1 { - return Err(fail(fds)); - } - let ret = libc::fcntl(fd, libc::F_SETFL, flags | flag); - if ret == -1 { - return Err(fail(fds)); - } - Ok(()) - }; - - fn fail(fds: [i32; 2]) -> std::io::Error { - unsafe { - libc::close(fds[0]); - libc::close(fds[1]); - } - std::io::Error::last_os_error() - } - - // SOCK_NONBLOCK is not supported on macOS. - (fcntl)(fds[0], libc::O_NONBLOCK)?; - (fcntl)(fds[1], libc::O_NONBLOCK)?; - - // SOCK_CLOEXEC is not supported on macOS. - (fcntl)(fds[0], libc::FD_CLOEXEC)?; - (fcntl)(fds[1], libc::FD_CLOEXEC)?; - } - - let fd1 = fds[0]; - let fd2 = fds[1]; - - command.pre_exec(move || { - if ipc >= 0 { - let _fd = libc::dup2(fd2, ipc); - libc::close(fd2); - } - libc::setgroups(0, std::ptr::null()); - Ok(()) - }); - - /* One end returned to parent process (this) */ - let pipe_rid = Some(state.resource_table.add( - deno_node::IpcJsonStreamResource::new( - fd1 as _, - deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()), - )?, - )); - - /* The other end passed to child process via NODE_CHANNEL_FD */ - command.env("NODE_CHANNEL_FD", format!("{}", ipc)); - - return Ok((command, pipe_rid)); } - Ok((command, None)) + for (i, stdio) in args.extra_stdio.into_iter().enumerate() { + // index 0 in `extra_stdio` actually refers to fd 3 + // because we handle stdin,stdout,stderr specially + let fd = (i + 3) as i32; + // TODO(nathanwhit): handle inherited, but this relies on the parent process having + // fds open already. since we don't generally support dealing with raw fds, + // we can't properly support this + if matches!(stdio, Stdio::Piped) { + let (fd1, fd2) = deno_io::bi_pipe_pair_raw()?; + fds_to_dup.push((fd2, fd)); + fds_to_close.push(fd2); + let rid = state.resource_table.add( + match deno_io::BiPipeResource::from_raw_handle(fd1) { + Ok(v) => v, + Err(e) => { + log::warn!("Failed to open bidirectional pipe for fd {fd}: {e}"); + extra_pipe_rids.push(None); + continue; + } + }, + ); + extra_pipe_rids.push(Some(rid)); + } else { + extra_pipe_rids.push(None); + } + } + + command.pre_exec(move || { + for &(src, dst) in &fds_to_dup { + if src >= 0 && dst >= 0 { + let _fd = libc::dup2(src, dst); + libc::close(src); + } + } + libc::setgroups(0, std::ptr::null()); + Ok(()) + }); + + Ok((command, ipc_rid, extra_pipe_rids, fds_to_close)) } #[cfg(windows)] - // Safety: We setup a windows named pipe and pass one end to the child process. - unsafe { - use windows_sys::Win32::Foundation::CloseHandle; - use windows_sys::Win32::Foundation::DuplicateHandle; - use windows_sys::Win32::Foundation::DUPLICATE_SAME_ACCESS; - use windows_sys::Win32::Foundation::ERROR_ACCESS_DENIED; - use windows_sys::Win32::Foundation::ERROR_PIPE_CONNECTED; - use windows_sys::Win32::Foundation::GENERIC_READ; - use windows_sys::Win32::Foundation::GENERIC_WRITE; - use windows_sys::Win32::Foundation::INVALID_HANDLE_VALUE; - use windows_sys::Win32::Security::SECURITY_ATTRIBUTES; - use windows_sys::Win32::Storage::FileSystem::CreateFileW; - use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_FIRST_PIPE_INSTANCE; - use windows_sys::Win32::Storage::FileSystem::FILE_FLAG_OVERLAPPED; - use windows_sys::Win32::Storage::FileSystem::OPEN_EXISTING; - use windows_sys::Win32::Storage::FileSystem::PIPE_ACCESS_DUPLEX; - use windows_sys::Win32::System::Pipes::ConnectNamedPipe; - use windows_sys::Win32::System::Pipes::CreateNamedPipeW; - use windows_sys::Win32::System::Pipes::PIPE_READMODE_BYTE; - use windows_sys::Win32::System::Pipes::PIPE_TYPE_BYTE; - use windows_sys::Win32::System::Threading::GetCurrentProcess; - - use std::io; - use std::os::windows::ffi::OsStrExt; - use std::path::Path; - use std::ptr; - + { + let mut ipc_rid = None; + let mut handles_to_close = Vec::with_capacity(1); if let Some(ipc) = args.ipc { - if ipc < 0 { - return Ok((command, None)); + if ipc >= 0 { + let (hd1, hd2) = deno_io::bi_pipe_pair_raw()?; + + /* One end returned to parent process (this) */ + let pipe_rid = Some(state.resource_table.add( + deno_node::IpcJsonStreamResource::new( + hd1 as i64, + deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()), + )?, + )); + + /* The other end passed to child process via NODE_CHANNEL_FD */ + command.env("NODE_CHANNEL_FD", format!("{}", hd2 as i64)); + + handles_to_close.push(hd2); + + ipc_rid = pipe_rid; } - - let (path, hd1) = loop { - let name = format!("\\\\.\\pipe\\{}", uuid::Uuid::new_v4()); - let mut path = Path::new(&name) - .as_os_str() - .encode_wide() - .collect::>(); - path.push(0); - - let hd1 = CreateNamedPipeW( - path.as_ptr(), - PIPE_ACCESS_DUPLEX - | FILE_FLAG_FIRST_PIPE_INSTANCE - | FILE_FLAG_OVERLAPPED, - PIPE_TYPE_BYTE | PIPE_READMODE_BYTE, - 1, - 65536, - 65536, - 0, - std::ptr::null_mut(), - ); - - if hd1 == INVALID_HANDLE_VALUE { - let err = io::Error::last_os_error(); - /* If the pipe name is already in use, try again. */ - if err.raw_os_error() == Some(ERROR_ACCESS_DENIED as i32) { - continue; - } - - return Err(err.into()); - } - - break (path, hd1); - }; - - /* Create child pipe handle. */ - let s = SECURITY_ATTRIBUTES { - nLength: std::mem::size_of::() as u32, - lpSecurityDescriptor: ptr::null_mut(), - bInheritHandle: 1, - }; - let mut hd2 = CreateFileW( - path.as_ptr(), - GENERIC_READ | GENERIC_WRITE, - 0, - &s, - OPEN_EXISTING, - FILE_FLAG_OVERLAPPED, - 0, - ); - if hd2 == INVALID_HANDLE_VALUE { - return Err(io::Error::last_os_error().into()); - } - - // Will not block because we have create the pair. - if ConnectNamedPipe(hd1, ptr::null_mut()) == 0 { - let err = std::io::Error::last_os_error(); - if err.raw_os_error() != Some(ERROR_PIPE_CONNECTED as i32) { - CloseHandle(hd2); - return Err(err.into()); - } - } - - // Duplicating the handle to allow the child process to use it. - if DuplicateHandle( - GetCurrentProcess(), - hd2, - GetCurrentProcess(), - &mut hd2, - 0, - 1, - DUPLICATE_SAME_ACCESS, - ) == 0 - { - return Err(std::io::Error::last_os_error().into()); - } - - /* One end returned to parent process (this) */ - let pipe_fd = Some(state.resource_table.add( - deno_node::IpcJsonStreamResource::new( - hd1 as i64, - deno_node::IpcRefTracker::new(state.external_ops_tracker.clone()), - )?, - )); - - /* The other end passed to child process via NODE_CHANNEL_FD */ - command.env("NODE_CHANNEL_FD", format!("{}", hd2 as i64)); - - return Ok((command, pipe_fd)); } - } - #[cfg(not(unix))] - return Ok((command, None)); + if args.extra_stdio.iter().any(|s| matches!(s, Stdio::Piped)) { + log::warn!( + "Additional stdio pipes beyond stdin/stdout/stderr are not currently supported on windows" + ); + } + + Ok((command, ipc_rid, vec![], handles_to_close)) + } } #[derive(Serialize)] @@ -497,13 +391,15 @@ struct Child { stdin_rid: Option, stdout_rid: Option, stderr_rid: Option, - pipe_fd: Option, + ipc_pipe_rid: Option, + extra_pipe_rids: Vec>, } fn spawn_child( state: &mut OpState, command: std::process::Command, - pipe_fd: Option, + ipc_pipe_rid: Option, + extra_pipe_rids: Vec>, ) -> Result { let mut command = tokio::process::Command::from(command); // TODO(@crowlkats): allow detaching processes. @@ -585,10 +481,28 @@ fn spawn_child( stdin_rid, stdout_rid, stderr_rid, - pipe_fd, + ipc_pipe_rid, + extra_pipe_rids, }) } +fn close_raw_handle(handle: deno_io::RawBiPipeHandle) { + #[cfg(unix)] + { + // SAFETY: libc call + unsafe { + libc::close(handle); + } + } + #[cfg(windows)] + { + // SAFETY: win32 call + unsafe { + windows_sys::Win32::Foundation::CloseHandle(handle as _); + } + } +} + #[op2] #[serde] fn op_spawn_child( @@ -596,8 +510,13 @@ fn op_spawn_child( #[serde] args: SpawnArgs, #[string] api_name: String, ) -> Result { - let (command, pipe_rid) = create_command(state, args, &api_name)?; - spawn_child(state, command, pipe_rid) + let (command, pipe_rid, extra_pipe_rids, handles_to_close) = + create_command(state, args, &api_name)?; + let child = spawn_child(state, command, pipe_rid, extra_pipe_rids); + for handle in handles_to_close { + close_raw_handle(handle); + } + child } #[op2(async)] @@ -626,7 +545,7 @@ fn op_spawn_sync( ) -> Result { let stdout = matches!(args.stdio.stdout, StdioOrRid::Stdio(Stdio::Piped)); let stderr = matches!(args.stdio.stderr, StdioOrRid::Stdio(Stdio::Piped)); - let (mut command, _) = + let (mut command, _, _, _) = create_command(state, args, "Deno.Command().outputSync()")?; let output = command.output().with_context(|| { format!( diff --git a/tests/specs/node/child_process_extra_pipes/__test__.jsonc b/tests/specs/node/child_process_extra_pipes/__test__.jsonc new file mode 100644 index 0000000000..f8073f3dfd --- /dev/null +++ b/tests/specs/node/child_process_extra_pipes/__test__.jsonc @@ -0,0 +1,17 @@ +{ + "tempDir": true, + "steps": [ + { + "if": "unix", + "cwd": "./test-pipe", + "commandName": "cargo", + "args": "build", + "output": "[WILDCARD]" + }, + { + "if": "unix", + "args": "run -A main.ts", + "output": "main.out" + } + ] +} diff --git a/tests/specs/node/child_process_extra_pipes/main.out b/tests/specs/node/child_process_extra_pipes/main.out new file mode 100644 index 0000000000..694126b924 --- /dev/null +++ b/tests/specs/node/child_process_extra_pipes/main.out @@ -0,0 +1,5 @@ +data: hello world +[UNORDERED_START] +child closed +pipe closed +[UNORDERED_END] diff --git a/tests/specs/node/child_process_extra_pipes/main.ts b/tests/specs/node/child_process_extra_pipes/main.ts new file mode 100644 index 0000000000..a3683fe9ee --- /dev/null +++ b/tests/specs/node/child_process_extra_pipes/main.ts @@ -0,0 +1,26 @@ +import child_process from "node:child_process"; +import { Buffer } from "node:buffer"; +import console from "node:console"; + +const child = child_process.spawn("./test-pipe/target/debug/test-pipe", [], { + stdio: ["inherit", "inherit", "inherit", "ignore", "pipe"], +}); + +const extra = child.stdio[4]; + +const p = Promise.withResolvers(); + +child.on("close", () => { + console.log("child closed"); + p.resolve(); +}); + +extra.on("data", (d) => { + console.log("data:", d.toString().trim()); +}); + +extra.on("close", () => { + console.log("pipe closed"); +}); + +await p.promise; diff --git a/tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.lock b/tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.lock new file mode 100644 index 0000000000..51bfabdad6 --- /dev/null +++ b/tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.lock @@ -0,0 +1,7 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "test-pipe" +version = "0.1.0" diff --git a/tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.toml b/tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.toml new file mode 100644 index 0000000000..26c5522994 --- /dev/null +++ b/tests/specs/node/child_process_extra_pipes/test-pipe/Cargo.toml @@ -0,0 +1,8 @@ +[package] +name = "test-pipe" +version = "0.1.0" +edition = "2021" + +[dependencies] + +[workspace] diff --git a/tests/specs/node/child_process_extra_pipes/test-pipe/src/main.rs b/tests/specs/node/child_process_extra_pipes/test-pipe/src/main.rs new file mode 100644 index 0000000000..192f827313 --- /dev/null +++ b/tests/specs/node/child_process_extra_pipes/test-pipe/src/main.rs @@ -0,0 +1,12 @@ +use std::io::prelude::*; +use std::os::fd::FromRawFd; +use std::os::unix::net::UnixStream; + +fn main() { + #[cfg(unix)] + { + let mut stream = unsafe { UnixStream::from_raw_fd(4) }; + + stream.write_all(b"hello world\n").unwrap(); + } +} diff --git a/tools/core_import_map.json b/tools/core_import_map.json index e7703a40c0..ba4cd105d9 100644 --- a/tools/core_import_map.json +++ b/tools/core_import_map.json @@ -166,6 +166,7 @@ "ext:deno_node/internal/test/binding.ts": "../ext/node/polyfills/internal/test/binding.ts", "ext:deno_node/internal/timers.mjs": "../ext/node/polyfills/internal/timers.mjs", "ext:deno_node/internal/url.ts": "../ext/node/polyfills/internal/url.ts", + "ext:deno_node/internal/stream_base_commons.ts": "../ext/node/polyfills/internal/stream_base_commons.ts", "ext:deno_node/internal/util.mjs": "../ext/node/polyfills/internal/util.mjs", "ext:deno_node/internal/util/debuglog.ts": "../ext/node/polyfills/internal/util/debuglog.ts", "ext:deno_node/internal/util/inspect.mjs": "../ext/node/polyfills/internal/util/inspect.mjs",