From 6defb4fed98432dee948634f3b2001cb4ea7ec5b Mon Sep 17 00:00:00 2001 From: Peter Krull Date: Wed, 28 Feb 2024 20:59:38 +0100 Subject: [PATCH] Changed name to `Watch`, added `DynReceiver`, `get`-method and more reworks. --- embassy-sync/src/lib.rs | 2 +- embassy-sync/src/multi_signal.rs | 529 ------------------------------- embassy-sync/src/watch.rs | 515 ++++++++++++++++++++++++++++++ 3 files changed, 516 insertions(+), 530 deletions(-) delete mode 100644 embassy-sync/src/multi_signal.rs create mode 100644 embassy-sync/src/watch.rs diff --git a/embassy-sync/src/lib.rs b/embassy-sync/src/lib.rs index f02985564..8a69541a5 100644 --- a/embassy-sync/src/lib.rs +++ b/embassy-sync/src/lib.rs @@ -12,11 +12,11 @@ mod ring_buffer; pub mod blocking_mutex; pub mod channel; -pub mod multi_signal; pub mod mutex; pub mod pipe; pub mod priority_channel; pub mod pubsub; pub mod signal; pub mod waitqueue; +pub mod watch; pub mod zerocopy_channel; diff --git a/embassy-sync/src/multi_signal.rs b/embassy-sync/src/multi_signal.rs deleted file mode 100644 index ff9f72f2e..000000000 --- a/embassy-sync/src/multi_signal.rs +++ /dev/null @@ -1,529 +0,0 @@ -//! A synchronization primitive for passing the latest value to **multiple** tasks. -use core::cell::RefCell; -use core::ops::{Deref, DerefMut}; -use core::pin::Pin; -use core::task::{Context, Poll}; - -use futures_util::Future; - -use crate::blocking_mutex::raw::RawMutex; -use crate::blocking_mutex::Mutex; -use crate::waitqueue::MultiWakerRegistration; - -/// A `MultiSignal` is a single-slot signaling primitive, which can awake `N` separate [`Receiver`]s. -/// -/// Similar to a [`Signal`](crate::signal::Signal), except `MultiSignal` allows for multiple tasks to -/// `.await` the latest value, and all receive it. -/// -/// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except -/// "sending" to it (calling [`MultiSignal::write`]) will immediately overwrite the previous value instead -/// of waiting for the receivers to pop the previous value. -/// -/// `MultiSignal` is useful when a single task is responsible for updating a value or "state", which multiple other -/// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for -/// [`Receiver`]s to "lose" stale values. -/// -/// Anyone with a reference to the MultiSignal can update or peek the value. MultiSignals are generally declared -/// as `static`s and then borrowed as required to either [`MultiSignal::peek`] the value or obtain a [`Receiver`] -/// with [`MultiSignal::receiver`] which has async methods. -/// ``` -/// -/// use futures_executor::block_on; -/// use embassy_sync::multi_signal::MultiSignal; -/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; -/// -/// let f = async { -/// -/// static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); -/// -/// // Obtain Receivers -/// let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); -/// let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); -/// assert!(SOME_SIGNAL.receiver().is_err()); -/// -/// SOME_SIGNAL.write(10); -/// -/// // Receive the new value -/// assert_eq!(rcv0.changed().await, 10); -/// assert_eq!(rcv1.try_changed(), Some(10)); -/// -/// // No update -/// assert_eq!(rcv0.try_changed(), None); -/// assert_eq!(rcv1.try_changed(), None); -/// -/// SOME_SIGNAL.write(20); -/// -/// // Receive new value with predicate -/// assert_eq!(rcv0.changed_and(|x|x>&10).await, 20); -/// assert_eq!(rcv1.try_changed_and(|x|x>&30), None); -/// -/// // Anyone can peek the current value -/// assert_eq!(rcv0.peek(), 20); -/// assert_eq!(rcv1.peek(), 20); -/// assert_eq!(SOME_SIGNAL.peek(), 20); -/// assert_eq!(SOME_SIGNAL.peek_and(|x|x>&30), None); -/// }; -/// block_on(f); -/// ``` -pub struct MultiSignal { - mutex: Mutex>>, -} - -struct MultiSignalState { - data: T, - current_id: u64, - wakers: MultiWakerRegistration, - receiver_count: usize, -} - -#[derive(Debug)] -/// An error that can occur when a `MultiSignal` returns a `Result`. -pub enum Error { - /// The maximum number of [`Receiver`](crate::multi_signal::Receiver) has been reached. - MaximumReceiversReached, -} - -impl<'a, M: RawMutex, T: Clone, const N: usize> MultiSignal { - /// Create a new `MultiSignal` initialized with the given value. - pub const fn new(init: T) -> Self { - Self { - mutex: Mutex::new(RefCell::new(MultiSignalState { - data: init, - current_id: 1, - wakers: MultiWakerRegistration::new(), - receiver_count: 0, - })), - } - } - - /// Get a [`Receiver`] for the `MultiSignal`. - pub fn receiver<'s>(&'a self) -> Result, Error> { - self.mutex.lock(|state| { - let mut s = state.borrow_mut(); - if s.receiver_count < N { - s.receiver_count += 1; - Ok(Receiver(Rcv::new(self))) - } else { - Err(Error::MaximumReceiversReached) - } - }) - } - - /// Update the value of the `MultiSignal`. - pub fn write(&self, data: T) { - self.mutex.lock(|state| { - let mut s = state.borrow_mut(); - s.data = data; - s.current_id += 1; - s.wakers.wake(); - }) - } - - /// Peek the current value of the `MultiSignal`. - pub fn peek(&self) -> T { - self.mutex.lock(|state| state.borrow().data.clone()) - } - - /// Peek the current value of the `MultiSignal` and check if it satisfies the predicate `f`. - pub fn peek_and(&self, mut f: impl FnMut(&T) -> bool) -> Option { - self.mutex.lock(|state| { - let s = state.borrow(); - if f(&s.data) { - Some(s.data.clone()) - } else { - None - } - }) - } - - /// Get the ID of the current value of the `MultiSignal`. - /// This method is mostly for testing purposes. - #[allow(dead_code)] - fn get_id(&self) -> u64 { - self.mutex.lock(|state| state.borrow().current_id) - } -} - -/// A receiver is able to `.await` a changed `MultiSignal` value. -pub struct Rcv<'a, M: RawMutex, T: Clone, const N: usize> { - multi_sig: &'a MultiSignal, - at_id: u64, -} - -impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Rcv<'a, M, T, N> { - /// Create a new `Receiver` with a reference the given `MultiSignal`. - fn new(multi_sig: &'a MultiSignal) -> Self { - Self { multi_sig, at_id: 0 } - } - - /// Wait for a change to the value of the corresponding `MultiSignal`. - pub async fn changed(&mut self) -> T { - ReceiverWaitFuture { subscriber: self }.await - } - - /// Wait for a change to the value of the corresponding `MultiSignal` which matches the predicate `f`. - pub async fn changed_and(&mut self, f: F) -> T - where - F: FnMut(&T) -> bool, - { - ReceiverPredFuture { - subscriber: self, - predicate: f, - } - .await - } - - /// Try to get a changed value of the corresponding `MultiSignal`. - pub fn try_changed(&mut self) -> Option { - self.multi_sig.mutex.lock(|state| { - let s = state.borrow(); - match s.current_id > self.at_id { - true => { - self.at_id = s.current_id; - Some(s.data.clone()) - } - false => None, - } - }) - } - - /// Try to get a changed value of the corresponding `MultiSignal` which matches the predicate `f`. - pub fn try_changed_and(&mut self, mut f: F) -> Option - where - F: FnMut(&T) -> bool, - { - self.multi_sig.mutex.lock(|state| { - let s = state.borrow(); - match s.current_id > self.at_id && f(&s.data) { - true => { - self.at_id = s.current_id; - Some(s.data.clone()) - } - false => None, - } - }) - } - - /// Peek the current value of the corresponding `MultiSignal`. - pub fn peek(&self) -> T { - self.multi_sig.peek() - } - - /// Peek the current value of the corresponding `MultiSignal` and check if it satisfies the predicate `f`. - pub fn peek_and(&self, f: F) -> Option - where - F: FnMut(&T) -> bool, - { - self.multi_sig.peek_and(f) - } - - /// Check if the value of the corresponding `MultiSignal` has changed. - pub fn has_changed(&mut self) -> bool { - self.multi_sig - .mutex - .lock(|state| state.borrow().current_id > self.at_id) - } -} - -/// A `Receiver` is able to `.await` a change to the corresponding [`MultiSignal`] value. -pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, M, T, N>); - -impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { - type Target = Rcv<'a, M, T, N>; - - fn deref(&self) -> &Self::Target { - &self.0 - } -} - -impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } -} - -/// Future for the `Receiver` wait action -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct ReceiverWaitFuture<'s, 'a, M: RawMutex, T: Clone, const N: usize> { - subscriber: &'s mut Rcv<'a, M, T, N>, -} - -impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Unpin for ReceiverWaitFuture<'s, 'a, M, T, N> {} -impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> Future for ReceiverWaitFuture<'s, 'a, M, T, N> { - type Output = T; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.get_with_context(Some(cx)) - } -} - -impl<'s, 'a, M: RawMutex, T: Clone, const N: usize> ReceiverWaitFuture<'s, 'a, M, T, N> { - /// Poll the `MultiSignal` with an optional context. - fn get_with_context(&mut self, cx: Option<&mut Context>) -> Poll { - self.subscriber.multi_sig.mutex.lock(|state| { - let mut s = state.borrow_mut(); - match s.current_id > self.subscriber.at_id { - true => { - self.subscriber.at_id = s.current_id; - Poll::Ready(s.data.clone()) - } - _ => { - if let Some(cx) = cx { - s.wakers.register(cx.waker()); - } - Poll::Pending - } - } - }) - } -} - -/// Future for the `Receiver` wait action, with the ability to filter the value with a predicate. -#[must_use = "futures do nothing unless you `.await` or poll them"] -pub struct ReceiverPredFuture<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&'a T) -> bool, const N: usize> { - subscriber: &'s mut Rcv<'a, M, T, N>, - predicate: F, -} - -impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> Unpin - for ReceiverPredFuture<'s, 'a, M, T, F, N> -{ -} -impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> Future - for ReceiverPredFuture<'s, 'a, M, T, F, N> -{ - type Output = T; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - self.get_with_context_pred(Some(cx)) - } -} - -impl<'s, 'a, M: RawMutex, T: Clone, F: FnMut(&T) -> bool, const N: usize> ReceiverPredFuture<'s, 'a, M, T, F, N> { - /// Poll the `MultiSignal` with an optional context. - fn get_with_context_pred(&mut self, cx: Option<&mut Context>) -> Poll { - self.subscriber.multi_sig.mutex.lock(|state| { - let mut s = state.borrow_mut(); - match s.current_id > self.subscriber.at_id { - true if (self.predicate)(&s.data) => { - self.subscriber.at_id = s.current_id; - Poll::Ready(s.data.clone()) - } - _ => { - if let Some(cx) = cx { - s.wakers.register(cx.waker()); - } - Poll::Pending - } - } - }) - } -} - -#[cfg(test)] -mod tests { - use futures_executor::block_on; - - use super::*; - use crate::blocking_mutex::raw::CriticalSectionRawMutex; - - #[test] - fn multiple_writes() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); - let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); - - SOME_SIGNAL.write(10); - - // Receive the new value - assert_eq!(rcv0.changed().await, 10); - assert_eq!(rcv1.changed().await, 10); - - // No update - assert_eq!(rcv0.try_changed(), None); - assert_eq!(rcv1.try_changed(), None); - - SOME_SIGNAL.write(20); - - assert_eq!(rcv0.changed().await, 20); - assert_eq!(rcv1.changed().await, 20); - }; - block_on(f); - } - - #[test] - fn max_receivers() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let _ = SOME_SIGNAL.receiver().unwrap(); - let _ = SOME_SIGNAL.receiver().unwrap(); - assert!(SOME_SIGNAL.receiver().is_err()); - }; - block_on(f); - } - - // Really weird edge case, but it's possible to have a receiver that never gets a value. - #[test] - fn receive_initial() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); - let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); - - assert_eq!(rcv0.try_changed(), Some(0)); - assert_eq!(rcv1.try_changed(), Some(0)); - - assert_eq!(rcv0.try_changed(), None); - assert_eq!(rcv1.try_changed(), None); - }; - block_on(f); - } - - #[test] - fn count_ids() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); - let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); - - SOME_SIGNAL.write(10); - - assert_eq!(rcv0.changed().await, 10); - assert_eq!(rcv1.changed().await, 10); - - assert_eq!(rcv0.try_changed(), None); - assert_eq!(rcv1.try_changed(), None); - - SOME_SIGNAL.write(20); - SOME_SIGNAL.write(20); - SOME_SIGNAL.write(20); - - assert_eq!(rcv0.changed().await, 20); - assert_eq!(rcv1.changed().await, 20); - - assert_eq!(rcv0.try_changed(), None); - assert_eq!(rcv1.try_changed(), None); - - assert_eq!(SOME_SIGNAL.get_id(), 5); - }; - block_on(f); - } - - #[test] - fn peek_still_await() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); - let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); - - SOME_SIGNAL.write(10); - - assert_eq!(rcv0.peek(), 10); - assert_eq!(rcv1.peek(), 10); - - assert_eq!(rcv0.changed().await, 10); - assert_eq!(rcv1.changed().await, 10); - }; - block_on(f); - } - - #[test] - fn predicate() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); - let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); - - SOME_SIGNAL.write(20); - - assert_eq!(rcv0.changed_and(|x| x > &10).await, 20); - assert_eq!(rcv1.try_changed_and(|x| x > &30), None); - }; - block_on(f); - } - - #[test] - fn mutable_predicate() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let mut rcv = SOME_SIGNAL.receiver().unwrap(); - - SOME_SIGNAL.write(10); - - let mut largest = 0; - let mut predicate = |x: &u8| { - if *x > largest { - largest = *x; - } - true - }; - - assert_eq!(rcv.changed_and(&mut predicate).await, 10); - - SOME_SIGNAL.write(20); - - assert_eq!(rcv.changed_and(&mut predicate).await, 20); - - SOME_SIGNAL.write(5); - - assert_eq!(rcv.changed_and(&mut predicate).await, 5); - - assert_eq!(largest, 20) - }; - block_on(f); - } - - #[test] - fn peek_and() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let mut rcv0 = SOME_SIGNAL.receiver().unwrap(); - let mut rcv1 = SOME_SIGNAL.receiver().unwrap(); - - SOME_SIGNAL.write(20); - - assert_eq!(rcv0.peek_and(|x| x > &10), Some(20)); - assert_eq!(rcv1.peek_and(|x| x > &30), None); - - assert_eq!(rcv0.changed().await, 20); - assert_eq!(rcv1.changed().await, 20); - }; - block_on(f); - } - - #[test] - fn peek_with_static() { - let f = async { - static SOME_SIGNAL: MultiSignal = MultiSignal::new(0); - - // Obtain Receivers - let rcv0 = SOME_SIGNAL.receiver().unwrap(); - let rcv1 = SOME_SIGNAL.receiver().unwrap(); - - SOME_SIGNAL.write(20); - - assert_eq!(rcv0.peek(), 20); - assert_eq!(rcv1.peek(), 20); - assert_eq!(SOME_SIGNAL.peek(), 20); - assert_eq!(SOME_SIGNAL.peek_and(|x| x > &30), None); - }; - block_on(f); - } -} diff --git a/embassy-sync/src/watch.rs b/embassy-sync/src/watch.rs new file mode 100644 index 000000000..7e5e92741 --- /dev/null +++ b/embassy-sync/src/watch.rs @@ -0,0 +1,515 @@ +//! A synchronization primitive for passing the latest value to **multiple** tasks. + +use core::cell::RefCell; +use core::future::poll_fn; +use core::marker::PhantomData; +use core::ops::{Deref, DerefMut}; +use core::task::{Context, Poll}; + +use crate::blocking_mutex::raw::RawMutex; +use crate::blocking_mutex::Mutex; +use crate::waitqueue::MultiWakerRegistration; + +/// A `Watch` is a single-slot signaling primitive, which can awake `N` up to separate [`Receiver`]s. +/// +/// Similar to a [`Signal`](crate::signal::Signal), except `Watch` allows for multiple tasks to +/// `.await` the latest value, and all receive it. +/// +/// This is similar to a [`PubSubChannel`](crate::pubsub::PubSubChannel) with a buffer size of 1, except +/// "sending" to it (calling [`Watch::write`]) will immediately overwrite the previous value instead +/// of waiting for the receivers to pop the previous value. +/// +/// `Watch` is useful when a single task is responsible for updating a value or "state", which multiple other +/// tasks are interested in getting notified about changes to the latest value of. It is therefore fine for +/// [`Receiver`]s to "lose" stale values. +/// +/// Anyone with a reference to the Watch can update or peek the value. Watches are generally declared +/// as `static`s and then borrowed as required to either [`Watch::peek`] the value or obtain a [`Receiver`] +/// with [`Watch::receiver`] which has async methods. +/// ``` +/// +/// use futures_executor::block_on; +/// use embassy_sync::watch::Watch; +/// use embassy_sync::blocking_mutex::raw::CriticalSectionRawMutex; +/// +/// let f = async { +/// +/// static WATCH: Watch = Watch::new(); +/// +/// // Obtain Receivers +/// let mut rcv0 = WATCH.receiver().unwrap(); +/// let mut rcv1 = WATCH.receiver().unwrap(); +/// assert!(WATCH.receiver().is_err()); +/// +/// assert_eq!(rcv1.try_changed(), None); +/// +/// WATCH.write(10); +/// assert_eq!(WATCH.try_peek(), Some(10)); +/// +/// +/// // Receive the new value +/// assert_eq!(rcv0.changed().await, 10); +/// assert_eq!(rcv1.try_changed(), Some(10)); +/// +/// // No update +/// assert_eq!(rcv0.try_changed(), None); +/// assert_eq!(rcv1.try_changed(), None); +/// +/// WATCH.write(20); +/// +/// // Defference `between` peek `get`. +/// assert_eq!(rcv0.peek().await, 20); +/// assert_eq!(rcv1.get().await, 20); +/// +/// assert_eq!(rcv0.try_changed(), Some(20)); +/// assert_eq!(rcv1.try_changed(), None); +/// +/// }; +/// block_on(f); +/// ``` +pub struct Watch { + mutex: Mutex>>, +} + +struct WatchState { + data: Option, + current_id: u64, + wakers: MultiWakerRegistration, + receiver_count: usize, +} + +/// A trait representing the 'inner' behavior of the `Watch`. +pub trait WatchBehavior { + /// Poll the `Watch` for the current value, **without** making it as seen. + fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll; + + /// Tries to peek the value of the `Watch`, **without** marking it as seen. + fn inner_try_peek(&self) -> Option; + + /// Poll the `Watch` for the current value, making it as seen. + fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; + + /// Tries to get the value of the `Watch`, marking it as seen. + fn inner_try_get(&self, id: &mut u64) -> Option; + + /// Poll the `Watch` for a changed value, marking it as seen. + fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll; + + /// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen. + fn inner_try_changed(&self, id: &mut u64) -> Option; + + /// Checks if the `Watch` is been initialized with a value. + fn inner_contains_value(&self) -> bool; +} + +impl WatchBehavior for Watch { + fn inner_poll_peek(&self, cx: &mut Context<'_>) -> Poll { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + match &s.data { + Some(data) => Poll::Ready(data.clone()), + None => { + s.wakers.register(cx.waker()); + Poll::Pending + } + } + }) + } + + fn inner_try_peek(&self) -> Option { + self.mutex.lock(|state| state.borrow().data.clone()) + } + + fn inner_poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + match &s.data { + Some(data) => { + *id = s.current_id; + Poll::Ready(data.clone()) + } + None => { + s.wakers.register(cx.waker()); + Poll::Pending + } + } + }) + } + + fn inner_try_get(&self, id: &mut u64) -> Option { + self.mutex.lock(|state| { + let s = state.borrow(); + *id = s.current_id; + state.borrow().data.clone() + }) + } + + fn inner_poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + match (&s.data, s.current_id > *id) { + (Some(data), true) => { + *id = s.current_id; + Poll::Ready(data.clone()) + } + _ => { + s.wakers.register(cx.waker()); + Poll::Pending + } + } + }) + } + + fn inner_try_changed(&self, id: &mut u64) -> Option { + self.mutex.lock(|state| { + let s = state.borrow(); + match s.current_id > *id { + true => { + *id = s.current_id; + state.borrow().data.clone() + } + false => None, + } + }) + } + + fn inner_contains_value(&self) -> bool { + self.mutex.lock(|state| state.borrow().data.is_some()) + } +} + +#[derive(Debug)] +/// An error that can occur when a `Watch` returns a `Result::Err(_)`. +pub enum Error { + /// The maximum number of [`Receiver`](crate::watch::Receiver)/[`DynReceiver`](crate::watch::DynReceiver) has been reached. + MaximumReceiversReached, +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> Watch { + /// Create a new `Watch` channel. + pub const fn new() -> Self { + Self { + mutex: Mutex::new(RefCell::new(WatchState { + data: None, + current_id: 0, + wakers: MultiWakerRegistration::new(), + receiver_count: 0, + })), + } + } + + /// Write a new value to the `Watch`. + pub fn write(&self, val: T) { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + s.data = Some(val); + s.current_id += 1; + s.wakers.wake(); + }) + } + + /// Create a new [`Receiver`] for the `Watch`. + pub fn receiver(&self) -> Result, Error> { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + if s.receiver_count < N { + s.receiver_count += 1; + Ok(Receiver(Rcv::new(self))) + } else { + Err(Error::MaximumReceiversReached) + } + }) + } + + /// Create a new [`DynReceiver`] for the `Watch`. + pub fn dyn_receiver(&self) -> Result, Error> { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + if s.receiver_count < N { + s.receiver_count += 1; + Ok(DynReceiver(Rcv::new(self))) + } else { + Err(Error::MaximumReceiversReached) + } + }) + } + + /// Tries to retrieve the value of the `Watch`. + pub fn try_peek(&self) -> Option { + self.inner_try_peek() + } + + /// Returns true if the `Watch` contains a value. + pub fn contains_value(&self) -> bool { + self.inner_contains_value() + } + + /// Clears the value of the `Watch`. This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending. + pub fn clear(&self) { + self.mutex.lock(|state| { + let mut s = state.borrow_mut(); + s.data = None; + }) + } +} + +/// A receiver can `.await` a change in the `Watch` value. +pub struct Rcv<'a, T: Clone, W: WatchBehavior + ?Sized> { + watch: &'a W, + at_id: u64, + _phantom: PhantomData, +} + +impl<'a, T: Clone, W: WatchBehavior + ?Sized> Rcv<'a, T, W> { + /// Creates a new `Receiver` with a reference to the `Watch`. + fn new(watch: &'a W) -> Self { + Self { + watch, + at_id: 0, + _phantom: PhantomData, + } + } + + /// Returns the current value of the `Watch` if it is initialized, **without** marking it as seen. + pub async fn peek(&self) -> T { + poll_fn(|cx| self.watch.inner_poll_peek(cx)).await + } + + /// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen. + pub fn try_peek(&self) -> Option { + self.watch.inner_try_peek() + } + + /// Returns the current value of the `Watch` if it is initialized, marking it as seen. + pub async fn get(&mut self) -> T { + poll_fn(|cx| self.watch.inner_poll_get(&mut self.at_id, cx)).await + } + + /// Tries to get the current value of the `Watch` without waiting, marking it as seen. + pub fn try_get(&mut self) -> Option { + self.watch.inner_try_get(&mut self.at_id) + } + + /// Waits for the `Watch` to change and returns the new value, marking it as seen. + pub async fn changed(&mut self) -> T { + poll_fn(|cx| self.watch.inner_poll_changed(&mut self.at_id, cx)).await + } + + /// Tries to get the new value of the watch without waiting, marking it as seen. + pub fn try_changed(&mut self) -> Option { + self.watch.inner_try_changed(&mut self.at_id) + } + + /// Checks if the `Watch` contains a value. If this returns true, + /// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately. + pub fn contains_value(&self) -> bool { + self.watch.inner_contains_value() + } +} + +/// A receiver of a `Watch` channel. +pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch>); + +/// A receiver which holds a **reference** to a `Watch` channel. +/// +/// This is an alternative to [`Receiver`] with a simpler type definition, at the expense of +/// some runtime performance due to dynamic dispatch. +pub struct DynReceiver<'a, T: Clone>(Rcv<'a, T, dyn WatchBehavior + 'a>); + +impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for Receiver<'a, M, T, N> { + type Target = Rcv<'a, T, Watch>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for Receiver<'a, M, T, N> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +impl<'a, T: Clone> Deref for DynReceiver<'a, T> { + type Target = Rcv<'a, T, dyn WatchBehavior + 'a>; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.0 + } +} + +#[cfg(test)] +mod tests { + use futures_executor::block_on; + + use super::*; + use crate::blocking_mutex::raw::CriticalSectionRawMutex; + + #[test] + fn multiple_writes() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain Receivers + let mut rcv0 = WATCH.receiver().unwrap(); + let mut rcv1 = WATCH.dyn_receiver().unwrap(); + + WATCH.write(10); + + // Receive the new value + assert_eq!(rcv0.changed().await, 10); + assert_eq!(rcv1.changed().await, 10); + + // No update + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv1.try_changed(), None); + + WATCH.write(20); + + assert_eq!(rcv0.changed().await, 20); + assert_eq!(rcv1.changed().await, 20); + }; + block_on(f); + } + + #[test] + fn max_receivers() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain Receivers + let _ = WATCH.receiver().unwrap(); + let _ = WATCH.receiver().unwrap(); + assert!(WATCH.receiver().is_err()); + }; + block_on(f); + } + + #[test] + fn receive_initial() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain Receivers + let mut rcv0 = WATCH.receiver().unwrap(); + let mut rcv1 = WATCH.receiver().unwrap(); + + assert_eq!(rcv0.contains_value(), false); + + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv1.try_changed(), None); + + WATCH.write(0); + + assert_eq!(rcv0.contains_value(), true); + + assert_eq!(rcv0.try_changed(), Some(0)); + assert_eq!(rcv1.try_changed(), Some(0)); + }; + block_on(f); + } + + #[test] + fn peek_get_changed() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain Receivers + let mut rcv0 = WATCH.receiver().unwrap(); + + WATCH.write(10); + + // Ensure peek does not mark as seen + assert_eq!(rcv0.peek().await, 10); + assert_eq!(rcv0.try_changed(), Some(10)); + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv0.peek().await, 10); + + WATCH.write(20); + + // Ensure get does mark as seen + assert_eq!(rcv0.get().await, 20); + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv0.try_get(), Some(20)); + }; + block_on(f); + } + + #[test] + fn count_ids() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain Receivers + let mut rcv0 = WATCH.receiver().unwrap(); + let mut rcv1 = WATCH.receiver().unwrap(); + + let get_id = || WATCH.mutex.lock(|state| state.borrow().current_id); + + WATCH.write(10); + + assert_eq!(rcv0.changed().await, 10); + assert_eq!(rcv1.changed().await, 10); + + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv1.try_changed(), None); + + WATCH.write(20); + WATCH.write(20); + WATCH.write(20); + + assert_eq!(rcv0.changed().await, 20); + assert_eq!(rcv1.changed().await, 20); + + assert_eq!(rcv0.try_changed(), None); + assert_eq!(rcv1.try_changed(), None); + + assert_eq!(get_id(), 4); + }; + block_on(f); + } + + #[test] + fn peek_still_await() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain Receivers + let mut rcv0 = WATCH.receiver().unwrap(); + let mut rcv1 = WATCH.receiver().unwrap(); + + WATCH.write(10); + + assert_eq!(rcv0.peek().await, 10); + assert_eq!(rcv1.try_peek(), Some(10)); + + assert_eq!(rcv0.changed().await, 10); + assert_eq!(rcv1.changed().await, 10); + }; + block_on(f); + } + + #[test] + fn peek_with_static() { + let f = async { + static WATCH: Watch = Watch::new(); + + // Obtain Receivers + let rcv0 = WATCH.receiver().unwrap(); + let rcv1 = WATCH.receiver().unwrap(); + + WATCH.write(20); + + assert_eq!(rcv0.peek().await, 20); + assert_eq!(rcv1.peek().await, 20); + assert_eq!(WATCH.try_peek(), Some(20)); + }; + block_on(f); + } +}