From 6c165f8dc03455863bc123101d4f4a0dabdfcfbf Mon Sep 17 00:00:00 2001 From: Dario Nieuwenhuis Date: Mon, 28 Aug 2023 01:53:15 +0200 Subject: [PATCH] sync/pipe: impl BufRead. --- embassy-sync/src/pipe.rs | 310 +++++++++++++++++++++----------- embassy-sync/src/ring_buffer.rs | 46 ++--- examples/rp/src/bin/pio_uart.rs | 10 +- 3 files changed, 233 insertions(+), 133 deletions(-) diff --git a/embassy-sync/src/pipe.rs b/embassy-sync/src/pipe.rs index 21d451ea6..ec0cbbf2a 100644 --- a/embassy-sync/src/pipe.rs +++ b/embassy-sync/src/pipe.rs @@ -1,7 +1,8 @@ //! Async byte stream pipe. -use core::cell::RefCell; +use core::cell::{RefCell, UnsafeCell}; use core::future::Future; +use core::ops::Range; use core::pin::Pin; use core::task::{Context, Poll}; @@ -82,17 +83,6 @@ where pipe: &'p Pipe, } -impl<'p, M, const N: usize> Clone for Reader<'p, M, N> -where - M: RawMutex, -{ - fn clone(&self) -> Self { - Reader { pipe: self.pipe } - } -} - -impl<'p, M, const N: usize> Copy for Reader<'p, M, N> where M: RawMutex {} - impl<'p, M, const N: usize> Reader<'p, M, N> where M: RawMutex, @@ -110,6 +100,29 @@ where pub fn try_read(&self, buf: &mut [u8]) -> Result { self.pipe.try_read(buf) } + + /// Return the contents of the internal buffer, filling it with more data from the inner reader if it is empty. + /// + /// If no bytes are currently available to read, this function waits until at least one byte is available. + /// + /// If the reader is at end-of-file (EOF), an empty slice is returned. + pub fn fill_buf(&mut self) -> FillBufFuture<'_, M, N> { + FillBufFuture { pipe: Some(self.pipe) } + } + + /// Try returning contents of the internal buffer. + /// + /// If no bytes are currently available to read, this function returns `Err(TryReadError::Empty)`. + /// + /// If the reader is at end-of-file (EOF), an empty slice is returned. + pub fn try_fill_buf(&mut self) -> Result<&[u8], TryReadError> { + unsafe { self.pipe.try_fill_buf_with_context(None) } + } + + /// Tell this buffer that `amt` bytes have been consumed from the buffer, so they should no longer be returned in calls to `fill_buf`. + pub fn consume(&mut self, amt: usize) { + self.pipe.consume(amt) + } } /// Future returned by [`Pipe::read`] and [`Reader::read`]. @@ -138,6 +151,35 @@ where impl<'p, M, const N: usize> Unpin for ReadFuture<'p, M, N> where M: RawMutex {} +/// Future returned by [`Pipe::fill_buf`] and [`Reader::fill_buf`]. +#[must_use = "futures do nothing unless you `.await` or poll them"] +pub struct FillBufFuture<'p, M, const N: usize> +where + M: RawMutex, +{ + pipe: Option<&'p Pipe>, +} + +impl<'p, M, const N: usize> Future for FillBufFuture<'p, M, N> +where + M: RawMutex, +{ + type Output = &'p [u8]; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let pipe = self.pipe.take().unwrap(); + match unsafe { pipe.try_fill_buf_with_context(Some(cx)) } { + Ok(buf) => Poll::Ready(buf), + Err(TryReadError::Empty) => { + self.pipe = Some(pipe); + Poll::Pending + } + } + } +} + +impl<'p, M, const N: usize> Unpin for FillBufFuture<'p, M, N> where M: RawMutex {} + /// Error returned by [`try_read`](Pipe::try_read). #[derive(PartialEq, Eq, Clone, Copy, Debug)] #[cfg_attr(feature = "defmt", derive(defmt::Format))] @@ -162,67 +204,24 @@ struct PipeState { write_waker: WakerRegistration, } -impl PipeState { - const fn new() -> Self { - PipeState { - buffer: RingBuffer::new(), - read_waker: WakerRegistration::new(), - write_waker: WakerRegistration::new(), - } +#[repr(transparent)] +struct Buffer(UnsafeCell<[u8; N]>); + +impl Buffer { + unsafe fn get<'a>(&self, r: Range) -> &'a [u8] { + let p = self.0.get() as *const u8; + core::slice::from_raw_parts(p.add(r.start), r.end - r.start) } - fn clear(&mut self) { - self.buffer.clear(); - self.write_waker.wake(); - } - - fn try_read(&mut self, buf: &mut [u8]) -> Result { - self.try_read_with_context(None, buf) - } - - fn try_read_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result { - if self.buffer.is_full() { - self.write_waker.wake(); - } - - let available = self.buffer.pop_buf(); - if available.is_empty() { - if let Some(cx) = cx { - self.read_waker.register(cx.waker()); - } - return Err(TryReadError::Empty); - } - - let n = available.len().min(buf.len()); - buf[..n].copy_from_slice(&available[..n]); - self.buffer.pop(n); - Ok(n) - } - - fn try_write(&mut self, buf: &[u8]) -> Result { - self.try_write_with_context(None, buf) - } - - fn try_write_with_context(&mut self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result { - if self.buffer.is_empty() { - self.read_waker.wake(); - } - - let available = self.buffer.push_buf(); - if available.is_empty() { - if let Some(cx) = cx { - self.write_waker.register(cx.waker()); - } - return Err(TryWriteError::Full); - } - - let n = available.len().min(buf.len()); - available[..n].copy_from_slice(&buf[..n]); - self.buffer.push(n); - Ok(n) + unsafe fn get_mut<'a>(&self, r: Range) -> &'a mut [u8] { + let p = self.0.get() as *mut u8; + core::slice::from_raw_parts_mut(p.add(r.start), r.end - r.start) } } +unsafe impl Send for Buffer {} +unsafe impl Sync for Buffer {} + /// A bounded byte-oriented pipe for communicating between asynchronous tasks /// with backpressure. /// @@ -234,6 +233,7 @@ pub struct Pipe where M: RawMutex, { + buf: Buffer, inner: Mutex>>, } @@ -252,7 +252,12 @@ where /// ``` pub const fn new() -> Self { Self { - inner: Mutex::new(RefCell::new(PipeState::new())), + buf: Buffer(UnsafeCell::new([0; N])), + inner: Mutex::new(RefCell::new(PipeState { + buffer: RingBuffer::new(), + read_waker: WakerRegistration::new(), + write_waker: WakerRegistration::new(), + })), } } @@ -261,21 +266,91 @@ where } fn try_read_with_context(&self, cx: Option<&mut Context<'_>>, buf: &mut [u8]) -> Result { - self.lock(|c| c.try_read_with_context(cx, buf)) + self.inner.lock(|rc: &RefCell>| { + let s = &mut *rc.borrow_mut(); + + if s.buffer.is_full() { + s.write_waker.wake(); + } + + let available = unsafe { self.buf.get(s.buffer.pop_buf()) }; + if available.is_empty() { + if let Some(cx) = cx { + s.read_waker.register(cx.waker()); + } + return Err(TryReadError::Empty); + } + + let n = available.len().min(buf.len()); + buf[..n].copy_from_slice(&available[..n]); + s.buffer.pop(n); + Ok(n) + }) + } + + // safety: While the returned slice is alive, + // no `read` or `consume` methods in the pipe must be called. + unsafe fn try_fill_buf_with_context(&self, cx: Option<&mut Context<'_>>) -> Result<&[u8], TryReadError> { + self.inner.lock(|rc: &RefCell>| { + let s = &mut *rc.borrow_mut(); + + if s.buffer.is_full() { + s.write_waker.wake(); + } + + let available = unsafe { self.buf.get(s.buffer.pop_buf()) }; + if available.is_empty() { + if let Some(cx) = cx { + s.read_waker.register(cx.waker()); + } + return Err(TryReadError::Empty); + } + + Ok(available) + }) + } + + fn consume(&self, amt: usize) { + self.inner.lock(|rc: &RefCell>| { + let s = &mut *rc.borrow_mut(); + let available = s.buffer.pop_buf(); + assert!(amt <= available.len()); + s.buffer.pop(amt); + }) } fn try_write_with_context(&self, cx: Option<&mut Context<'_>>, buf: &[u8]) -> Result { - self.lock(|c| c.try_write_with_context(cx, buf)) + self.inner.lock(|rc: &RefCell>| { + let s = &mut *rc.borrow_mut(); + + if s.buffer.is_empty() { + s.read_waker.wake(); + } + + let available = unsafe { self.buf.get_mut(s.buffer.push_buf()) }; + if available.is_empty() { + if let Some(cx) = cx { + s.write_waker.register(cx.waker()); + } + return Err(TryWriteError::Full); + } + + let n = available.len().min(buf.len()); + available[..n].copy_from_slice(&buf[..n]); + s.buffer.push(n); + Ok(n) + }) } - /// Get a writer for this pipe. - pub fn writer(&self) -> Writer<'_, M, N> { - Writer { pipe: self } - } - - /// Get a reader for this pipe. - pub fn reader(&self) -> Reader<'_, M, N> { - Reader { pipe: self } + /// Split this pipe into a BufRead-capable reader and a writer. + /// + /// The reader and writer borrow the current pipe mutably, so it is not + /// possible to use it directly while they exist. This is needed because + /// implementing `BufRead` requires there is a single reader. + /// + /// The writer is cloneable, the reader is not. + pub fn split(&mut self) -> (Reader<'_, M, N>, Writer<'_, M, N>) { + (Reader { pipe: self }, Writer { pipe: self }) } /// Write some bytes to the pipe. @@ -312,7 +387,7 @@ where /// or return an error if the pipe is empty. See [`write`](Self::write) for a variant /// that waits instead of returning an error. pub fn try_write(&self, buf: &[u8]) -> Result { - self.lock(|c| c.try_write(buf)) + self.try_write_with_context(None, buf) } /// Read some bytes from the pipe. @@ -339,12 +414,17 @@ where /// or return an error if the pipe is empty. See [`read`](Self::read) for a variant /// that waits instead of returning an error. pub fn try_read(&self, buf: &mut [u8]) -> Result { - self.lock(|c| c.try_read(buf)) + self.try_read_with_context(None, buf) } /// Clear the data in the pipe's buffer. pub fn clear(&self) { - self.lock(|c| c.clear()) + self.inner.lock(|rc: &RefCell>| { + let s = &mut *rc.borrow_mut(); + + s.buffer.clear(); + s.write_waker.wake(); + }) } /// Return whether the pipe is full (no free space in the buffer) @@ -433,6 +513,16 @@ mod io_impls { } } + impl embedded_io_async::BufRead for Reader<'_, M, N> { + async fn fill_buf(&mut self) -> Result<&[u8], Self::Error> { + Ok(Reader::fill_buf(self).await) + } + + fn consume(&mut self, amt: usize) { + Reader::consume(self, amt) + } + } + impl embedded_io_async::ErrorType for Writer<'_, M, N> { type Error = Infallible; } @@ -457,43 +547,39 @@ mod tests { use super::*; use crate::blocking_mutex::raw::{CriticalSectionRawMutex, NoopRawMutex}; - fn capacity(c: &PipeState) -> usize { - N - c.buffer.len() - } - #[test] fn writing_once() { - let mut c = PipeState::<3>::new(); + let c = Pipe::::new(); assert!(c.try_write(&[1]).is_ok()); - assert_eq!(capacity(&c), 2); + assert_eq!(c.free_capacity(), 2); } #[test] fn writing_when_full() { - let mut c = PipeState::<3>::new(); + let c = Pipe::::new(); assert_eq!(c.try_write(&[42]), Ok(1)); assert_eq!(c.try_write(&[43]), Ok(1)); assert_eq!(c.try_write(&[44]), Ok(1)); assert_eq!(c.try_write(&[45]), Err(TryWriteError::Full)); - assert_eq!(capacity(&c), 0); + assert_eq!(c.free_capacity(), 0); } #[test] fn receiving_once_with_one_send() { - let mut c = PipeState::<3>::new(); + let c = Pipe::::new(); assert!(c.try_write(&[42]).is_ok()); let mut buf = [0; 16]; assert_eq!(c.try_read(&mut buf), Ok(1)); assert_eq!(buf[0], 42); - assert_eq!(capacity(&c), 3); + assert_eq!(c.free_capacity(), 3); } #[test] fn receiving_when_empty() { - let mut c = PipeState::<3>::new(); + let c = Pipe::::new(); let mut buf = [0; 16]; assert_eq!(c.try_read(&mut buf), Err(TryReadError::Empty)); - assert_eq!(capacity(&c), 3); + assert_eq!(c.free_capacity(), 3); } #[test] @@ -506,13 +592,37 @@ mod tests { } #[test] - fn cloning() { - let c = Pipe::::new(); - let r1 = c.reader(); - let w1 = c.writer(); + fn read_buf() { + let mut c = Pipe::::new(); + let (mut r, w) = c.split(); + assert!(w.try_write(&[42, 43]).is_ok()); + let buf = r.try_fill_buf().unwrap(); + assert_eq!(buf, &[42, 43]); + let buf = r.try_fill_buf().unwrap(); + assert_eq!(buf, &[42, 43]); + r.consume(1); + let buf = r.try_fill_buf().unwrap(); + assert_eq!(buf, &[43]); + r.consume(1); + assert_eq!(r.try_fill_buf(), Err(TryReadError::Empty)); + assert_eq!(w.try_write(&[44, 45, 46]), Ok(1)); + assert_eq!(w.try_write(&[45, 46]), Ok(2)); + let buf = r.try_fill_buf().unwrap(); + assert_eq!(buf, &[44]); // only one byte due to wraparound. + r.consume(1); + let buf = r.try_fill_buf().unwrap(); + assert_eq!(buf, &[45, 46]); + assert!(w.try_write(&[47]).is_ok()); + let buf = r.try_fill_buf().unwrap(); + assert_eq!(buf, &[45, 46, 47]); + r.consume(3); + } - let _ = r1.clone(); - let _ = w1.clone(); + #[test] + fn writer_is_cloneable() { + let mut c = Pipe::::new(); + let (_r, w) = c.split(); + let _ = w.clone(); } #[futures_test::test] diff --git a/embassy-sync/src/ring_buffer.rs b/embassy-sync/src/ring_buffer.rs index 521084024..d95ffa7c9 100644 --- a/embassy-sync/src/ring_buffer.rs +++ b/embassy-sync/src/ring_buffer.rs @@ -1,5 +1,6 @@ +use core::ops::Range; + pub struct RingBuffer { - buf: [u8; N], start: usize, end: usize, empty: bool, @@ -8,27 +9,26 @@ pub struct RingBuffer { impl RingBuffer { pub const fn new() -> Self { Self { - buf: [0; N], start: 0, end: 0, empty: true, } } - pub fn push_buf(&mut self) -> &mut [u8] { + pub fn push_buf(&mut self) -> Range { if self.start == self.end && !self.empty { trace!(" ringbuf: push_buf empty"); - return &mut self.buf[..0]; + return 0..0; } let n = if self.start <= self.end { - self.buf.len() - self.end + N - self.end } else { self.start - self.end }; trace!(" ringbuf: push_buf {:?}..{:?}", self.end, self.end + n); - &mut self.buf[self.end..self.end + n] + self.end..self.end + n } pub fn push(&mut self, n: usize) { @@ -41,20 +41,20 @@ impl RingBuffer { self.empty = false; } - pub fn pop_buf(&mut self) -> &mut [u8] { + pub fn pop_buf(&mut self) -> Range { if self.empty { trace!(" ringbuf: pop_buf empty"); - return &mut self.buf[..0]; + return 0..0; } let n = if self.end <= self.start { - self.buf.len() - self.start + N - self.start } else { self.end - self.start }; trace!(" ringbuf: pop_buf {:?}..{:?}", self.start, self.start + n); - &mut self.buf[self.start..self.start + n] + self.start..self.start + n } pub fn pop(&mut self, n: usize) { @@ -93,8 +93,8 @@ impl RingBuffer { } fn wrap(&self, n: usize) -> usize { - assert!(n <= self.buf.len()); - if n == self.buf.len() { + assert!(n <= N); + if n == N { 0 } else { n @@ -110,37 +110,29 @@ mod tests { fn push_pop() { let mut rb: RingBuffer<4> = RingBuffer::new(); let buf = rb.push_buf(); - assert_eq!(4, buf.len()); - buf[0] = 1; - buf[1] = 2; - buf[2] = 3; - buf[3] = 4; + assert_eq!(0..4, buf); rb.push(4); let buf = rb.pop_buf(); - assert_eq!(4, buf.len()); - assert_eq!(1, buf[0]); + assert_eq!(0..4, buf); rb.pop(1); let buf = rb.pop_buf(); - assert_eq!(3, buf.len()); - assert_eq!(2, buf[0]); + assert_eq!(1..4, buf); rb.pop(1); let buf = rb.pop_buf(); - assert_eq!(2, buf.len()); - assert_eq!(3, buf[0]); + assert_eq!(2..4, buf); rb.pop(1); let buf = rb.pop_buf(); - assert_eq!(1, buf.len()); - assert_eq!(4, buf[0]); + assert_eq!(3..4, buf); rb.pop(1); let buf = rb.pop_buf(); - assert_eq!(0, buf.len()); + assert_eq!(0..0, buf); let buf = rb.push_buf(); - assert_eq!(4, buf.len()); + assert_eq!(0..4, buf); } } diff --git a/examples/rp/src/bin/pio_uart.rs b/examples/rp/src/bin/pio_uart.rs index 707c99b78..aa9e52cbd 100644 --- a/examples/rp/src/bin/pio_uart.rs +++ b/examples/rp/src/bin/pio_uart.rs @@ -91,13 +91,11 @@ async fn main(_spawner: Spawner) { let (mut uart_tx, mut uart_rx) = uart.split(); // Pipe setup - let usb_pipe: Pipe = Pipe::new(); - let mut usb_pipe_writer = usb_pipe.writer(); - let mut usb_pipe_reader = usb_pipe.reader(); + let mut usb_pipe: Pipe = Pipe::new(); + let (mut usb_pipe_reader, mut usb_pipe_writer) = usb_pipe.split(); - let uart_pipe: Pipe = Pipe::new(); - let mut uart_pipe_writer = uart_pipe.writer(); - let mut uart_pipe_reader = uart_pipe.reader(); + let mut uart_pipe: Pipe = Pipe::new(); + let (mut uart_pipe_reader, mut uart_pipe_writer) = uart_pipe.split(); let (mut usb_tx, mut usb_rx) = class.split();