Discontinue peek, add AnonReceiver

This commit is contained in:
Peter Krull 2024-09-23 20:09:35 +02:00
parent a2c473306f
commit a669611d7c

View File

@ -20,7 +20,9 @@ use crate::waitqueue::MultiWakerRegistration;
/// always provided with the latest value.
///
/// Typically, `Watch` instances are declared as `static`, and a [`Sender`] and [`Receiver`]
/// (or [`DynSender`] and/or [`DynReceiver`]) are obtained and passed to the relevant parts of the program.
/// (or [`DynSender`] and/or [`DynReceiver`]) are obtained where relevant. An [`AnonReceiver`]
/// and [`DynAnonReceiver`] are also available, which do not increase the receiver count for the
/// channel, and unwrapping is therefore not required, but it is not possible to `.await` the channel.
/// ```
///
/// use futures_executor::block_on;
@ -41,25 +43,25 @@ use crate::waitqueue::MultiWakerRegistration;
/// assert_eq!(rcv1.try_changed(), None);
///
/// snd.send(10);
///
///
/// // Receive the new value (async or try)
/// assert_eq!(rcv0.changed().await, 10);
/// assert_eq!(rcv1.try_changed(), Some(10));
///
///
/// // No update
/// assert_eq!(rcv0.try_changed(), None);
/// assert_eq!(rcv1.try_changed(), None);
///
/// snd.send(20);
///
/// // Peek does not mark the value as seen
/// assert_eq!(rcv0.peek().await, 20);
/// assert_eq!(rcv0.try_changed(), Some(20));
///
/// // Get marks the value as seen
/// // Using `get` marks the value as seen
/// assert_eq!(rcv1.get().await, 20);
/// assert_eq!(rcv1.try_changed(), None);
///
/// // But `get` also returns when unchanged
/// assert_eq!(rcv1.get().await, 20);
/// assert_eq!(rcv1.get().await, 20);
///
/// };
/// block_on(f);
/// ```
@ -82,24 +84,11 @@ pub trait WatchBehavior<T: Clone> {
/// Clears the value of the `Watch`.
fn clear(&self);
/// Poll the `Watch` for the current value, **without** making it as seen.
fn poll_peek(&self, cx: &mut Context<'_>) -> Poll<T>;
/// Tries to peek the value of the `Watch`, **without** marking it as seen.
fn try_peek(&self) -> Option<T>;
/// Poll the `Watch` for the value if it matches the predicate function
/// `f`, **without** making it as seen.
fn poll_peek_and(&self, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T>;
/// Tries to peek the value of the `Watch` if it matches the predicate function `f`, **without** marking it as seen.
fn try_peek_and(&self, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
/// Poll the `Watch` for the current value, making it as seen.
fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
/// Tries to get the value of the `Watch`, marking it as seen.
fn try_get(&self, id: &mut u64) -> Option<T>;
/// Tries to get the value of the `Watch`, marking it as seen, if an id is given.
fn try_get(&self, id: Option<&mut u64>) -> Option<T>;
/// Poll the `Watch` for the value if it matches the predicate function
/// `f`, making it as seen.
@ -107,9 +96,9 @@ pub trait WatchBehavior<T: Clone> {
/// Tries to get the value of the `Watch` if it matches the predicate function
/// `f`, marking it as seen.
fn try_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T>;
/// Poll the `Watch` for a changed value, marking it as seen.
/// Poll the `Watch` for a changed value, marking it as seen, if an id is given.
fn poll_changed(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T>;
/// Tries to retrieve the value of the `Watch` if it has changed, marking it as seen.
@ -128,7 +117,11 @@ pub trait WatchBehavior<T: Clone> {
/// Modify the value of the `Watch` using a closure. Returns `false` if the
/// `Watch` does not already contain a value.
fn modify(&self, f: &mut dyn Fn(&mut Option<T>));
fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>));
/// Modify the value of the `Watch` using a closure. Returns `false` if the
/// `Watch` does not already contain a value.
fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool);
/// Used when a receiver is dropped to decrement the receiver count.
///
@ -153,46 +146,6 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
})
}
fn poll_peek(&self, cx: &mut Context<'_>) -> Poll<T> {
self.mutex.lock(|state| {
let mut s = state.borrow_mut();
match &s.data {
Some(data) => Poll::Ready(data.clone()),
None => {
s.wakers.register(cx.waker());
Poll::Pending
}
}
})
}
fn try_peek(&self) -> Option<T> {
self.mutex.lock(|state| state.borrow().data.clone())
}
fn poll_peek_and(&self, f: &mut dyn Fn(&T) -> bool, cx: &mut Context<'_>) -> Poll<T> {
self.mutex.lock(|state| {
let mut s = state.borrow_mut();
match s.data {
Some(ref data) if f(data) => Poll::Ready(data.clone()),
_ => {
s.wakers.register(cx.waker());
Poll::Pending
}
}
})
}
fn try_peek_and(&self, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
self.mutex.lock(|state| {
let s = state.borrow();
match s.data {
Some(ref data) if f(data) => Some(data.clone()),
_ => None,
}
})
}
fn poll_get(&self, id: &mut u64, cx: &mut Context<'_>) -> Poll<T> {
self.mutex.lock(|state| {
let mut s = state.borrow_mut();
@ -209,11 +162,13 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
})
}
fn try_get(&self, id: &mut u64) -> Option<T> {
fn try_get(&self, id: Option<&mut u64>) -> Option<T> {
self.mutex.lock(|state| {
let s = state.borrow();
*id = s.current_id;
state.borrow().data.clone()
if let Some(id) = id {
*id = s.current_id;
}
s.data.clone()
})
}
@ -233,12 +188,14 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
})
}
fn try_get_and(&self, id: &mut u64, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
fn try_get_and(&self, id: Option<&mut u64>, f: &mut dyn Fn(&T) -> bool) -> Option<T> {
self.mutex.lock(|state| {
let s = state.borrow();
match s.data {
Some(ref data) if f(data) => {
*id = s.current_id;
if let Some(id) = id {
*id = s.current_id;
}
Some(data.clone())
}
_ => None,
@ -315,7 +272,7 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
})
}
fn modify(&self, f: &mut dyn Fn(&mut Option<T>)) {
fn send_modify(&self, f: &mut dyn Fn(&mut Option<T>)) {
self.mutex.lock(|state| {
let mut s = state.borrow_mut();
f(&mut s.data);
@ -323,6 +280,16 @@ impl<M: RawMutex, T: Clone, const N: usize> WatchBehavior<T> for Watch<M, T, N>
s.wakers.wake();
})
}
fn send_if_modified(&self, f: &mut dyn Fn(&mut Option<T>) -> bool) {
self.mutex.lock(|state| {
let mut s = state.borrow_mut();
if f(&mut s.data) {
s.current_id += 1;
s.wakers.wake();
}
})
}
}
impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
@ -375,6 +342,60 @@ impl<M: RawMutex, T: Clone, const N: usize> Watch<M, T, N> {
}
})
}
/// Try to create a new [`AnonReceiver`] for the `Watch`.
pub fn anon_receiver(&self) -> AnonReceiver<'_, M, T, N> {
AnonReceiver(AnonRcv::new(self, 0))
}
/// Try to create a new [`DynAnonReceiver`] for the `Watch`.
pub fn dyn_anon_receiver(&self) -> DynAnonReceiver<'_, T> {
DynAnonReceiver(AnonRcv::new(self, 0))
}
/// Returns the message ID of the latest message sent to the `Watch`.
///
/// This counter is monotonic, and is incremented every time a new message is sent.
pub fn get_msg_id(&self) -> u64 {
self.mutex.lock(|state| state.borrow().current_id)
}
/// Waits for the `Watch` to be initialized with a value using a busy-wait mechanism.
///
/// This is useful for initialization code where receivers may only be interested in
/// awaiting the value once in the lifetime of the program. It is therefore a temporaryily
/// CPU-inefficient operation, while being more memory efficient than using a `Receiver`.
///
/// **Note** Be careful about using this within an InterruptExecutor, as it will starve
/// tasks in lower-priority executors.
pub async fn spin_get(&self) -> T {
poll_fn(|cx| {
self.mutex.lock(|state| {
let s = state.borrow();
match &s.data {
Some(data) => Poll::Ready(data.clone()),
None => {
cx.waker().wake_by_ref();
Poll::Pending
}
}
})
})
.await
}
/// Tries to get the value of the `Watch`.
pub fn try_get(&self) -> Option<T> {
WatchBehavior::try_get(self, None)
}
/// Tries to get the value of the `Watch` if it matches the predicate function `f`.
pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
where
F: Fn(&T) -> bool,
{
WatchBehavior::try_get_and(self, None, &mut f)
}
}
/// A receiver can `.await` a change in the `Watch` value.
@ -407,23 +428,23 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
}
/// Clears the value of the `Watch`.
/// This will cause calls to [`Rcv::get`] and [`Rcv::peek`] to be pending.
/// This will cause calls to [`Rcv::get`] to be pending.
pub fn clear(&self) {
self.watch.clear()
}
/// Tries to retrieve the value of the `Watch`.
pub fn try_peek(&self) -> Option<T> {
self.watch.try_peek()
pub fn try_get(&self) -> Option<T> {
self.watch.try_get(None)
}
/// Tries to peek the current value of the `Watch` if it matches the predicate
/// function `f`.
pub fn try_peek_and<F>(&self, mut f: F) -> Option<T>
pub fn try_get_and<F>(&self, mut f: F) -> Option<T>
where
F: Fn(&T) -> bool,
{
self.watch.try_peek_and(&mut f)
self.watch.try_get_and(None, &mut f)
}
/// Returns true if the `Watch` contains a value.
@ -432,11 +453,20 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Snd<'a, T, W> {
}
/// Modify the value of the `Watch` using a closure.
pub fn modify<F>(&self, mut f: F)
pub fn send_modify<F>(&self, mut f: F)
where
F: Fn(&mut Option<T>),
{
self.watch.modify(&mut f)
self.watch.send_modify(&mut f)
}
/// Modify the value of the `Watch` using a closure. The closure must return
/// `true` if the value was modified, which notifies all receivers.
pub fn send_if_modified<F>(&self, mut f: F)
where
F: Fn(&mut Option<T>) -> bool,
{
self.watch.send_if_modified(&mut f)
}
}
@ -521,38 +551,6 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
}
}
/// Returns the current value of the `Watch` once it is initialized, **without** marking it as seen.
///
/// **Note**: Futures do nothing unless you `.await` or poll them.
pub async fn peek(&self) -> T {
poll_fn(|cx| self.watch.poll_peek(cx)).await
}
/// Tries to peek the current value of the `Watch` without waiting, and **without** marking it as seen.
pub fn try_peek(&self) -> Option<T> {
self.watch.try_peek()
}
/// Returns the current value of the `Watch` if it matches the predicate function `f`,
/// or waits for it to match, **without** marking it as seen.
///
/// **Note**: Futures do nothing unless you `.await` or poll them.
pub async fn peek_and<F>(&self, mut f: F) -> T
where
F: Fn(&T) -> bool,
{
poll_fn(|cx| self.watch.poll_peek_and(&mut f, cx)).await
}
/// Tries to peek the current value of the `Watch` if it matches the predicate
/// function `f` without waiting, and **without** marking it as seen.
pub fn try_peek_and<F>(&self, mut f: F) -> Option<T>
where
F: Fn(&T) -> bool,
{
self.watch.try_peek_and(&mut f)
}
/// Returns the current value of the `Watch` once it is initialized, marking it as seen.
///
/// **Note**: Futures do nothing unless you `.await` or poll them.
@ -562,7 +560,7 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
/// Tries to get the current value of the `Watch` without waiting, marking it as seen.
pub fn try_get(&mut self) -> Option<T> {
self.watch.try_get(&mut self.at_id)
self.watch.try_get(Some(&mut self.at_id))
}
/// Returns the value of the `Watch` if it matches the predicate function `f`,
@ -582,7 +580,7 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
where
F: Fn(&T) -> bool,
{
self.watch.try_get_and(&mut self.at_id, &mut f)
self.watch.try_get_and(Some(&mut self.at_id), &mut f)
}
/// Waits for the `Watch` to change and returns the new value, marking it as seen.
@ -618,7 +616,7 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Rcv<'a, T, W> {
}
/// Checks if the `Watch` contains a value. If this returns true,
/// then awaiting [`Rcv::get`] and [`Rcv::peek`] will return immediately.
/// then awaiting [`Rcv::get`] will return immediately.
pub fn contains_value(&self) -> bool {
self.watch.contains_value()
}
@ -630,6 +628,58 @@ impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> Drop for Rcv<'a, T, W> {
}
}
/// A anonymous receiver can NOT `.await` a change in the `Watch` value.
pub struct AnonRcv<'a, T: Clone, W: WatchBehavior<T> + ?Sized> {
watch: &'a W,
at_id: u64,
_phantom: PhantomData<T>,
}
impl<'a, T: Clone, W: WatchBehavior<T> + ?Sized> AnonRcv<'a, T, W> {
/// Creates a new `Receiver` with a reference to the `Watch`.
fn new(watch: &'a W, at_id: u64) -> Self {
Self {
watch,
at_id,
_phantom: PhantomData,
}
}
/// Tries to get the current value of the `Watch` without waiting, marking it as seen.
pub fn try_get(&mut self) -> Option<T> {
self.watch.try_get(Some(&mut self.at_id))
}
/// Tries to get the current value of the `Watch` if it matches the predicate
/// function `f` without waiting, marking it as seen.
pub fn try_get_and<F>(&mut self, mut f: F) -> Option<T>
where
F: Fn(&T) -> bool,
{
self.watch.try_get_and(Some(&mut self.at_id), &mut f)
}
/// Tries to get the new value of the watch without waiting, marking it as seen.
pub fn try_changed(&mut self) -> Option<T> {
self.watch.try_changed(&mut self.at_id)
}
/// Tries to get the new value of the watch which satisfies the predicate
/// function `f` and returns the new value without waiting, marking it as seen.
pub fn try_changed_and<F>(&mut self, mut f: F) -> Option<T>
where
F: Fn(&T) -> bool,
{
self.watch.try_changed_and(&mut self.at_id, &mut f)
}
/// Checks if the `Watch` contains a value. If this returns true,
/// then awaiting [`Rcv::get`] will return immediately.
pub fn contains_value(&self) -> bool {
self.watch.contains_value()
}
}
/// A receiver of a `Watch` channel.
pub struct Receiver<'a, M: RawMutex, T: Clone, const N: usize>(Rcv<'a, T, Watch<M, T, N>>);
@ -682,6 +732,58 @@ impl<'a, T: Clone> DerefMut for DynReceiver<'a, T> {
}
}
/// A receiver of a `Watch` channel that cannot `.await` values.
pub struct AnonReceiver<'a, M: RawMutex, T: Clone, const N: usize>(AnonRcv<'a, T, Watch<M, T, N>>);
impl<'a, M: RawMutex, T: Clone, const N: usize> AnonReceiver<'a, M, T, N> {
/// Converts the `Receiver` into a [`DynReceiver`].
pub fn as_dyn(self) -> DynAnonReceiver<'a, T> {
let rcv = DynAnonReceiver(AnonRcv::new(self.0.watch, self.at_id));
core::mem::forget(self); // Ensures the destructor is not called
rcv
}
}
impl<'a, M: RawMutex, T: Clone, const N: usize> Into<DynAnonReceiver<'a, T>> for AnonReceiver<'a, M, T, N> {
fn into(self) -> DynAnonReceiver<'a, T> {
self.as_dyn()
}
}
impl<'a, M: RawMutex, T: Clone, const N: usize> Deref for AnonReceiver<'a, M, T, N> {
type Target = AnonRcv<'a, T, Watch<M, T, N>>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<'a, M: RawMutex, T: Clone, const N: usize> DerefMut for AnonReceiver<'a, M, T, N> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
/// A receiver that cannot `.await` value, which holds a **dynamic** reference to a `Watch` channel.
///
/// This is an alternative to [`AnonReceiver`] with a simpler type definition, at the expense of
/// some runtime performance due to dynamic dispatch.
pub struct DynAnonReceiver<'a, T: Clone>(AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>);
impl<'a, T: Clone> Deref for DynAnonReceiver<'a, T> {
type Target = AnonRcv<'a, T, dyn WatchBehavior<T> + 'a>;
fn deref(&self) -> &Self::Target {
&self.0
}
}
impl<'a, T: Clone> DerefMut for DynAnonReceiver<'a, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
}
}
#[cfg(test)]
mod tests {
use futures_executor::block_on;
@ -715,6 +817,72 @@ mod tests {
block_on(f);
}
#[test]
fn all_try_get() {
let f = async {
static WATCH: Watch<CriticalSectionRawMutex, u8, 1> = Watch::new();
// Obtain receiver and sender
let mut rcv = WATCH.receiver().unwrap();
let snd = WATCH.sender();
// Not initialized
assert_eq!(WATCH.try_get(), None);
assert_eq!(rcv.try_get(), None);
assert_eq!(snd.try_get(), None);
// Receive the new value
snd.send(10);
assert_eq!(WATCH.try_get(), Some(10));
assert_eq!(rcv.try_get(), Some(10));
assert_eq!(snd.try_get(), Some(10));
assert_eq!(WATCH.try_get_and(|x| x > &5), Some(10));
assert_eq!(rcv.try_get_and(|x| x > &5), Some(10));
assert_eq!(snd.try_get_and(|x| x > &5), Some(10));
assert_eq!(WATCH.try_get_and(|x| x < &5), None);
assert_eq!(rcv.try_get_and(|x| x < &5), None);
assert_eq!(snd.try_get_and(|x| x < &5), None);
};
block_on(f);
}
#[test]
fn once_lock_like() {
let f = async {
static CONFIG0: u8 = 10;
static CONFIG1: u8 = 20;
static WATCH: Watch<CriticalSectionRawMutex, &'static u8, 1> = Watch::new();
// Obtain receiver and sender
let mut rcv = WATCH.receiver().unwrap();
let snd = WATCH.sender();
// Not initialized
assert_eq!(rcv.try_changed(), None);
// Receive the new value
snd.send(&CONFIG0);
let rcv0 = rcv.changed().await;
assert_eq!(rcv0, &10);
// Receive another value
snd.send(&CONFIG1);
let rcv1 = rcv.try_changed();
assert_eq!(rcv1, Some(&20));
// No update
assert_eq!(rcv.try_changed(), None);
// Ensure similarity with original static
assert_eq!(rcv0, &CONFIG0);
assert_eq!(rcv1, Some(&CONFIG1));
};
block_on(f);
}
#[test]
fn sender_modify() {
let f = async {
@ -729,7 +897,7 @@ mod tests {
assert_eq!(rcv.try_changed(), Some(10));
// Modify the value inplace
snd.modify(|opt| {
snd.send_modify(|opt| {
if let Some(inner) = opt {
*inner += 5;
}
@ -751,11 +919,6 @@ mod tests {
let mut rcv = WATCH.receiver().unwrap();
let snd = WATCH.sender();
snd.send(10);
assert_eq!(rcv.try_peek_and(|x| x > &5), Some(10));
assert_eq!(rcv.try_peek_and(|x| x < &5), None);
assert!(rcv.try_changed().is_some());
snd.send(15);
assert_eq!(rcv.try_get_and(|x| x > &5), Some(15));
assert_eq!(rcv.try_get_and(|x| x < &5), None);
@ -771,7 +934,6 @@ mod tests {
snd.send(30);
assert_eq!(rcv.changed_and(|x| x > &5).await, 30);
assert_eq!(rcv.peek_and(|x| x > &5).await, 30);
assert_eq!(rcv.get_and(|x| x > &5).await, 30);
};
block_on(f);
@ -825,7 +987,7 @@ mod tests {
// Obtain receivers and sender
let mut rcv0 = WATCH.receiver().unwrap();
let mut rcv1 = WATCH.receiver().unwrap();
let mut rcv1 = WATCH.anon_receiver();
let snd = WATCH.sender();
// No update for both
@ -864,41 +1026,13 @@ mod tests {
block_on(f);
}
#[test]
fn peek_get_changed() {
let f = async {
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
// Obtain receiver and sender
let mut rcv = WATCH.receiver().unwrap();
let snd = WATCH.sender();
// Send a value
snd.send(10);
// Ensure peek does not mark as seen
assert_eq!(rcv.peek().await, 10);
assert_eq!(rcv.try_changed(), Some(10));
assert_eq!(rcv.try_changed(), None);
assert_eq!(rcv.try_peek(), Some(10));
// Send a value
snd.send(20);
// Ensure get does mark as seen
assert_eq!(rcv.get().await, 20);
assert_eq!(rcv.try_changed(), None);
assert_eq!(rcv.try_get(), Some(20));
};
block_on(f);
}
#[test]
fn use_dynamics() {
let f = async {
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
// Obtain receiver and sender
let mut anon_rcv = WATCH.dyn_anon_receiver();
let mut dyn_rcv = WATCH.dyn_receiver().unwrap();
let dyn_snd = WATCH.dyn_sender();
@ -906,6 +1040,7 @@ mod tests {
dyn_snd.send(10);
// Ensure the dynamic receiver receives the value
assert_eq!(anon_rcv.try_changed(), Some(10));
assert_eq!(dyn_rcv.try_changed(), Some(10));
assert_eq!(dyn_rcv.try_changed(), None);
};
@ -918,10 +1053,12 @@ mod tests {
static WATCH: Watch<CriticalSectionRawMutex, u8, 2> = Watch::new();
// Obtain receiver and sender
let anon_rcv = WATCH.anon_receiver();
let rcv = WATCH.receiver().unwrap();
let snd = WATCH.sender();
// Convert to dynamic
let mut dyn_anon_rcv = anon_rcv.as_dyn();
let mut dyn_rcv = rcv.as_dyn();
let dyn_snd = snd.as_dyn();
@ -929,6 +1066,7 @@ mod tests {
dyn_snd.send(10);
// Ensure the dynamic receiver receives the value
assert_eq!(dyn_anon_rcv.try_changed(), Some(10));
assert_eq!(dyn_rcv.try_changed(), Some(10));
assert_eq!(dyn_rcv.try_changed(), None);
};