diff --git a/ext/io/bi_pipe.rs b/ext/io/bi_pipe.rs index b6fc70ca24..3492e2f441 100644 --- a/ext/io/bi_pipe.rs +++ b/ext/io/bi_pipe.rs @@ -183,9 +183,10 @@ fn from_raw( ) -> Result<(BiPipeRead, BiPipeWrite), std::io::Error> { use std::os::fd::FromRawFd; // Safety: The fd is part of a pair of connected sockets - let unix_stream = tokio::net::UnixStream::from_std(unsafe { - std::os::unix::net::UnixStream::from_raw_fd(stream) - })?; + let unix_stream = + unsafe { std::os::unix::net::UnixStream::from_raw_fd(stream) }; + unix_stream.set_nonblocking(true)?; + let unix_stream = tokio::net::UnixStream::from_std(unix_stream)?; let (read, write) = unix_stream.into_split(); Ok((BiPipeRead { inner: read }, BiPipeWrite { inner: write })) } @@ -280,7 +281,7 @@ pub fn bi_pipe_pair_raw( // https://github.com/nix-rust/nix/issues/861 let mut fds = [-1, -1]; #[cfg(not(target_os = "macos"))] - let flags = libc::SOCK_CLOEXEC | libc::SOCK_NONBLOCK; + let flags = libc::SOCK_CLOEXEC; #[cfg(target_os = "macos")] let flags = 0; @@ -301,13 +302,13 @@ pub fn bi_pipe_pair_raw( if cfg!(target_os = "macos") { let fcntl = |fd: i32, flag: libc::c_int| -> Result<(), std::io::Error> { // SAFETY: libc call, fd is valid - let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; + let flags = unsafe { libc::fcntl(fd, libc::F_GETFD) }; if flags == -1 { return Err(fail(fds)); } // SAFETY: libc call, fd is valid - let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flags | flag) }; + let ret = unsafe { libc::fcntl(fd, libc::F_SETFD, flags | flag) }; if ret == -1 { return Err(fail(fds)); } @@ -323,13 +324,9 @@ pub fn bi_pipe_pair_raw( std::io::Error::last_os_error() } - // SOCK_NONBLOCK is not supported on macOS. - (fcntl)(fds[0], libc::O_NONBLOCK)?; - (fcntl)(fds[1], libc::O_NONBLOCK)?; - // SOCK_CLOEXEC is not supported on macOS. - (fcntl)(fds[0], libc::FD_CLOEXEC)?; - (fcntl)(fds[1], libc::FD_CLOEXEC)?; + fcntl(fds[0], libc::FD_CLOEXEC)?; + fcntl(fds[1], libc::FD_CLOEXEC)?; } let fd1 = fds[0]; diff --git a/tests/specs/node/child_process_extra_pipes/main.out b/tests/specs/node/child_process_extra_pipes/main.out index 694126b924..436afd2f75 100644 --- a/tests/specs/node/child_process_extra_pipes/main.out +++ b/tests/specs/node/child_process_extra_pipes/main.out @@ -1,5 +1,5 @@ -data: hello world [UNORDERED_START] child closed +got: hello world pipe closed [UNORDERED_END] diff --git a/tests/specs/node/child_process_extra_pipes/main.ts b/tests/specs/node/child_process_extra_pipes/main.ts index a3683fe9ee..2837cdc53c 100644 --- a/tests/specs/node/child_process_extra_pipes/main.ts +++ b/tests/specs/node/child_process_extra_pipes/main.ts @@ -1,5 +1,4 @@ import child_process from "node:child_process"; -import { Buffer } from "node:buffer"; import console from "node:console"; const child = child_process.spawn("./test-pipe/target/debug/test-pipe", [], { @@ -8,19 +7,32 @@ const child = child_process.spawn("./test-pipe/target/debug/test-pipe", [], { const extra = child.stdio[4]; -const p = Promise.withResolvers(); +if (!extra) { + throw new Error("no extra pipe"); +} + +const p = Promise.withResolvers(); + +let got = ""; child.on("close", () => { console.log("child closed"); - p.resolve(); + console.log("got:", got); + if (got === "hello world") { + p.resolve(); + } else { + p.reject(new Error(`wanted "hello world", got "${got}"`)); + } }); extra.on("data", (d) => { - console.log("data:", d.toString().trim()); + got += d.toString(); }); extra.on("close", () => { console.log("pipe closed"); }); +extra.write("start"); + await p.promise; diff --git a/tests/specs/node/child_process_extra_pipes/test-pipe/src/main.rs b/tests/specs/node/child_process_extra_pipes/test-pipe/src/main.rs index 192f827313..acc034830f 100644 --- a/tests/specs/node/child_process_extra_pipes/test-pipe/src/main.rs +++ b/tests/specs/node/child_process_extra_pipes/test-pipe/src/main.rs @@ -1,12 +1,31 @@ +use std::fs::File; use std::io::prelude::*; use std::os::fd::FromRawFd; -use std::os::unix::net::UnixStream; fn main() { #[cfg(unix)] { - let mut stream = unsafe { UnixStream::from_raw_fd(4) }; + let mut pipe = unsafe { File::from_raw_fd(4) }; - stream.write_all(b"hello world\n").unwrap(); + let mut read = 0; + let mut buf = [0u8; 1024]; + loop { + if read > 4 { + assert_eq!(&buf[..5], b"start"); + break; + } + match pipe.read(&mut buf) { + Ok(n) => { + read += n; + } + Ok(0) => { + return; + } + Err(e) => { + eprintln!("GOT ERROR: {e:?}"); + } + } + } + pipe.write_all(b"hello world").unwrap(); } }