diff --git a/embassy-sync/src/zero_copy_channel.rs b/embassy-sync/src/zero_copy_channel.rs index cbb8cb526..f704cbd5d 100644 --- a/embassy-sync/src/zero_copy_channel.rs +++ b/embassy-sync/src/zero_copy_channel.rs @@ -1,3 +1,22 @@ +//! A zero-copy queue for sending values between asynchronous tasks. +//! +//! It can be used concurrently by multiple producers (senders) and multiple +//! consumers (receivers), i.e. it is an "MPMC channel". +//! +//! Receivers are competing for messages. So a message that is received by +//! one receiver is not received by any other. +//! +//! This queue takes a Mutex type so that various +//! targets can be attained. For example, a ThreadModeMutex can be used +//! for single-core Cortex-M targets where messages are only passed +//! between tasks running in thread mode. Similarly, a CriticalSectionMutex +//! can also be used for single-core targets where messages are to be +//! passed from exception mode e.g. out of an interrupt handler. +//! +//! This module provides a bounded channel that has a limit on the number of +//! messages that it can store, and if this limit is reached, trying to send +//! another message will result in an error being returned. + use core::cell::RefCell; use core::future::poll_fn; use core::marker::PhantomData; @@ -7,6 +26,17 @@ use crate::blocking_mutex::raw::RawMutex; use crate::blocking_mutex::Mutex; use crate::waitqueue::WakerRegistration; +/// A bounded zero-copy channel for communicating between asynchronous tasks +/// with backpressure. +/// +/// The channel will buffer up to the provided number of messages. Once the +/// buffer is full, attempts to `send` new messages will wait until a message is +/// received from the channel. +/// +/// All data sent will become available in the same order as it was sent. +/// +/// The channel requires a buffer of recyclable elements. Writing to the channel is done through +/// an `&mut T`. pub struct Channel<'a, M: RawMutex, T> { buf: *mut T, phantom: PhantomData<&'a mut T>, @@ -14,6 +44,10 @@ pub struct Channel<'a, M: RawMutex, T> { } impl<'a, M: RawMutex, T> Channel<'a, M, T> { + /// Initialize a new [`Channel`]. + /// + /// The provided buffer will be used and reused by the channel's logic, and thus dictates the + /// channel's capacity. pub fn new(buf: &'a mut [T]) -> Self { let len = buf.len(); assert!(len != 0); @@ -32,20 +66,27 @@ impl<'a, M: RawMutex, T> Channel<'a, M, T> { } } + /// Creates a [`Sender`] and [`Receiver`] from an existing channel. + /// + /// Further Senders and Receivers can be created through [`Sender::borrow`] and + /// [`Receiver::borrow`] respectively. pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) { (Sender { channel: self }, Receiver { channel: self }) } } +/// Send-only access to a [`Channel`]. pub struct Sender<'a, M: RawMutex, T> { channel: &'a Channel<'a, M, T>, } impl<'a, M: RawMutex, T> Sender<'a, M, T> { + /// Creates one further [`Sender`] over the same channel. pub fn borrow(&mut self) -> Sender<'_, M, T> { Sender { channel: self.channel } } + /// Attempts to send a value over the channel. pub fn try_send(&mut self) -> Option<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); @@ -56,6 +97,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { }) } + /// Attempts to send a value over the channel. pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); @@ -69,6 +111,7 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { }) } + /// Asynchronously send a value over the channel. pub async fn send(&mut self) -> &mut T { let i = poll_fn(|cx| { self.channel.state.lock(|s| { @@ -86,19 +129,24 @@ impl<'a, M: RawMutex, T> Sender<'a, M, T> { unsafe { &mut *self.channel.buf.add(i) } } + /// Notify the channel that the sending of the value has been finalized. pub fn send_done(&mut self) { self.channel.state.lock(|s| s.borrow_mut().push_done()) } } + +/// Receive-only access to a [`Channel`]. pub struct Receiver<'a, M: RawMutex, T> { channel: &'a Channel<'a, M, T>, } impl<'a, M: RawMutex, T> Receiver<'a, M, T> { + /// Creates one further [`Sender`] over the same channel. pub fn borrow(&mut self) -> Receiver<'_, M, T> { Receiver { channel: self.channel } } + /// Attempts to receive a value over the channel. pub fn try_receive(&mut self) -> Option<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); @@ -109,6 +157,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { }) } + /// Attempts to asynchronously receive a value over the channel. pub fn poll_receive(&mut self, cx: &mut Context) -> Poll<&mut T> { self.channel.state.lock(|s| { let s = &mut *s.borrow_mut(); @@ -122,6 +171,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { }) } + /// Asynchronously receive a value over the channel. pub async fn receive(&mut self) -> &mut T { let i = poll_fn(|cx| { self.channel.state.lock(|s| { @@ -139,6 +189,7 @@ impl<'a, M: RawMutex, T> Receiver<'a, M, T> { unsafe { &mut *self.channel.buf.add(i) } } + /// Notify the channel that the receiving of the value has been finalized. pub fn receive_done(&mut self) { self.channel.state.lock(|s| s.borrow_mut().pop_done()) }