diff --git a/src/tools/miri/src/shims/unix/foreign_items.rs b/src/tools/miri/src/shims/unix/foreign_items.rs index 6c35281ecf2..273a99b3116 100644 --- a/src/tools/miri/src/shims/unix/foreign_items.rs +++ b/src/tools/miri/src/shims/unix/foreign_items.rs @@ -288,14 +288,25 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { this.write_scalar(result, dest)?; } - // Sockets + // Unnamed sockets and pipes "socketpair" => { let [domain, type_, protocol, sv] = this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?; - let result = this.socketpair(domain, type_, protocol, sv)?; this.write_scalar(result, dest)?; } + "pipe" => { + let [pipefd] = + this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?; + let result = this.pipe2(pipefd, /*flags*/ None)?; + this.write_scalar(result, dest)?; + } + "pipe2" => { + let [pipefd, flags] = + this.check_shim(abi, Abi::C { unwind: false }, link_name, args)?; + let result = this.pipe2(pipefd, Some(flags))?; + this.write_scalar(result, dest)?; + } // Time "gettimeofday" => { diff --git a/src/tools/miri/src/shims/unix/linux/epoll.rs b/src/tools/miri/src/shims/unix/linux/epoll.rs index 53f27868aeb..127817d5bbe 100644 --- a/src/tools/miri/src/shims/unix/linux/epoll.rs +++ b/src/tools/miri/src/shims/unix/linux/epoll.rs @@ -62,9 +62,10 @@ pub struct EpollEventInterest { /// EpollReadyEvents reflects the readiness of a file description. pub struct EpollReadyEvents { - /// The associated file is available for read(2) operations. + /// The associated file is available for read(2) operations, in the sense that a read will not block. + /// (I.e., returning EOF is considered "ready".) pub epollin: bool, - /// The associated file is available for write(2) operations. + /// The associated file is available for write(2) operations, in the sense that a write will not block. pub epollout: bool, /// Stream socket peer closed connection, or shut down writing /// half of connection. diff --git a/src/tools/miri/src/shims/unix/mod.rs b/src/tools/miri/src/shims/unix/mod.rs index 8cfa659d90a..7da6d7b02a2 100644 --- a/src/tools/miri/src/shims/unix/mod.rs +++ b/src/tools/miri/src/shims/unix/mod.rs @@ -4,9 +4,9 @@ mod env; mod fd; mod fs; mod mem; -mod socket; mod sync; mod thread; +mod unnamed_socket; mod android; mod freebsd; @@ -23,9 +23,9 @@ pub use env::EvalContextExt as _; pub use fd::EvalContextExt as _; pub use fs::EvalContextExt as _; pub use mem::EvalContextExt as _; -pub use socket::EvalContextExt as _; pub use sync::EvalContextExt as _; pub use thread::EvalContextExt as _; +pub use unnamed_socket::EvalContextExt as _; // Make up some constants. const UID: u32 = 1000; diff --git a/src/tools/miri/src/shims/unix/socket.rs b/src/tools/miri/src/shims/unix/unnamed_socket.rs similarity index 71% rename from src/tools/miri/src/shims/unix/socket.rs rename to src/tools/miri/src/shims/unix/unnamed_socket.rs index 3f2adb6e79c..5f4640a2d84 100644 --- a/src/tools/miri/src/shims/unix/socket.rs +++ b/src/tools/miri/src/shims/unix/unnamed_socket.rs @@ -1,3 +1,7 @@ +//! This implements "anonymous" sockets, that do not correspond to anything on the host system and +//! are entirely implemented inside Miri. +//! We also use the same infrastructure to implement unnamed pipes. + use std::cell::{OnceCell, RefCell}; use std::collections::VecDeque; use std::io; @@ -13,12 +17,13 @@ use crate::{concurrency::VClock, *}; /// be configured in the real system. const MAX_SOCKETPAIR_BUFFER_CAPACITY: usize = 212992; -/// Pair of connected sockets. +/// One end of a pair of connected unnamed sockets. #[derive(Debug)] -struct SocketPair { - /// The buffer we are reading from. - readbuf: RefCell, - /// The `SocketPair` file descriptor that is our "peer", and that holds the buffer we are +struct AnonSocket { + /// The buffer we are reading from, or `None` if this is the writing end of a pipe. + /// (In that case, the peer FD will be the reading end of that pipe.) + readbuf: Option>, + /// The `AnonSocket` file descriptor that is our "peer", and that holds the buffer we are /// writing to. This is a weak reference because the other side may be closed before us; all /// future writes will then trigger EPIPE. peer_fd: OnceCell, @@ -37,13 +42,13 @@ impl Buffer { } } -impl SocketPair { +impl AnonSocket { fn peer_fd(&self) -> &WeakFileDescriptionRef { self.peer_fd.get().unwrap() } } -impl FileDescription for SocketPair { +impl FileDescription for AnonSocket { fn name(&self) -> &'static str { "socketpair" } @@ -55,17 +60,25 @@ impl FileDescription for SocketPair { let mut epoll_ready_events = EpollReadyEvents::new(); // Check if it is readable. - let readbuf = self.readbuf.borrow(); - if !readbuf.buf.is_empty() { + if let Some(readbuf) = &self.readbuf { + if !readbuf.borrow().buf.is_empty() { + epoll_ready_events.epollin = true; + } + } else { + // Without a read buffer, reading never blocks, so we are always ready. epoll_ready_events.epollin = true; } // Check if is writable. if let Some(peer_fd) = self.peer_fd().upgrade() { - let writebuf = &peer_fd.downcast::().unwrap().readbuf.borrow(); - let data_size = writebuf.buf.len(); - let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size); - if available_space != 0 { + if let Some(writebuf) = &peer_fd.downcast::().unwrap().readbuf { + let data_size = writebuf.borrow().buf.len(); + let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size); + if available_space != 0 { + epoll_ready_events.epollout = true; + } + } else { + // Without a write buffer, writing never blocks. epoll_ready_events.epollout = true; } } else { @@ -108,7 +121,12 @@ impl FileDescription for SocketPair { return Ok(Ok(0)); } - let mut readbuf = self.readbuf.borrow_mut(); + let Some(readbuf) = &self.readbuf else { + // FIXME: This should return EBADF, but there's no nice way to do that as there's no + // corresponding ErrorKind variant. + throw_unsup_format!("reading from the write end of a pipe"); + }; + let mut readbuf = readbuf.borrow_mut(); if readbuf.buf.is_empty() { if self.peer_fd().upgrade().is_none() { // Socketpair with no peer and empty buffer. @@ -176,7 +194,13 @@ impl FileDescription for SocketPair { // closed. return Ok(Err(Error::from(ErrorKind::BrokenPipe))); }; - let mut writebuf = peer_fd.downcast::().unwrap().readbuf.borrow_mut(); + + let Some(writebuf) = &peer_fd.downcast::().unwrap().readbuf else { + // FIXME: This should return EBADF, but there's no nice way to do that as there's no + // corresponding ErrorKind variant. + throw_unsup_format!("writing to the reading end of a pipe"); + }; + let mut writebuf = writebuf.borrow_mut(); let data_size = writebuf.buf.len(); let available_space = MAX_SOCKETPAIR_BUFFER_CAPACITY.strict_sub(data_size); if available_space == 0 { @@ -227,12 +251,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { let mut is_sock_nonblock = false; - // Parse and remove the type flags that we support. If type != 0 after removing, - // unsupported flags are used. - if type_ & this.eval_libc_i32("SOCK_STREAM") == this.eval_libc_i32("SOCK_STREAM") { - type_ &= !(this.eval_libc_i32("SOCK_STREAM")); - } - + // Parse and remove the type flags that we support. // SOCK_NONBLOCK only exists on Linux. if this.tcx.sess.target.os == "linux" { if type_ & this.eval_libc_i32("SOCK_NONBLOCK") == this.eval_libc_i32("SOCK_NONBLOCK") { @@ -253,7 +272,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { and AF_LOCAL are allowed", domain ); - } else if type_ != 0 { + } else if type_ != this.eval_libc_i32("SOCK_STREAM") { throw_unsup_format!( "socketpair: type {:#x} is unsupported, only SOCK_STREAM, \ SOCK_CLOEXEC and SOCK_NONBLOCK are allowed", @@ -268,20 +287,20 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { // Generate file descriptions. let fds = &mut this.machine.fds; - let fd0 = fds.new_ref(SocketPair { - readbuf: RefCell::new(Buffer::new()), + let fd0 = fds.new_ref(AnonSocket { + readbuf: Some(RefCell::new(Buffer::new())), peer_fd: OnceCell::new(), is_nonblock: is_sock_nonblock, }); - let fd1 = fds.new_ref(SocketPair { - readbuf: RefCell::new(Buffer::new()), + let fd1 = fds.new_ref(AnonSocket { + readbuf: Some(RefCell::new(Buffer::new())), peer_fd: OnceCell::new(), is_nonblock: is_sock_nonblock, }); // Make the file descriptions point to each other. - fd0.downcast::().unwrap().peer_fd.set(fd1.downgrade()).unwrap(); - fd1.downcast::().unwrap().peer_fd.set(fd0.downgrade()).unwrap(); + fd0.downcast::().unwrap().peer_fd.set(fd1.downgrade()).unwrap(); + fd1.downcast::().unwrap().peer_fd.set(fd0.downgrade()).unwrap(); // Insert the file description to the fd table, generating the file descriptors. let sv0 = fds.insert(fd0); @@ -295,4 +314,51 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { Ok(Scalar::from_i32(0)) } + + fn pipe2( + &mut self, + pipefd: &OpTy<'tcx>, + flags: Option<&OpTy<'tcx>>, + ) -> InterpResult<'tcx, Scalar> { + let this = self.eval_context_mut(); + + let pipefd = this.deref_pointer(pipefd)?; + let flags = match flags { + Some(flags) => this.read_scalar(flags)?.to_i32()?, + None => 0, + }; + + // As usual we ignore CLOEXEC. + let cloexec = this.eval_libc_i32("O_CLOEXEC"); + if flags != 0 && flags != cloexec { + throw_unsup_format!("unsupported flags in `pipe2`"); + } + + // Generate file descriptions. + // pipefd[0] refers to the read end of the pipe. + let fds = &mut this.machine.fds; + let fd0 = fds.new_ref(AnonSocket { + readbuf: Some(RefCell::new(Buffer::new())), + peer_fd: OnceCell::new(), + is_nonblock: false, + }); + let fd1 = + fds.new_ref(AnonSocket { readbuf: None, peer_fd: OnceCell::new(), is_nonblock: false }); + + // Make the file descriptions point to each other. + fd0.downcast::().unwrap().peer_fd.set(fd1.downgrade()).unwrap(); + fd1.downcast::().unwrap().peer_fd.set(fd0.downgrade()).unwrap(); + + // Insert the file description to the fd table, generating the file descriptors. + let pipefd0 = fds.insert(fd0); + let pipefd1 = fds.insert(fd1); + + // Return file descriptors to the caller. + let pipefd0 = Scalar::from_int(pipefd0, pipefd.layout.size); + let pipefd1 = Scalar::from_int(pipefd1, pipefd.layout.size); + this.write_scalar(pipefd0, &pipefd)?; + this.write_scalar(pipefd1, &pipefd.offset(pipefd.layout.size, pipefd.layout, this)?)?; + + Ok(Scalar::from_i32(0)) + } } diff --git a/src/tools/miri/tests/pass-dep/libc/libc-pipe.rs b/src/tools/miri/tests/pass-dep/libc/libc-pipe.rs new file mode 100644 index 00000000000..5dff612bd89 --- /dev/null +++ b/src/tools/miri/tests/pass-dep/libc/libc-pipe.rs @@ -0,0 +1,99 @@ +//@ignore-target-windows: No libc pipe on Windows +// test_race depends on a deterministic schedule. +//@compile-flags: -Zmiri-preemption-rate=0 +use std::thread; +fn main() { + test_pipe(); + test_pipe_threaded(); + test_race(); +} + +fn test_pipe() { + let mut fds = [-1, -1]; + let res = unsafe { libc::pipe(fds.as_mut_ptr()) }; + assert_eq!(res, 0); + + // Read size == data available in buffer. + let data = "12345".as_bytes().as_ptr(); + let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5) }; + assert_eq!(res, 5); + let mut buf3: [u8; 5] = [0; 5]; + let res = unsafe { libc::read(fds[0], buf3.as_mut_ptr().cast(), buf3.len() as libc::size_t) }; + assert_eq!(res, 5); + assert_eq!(buf3, "12345".as_bytes()); + + // Read size > data available in buffer. + let data = "123".as_bytes().as_ptr(); + let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 3) }; + assert_eq!(res, 3); + let mut buf4: [u8; 5] = [0; 5]; + let res = unsafe { libc::read(fds[0], buf4.as_mut_ptr().cast(), buf4.len() as libc::size_t) }; + assert_eq!(res, 3); + assert_eq!(&buf4[0..3], "123".as_bytes()); +} + +fn test_pipe_threaded() { + let mut fds = [-1, -1]; + let res = unsafe { libc::pipe(fds.as_mut_ptr()) }; + assert_eq!(res, 0); + + let thread1 = thread::spawn(move || { + let mut buf: [u8; 5] = [0; 5]; + let res: i64 = unsafe { + libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) + .try_into() + .unwrap() + }; + assert_eq!(res, 5); + assert_eq!(buf, "abcde".as_bytes()); + }); + // FIXME: we should yield here once blocking is implemented. + //thread::yield_now(); + let data = "abcde".as_bytes().as_ptr(); + let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5) }; + assert_eq!(res, 5); + thread1.join().unwrap(); + + // Read and write from different direction + let thread2 = thread::spawn(move || { + // FIXME: we should yield here once blocking is implemented. + //thread::yield_now(); + let data = "12345".as_bytes().as_ptr(); + let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5) }; + assert_eq!(res, 5); + }); + // FIXME: we should not yield here once blocking is implemented. + thread::yield_now(); + let mut buf: [u8; 5] = [0; 5]; + let res = unsafe { libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; + assert_eq!(res, 5); + assert_eq!(buf, "12345".as_bytes()); + thread2.join().unwrap(); +} + +fn test_race() { + static mut VAL: u8 = 0; + let mut fds = [-1, -1]; + let res = unsafe { libc::pipe(fds.as_mut_ptr()) }; + assert_eq!(res, 0); + let thread1 = thread::spawn(move || { + let mut buf: [u8; 1] = [0; 1]; + // write() from the main thread will occur before the read() here + // because preemption is disabled and the main thread yields after write(). + let res: i32 = unsafe { + libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) + .try_into() + .unwrap() + }; + assert_eq!(res, 1); + assert_eq!(buf, "a".as_bytes()); + // The read above establishes a happens-before so it is now safe to access this global variable. + unsafe { assert_eq!(VAL, 1) }; + }); + unsafe { VAL = 1 }; + let data = "a".as_bytes().as_ptr(); + let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 1) }; + assert_eq!(res, 1); + thread::yield_now(); + thread1.join().unwrap(); +} diff --git a/src/tools/miri/tests/pass-dep/libc/libc-socketpair.rs b/src/tools/miri/tests/pass-dep/libc/libc-socketpair.rs index 324c0127ee9..15e040116de 100644 --- a/src/tools/miri/tests/pass-dep/libc/libc-socketpair.rs +++ b/src/tools/miri/tests/pass-dep/libc/libc-socketpair.rs @@ -10,65 +10,68 @@ fn main() { fn test_socketpair() { let mut fds = [-1, -1]; - let mut res = - unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; assert_eq!(res, 0); // Read size == data available in buffer. let data = "abcde".as_bytes().as_ptr(); - res = unsafe { libc::write(fds[0], data as *const libc::c_void, 5).try_into().unwrap() }; + let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 5) }; assert_eq!(res, 5); let mut buf: [u8; 5] = [0; 5]; - res = unsafe { - libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t).try_into().unwrap() - }; + let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; assert_eq!(res, 5); assert_eq!(buf, "abcde".as_bytes()); // Read size > data available in buffer. let data = "abc".as_bytes().as_ptr(); - res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3).try_into().unwrap() }; + let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; assert_eq!(res, 3); let mut buf2: [u8; 5] = [0; 5]; - res = unsafe { - libc::read(fds[1], buf2.as_mut_ptr().cast(), buf2.len() as libc::size_t).try_into().unwrap() - }; + let res = unsafe { libc::read(fds[1], buf2.as_mut_ptr().cast(), buf2.len() as libc::size_t) }; assert_eq!(res, 3); assert_eq!(&buf2[0..3], "abc".as_bytes()); // Test read and write from another direction. // Read size == data available in buffer. let data = "12345".as_bytes().as_ptr(); - res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5).try_into().unwrap() }; + let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5) }; assert_eq!(res, 5); let mut buf3: [u8; 5] = [0; 5]; - res = unsafe { - libc::read(fds[0], buf3.as_mut_ptr().cast(), buf3.len() as libc::size_t).try_into().unwrap() - }; + let res = unsafe { libc::read(fds[0], buf3.as_mut_ptr().cast(), buf3.len() as libc::size_t) }; assert_eq!(res, 5); assert_eq!(buf3, "12345".as_bytes()); // Read size > data available in buffer. let data = "123".as_bytes().as_ptr(); - res = unsafe { libc::write(fds[1], data as *const libc::c_void, 3).try_into().unwrap() }; + let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 3) }; assert_eq!(res, 3); let mut buf4: [u8; 5] = [0; 5]; - res = unsafe { - libc::read(fds[0], buf4.as_mut_ptr().cast(), buf4.len() as libc::size_t).try_into().unwrap() - }; + let res = unsafe { libc::read(fds[0], buf4.as_mut_ptr().cast(), buf4.len() as libc::size_t) }; assert_eq!(res, 3); assert_eq!(&buf4[0..3], "123".as_bytes()); + + // Test when happens when we close one end, with some data in the buffer. + let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 3) }; + assert_eq!(res, 3); + unsafe { libc::close(fds[0]) }; + // Reading the other end should return that data, then EOF. + let mut buf: [u8; 5] = [0; 5]; + let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; + assert_eq!(res, 3); + assert_eq!(&buf[0..3], "123".as_bytes()); + let res = unsafe { libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; + assert_eq!(res, 0); // 0-sized read: EOF. + // Writing the other end should emit EPIPE. + let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 1) }; + assert_eq!(res, -1); + assert_eq!(std::io::Error::last_os_error().raw_os_error(), Some(libc::EPIPE)); } fn test_socketpair_threaded() { let mut fds = [-1, -1]; - let mut res = - unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; assert_eq!(res, 0); - let data = "abcde".as_bytes().as_ptr(); - res = unsafe { libc::write(fds[0], data as *const libc::c_void, 5).try_into().unwrap() }; - assert_eq!(res, 5); let thread1 = thread::spawn(move || { let mut buf: [u8; 5] = [0; 5]; let res: i64 = unsafe { @@ -79,28 +82,34 @@ fn test_socketpair_threaded() { assert_eq!(res, 5); assert_eq!(buf, "abcde".as_bytes()); }); + // FIXME: we should yield here once blocking is implemented. + //thread::yield_now(); + let data = "abcde".as_bytes().as_ptr(); + let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 5) }; + assert_eq!(res, 5); thread1.join().unwrap(); // Read and write from different direction let thread2 = thread::spawn(move || { + // FIXME: we should yield here once blocking is implemented. + //thread::yield_now(); let data = "12345".as_bytes().as_ptr(); - let res: i64 = - unsafe { libc::write(fds[0], data as *const libc::c_void, 5).try_into().unwrap() }; + let res = unsafe { libc::write(fds[1], data as *const libc::c_void, 5) }; assert_eq!(res, 5); }); - thread2.join().unwrap(); + // FIXME: we should not yield here once blocking is implemented. + thread::yield_now(); let mut buf: [u8; 5] = [0; 5]; - res = unsafe { - libc::read(fds[1], buf.as_mut_ptr().cast(), buf.len() as libc::size_t).try_into().unwrap() - }; + let res = unsafe { libc::read(fds[0], buf.as_mut_ptr().cast(), buf.len() as libc::size_t) }; assert_eq!(res, 5); assert_eq!(buf, "12345".as_bytes()); + thread2.join().unwrap(); } + fn test_race() { static mut VAL: u8 = 0; let mut fds = [-1, -1]; - let mut res = - unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; + let res = unsafe { libc::socketpair(libc::AF_UNIX, libc::SOCK_STREAM, 0, fds.as_mut_ptr()) }; assert_eq!(res, 0); let thread1 = thread::spawn(move || { let mut buf: [u8; 1] = [0; 1]; @@ -113,11 +122,12 @@ fn test_race() { }; assert_eq!(res, 1); assert_eq!(buf, "a".as_bytes()); + // The read above establishes a happens-before so it is now safe to access this global variable. unsafe { assert_eq!(VAL, 1) }; }); unsafe { VAL = 1 }; let data = "a".as_bytes().as_ptr(); - res = unsafe { libc::write(fds[0], data as *const libc::c_void, 1).try_into().unwrap() }; + let res = unsafe { libc::write(fds[0], data as *const libc::c_void, 1) }; assert_eq!(res, 1); thread::yield_now(); thread1.join().unwrap();