diff --git a/embassy-sync/src/pubsub/mod.rs b/embassy-sync/src/pubsub/mod.rs index adbf8cf02..af3d6db2a 100644 --- a/embassy-sync/src/pubsub/mod.rs +++ b/embassy-sync/src/pubsub/mod.rs @@ -248,13 +248,6 @@ impl usize { - self.inner.lock(|s| { - let s = s.borrow(); - s.queue.capacity() - s.queue.len() - }) - } - fn unregister_subscriber(&self, subscriber_next_message_id: u64) { self.inner.lock(|s| { let mut s = s.borrow_mut(); @@ -268,6 +261,26 @@ impl usize { + self.capacity() + } + + fn free_capacity(&self) -> usize { + self.free_capacity() + } + + fn len(&self) -> usize { + self.len() + } + + fn is_empty(&self) -> bool { + self.is_empty() + } + + fn is_full(&self) -> bool { + self.is_full() + } } /// Internal state for the PubSub channel @@ -439,8 +452,22 @@ trait SealedPubSubBehavior { /// Publish a message immediately fn publish_immediate(&self, message: T); - /// The amount of messages that can still be published without having to wait or without having to lag the subscribers - fn space(&self) -> usize; + /// Returns the maximum number of elements the channel can hold. + fn capacity(&self) -> usize; + + /// Returns the free capacity of the channel. + /// + /// This is equivalent to `capacity() - len()` + fn free_capacity(&self) -> usize; + + /// Returns the number of elements currently in the channel. + fn len(&self) -> usize; + + /// Returns whether the channel is empty. + fn is_empty(&self) -> bool; + + /// Returns whether the channel is full. + fn is_full(&self) -> bool; /// Let the channel know that a subscriber has dropped fn unregister_subscriber(&self, subscriber_next_message_id: u64); @@ -588,6 +615,7 @@ mod tests { assert_eq!(pub0.try_publish(0), Ok(())); assert_eq!(pub0.try_publish(0), Ok(())); assert_eq!(pub0.try_publish(0), Ok(())); + assert!(pub0.is_full()); assert_eq!(pub0.try_publish(0), Err(0)); drop(sub0); @@ -620,32 +648,42 @@ mod tests { } #[futures_test::test] - async fn correct_space() { + async fn correct_len() { let channel = PubSubChannel::::new(); let mut sub0 = channel.subscriber().unwrap(); let mut sub1 = channel.subscriber().unwrap(); let pub0 = channel.publisher().unwrap(); - assert_eq!(pub0.space(), 4); + assert!(sub0.is_empty()); + assert!(sub1.is_empty()); + assert!(pub0.is_empty()); + assert_eq!(pub0.free_capacity(), 4); + assert_eq!(pub0.len(), 0); pub0.publish(42).await; - assert_eq!(pub0.space(), 3); + assert_eq!(pub0.free_capacity(), 3); + assert_eq!(pub0.len(), 1); pub0.publish(42).await; - assert_eq!(pub0.space(), 2); + assert_eq!(pub0.free_capacity(), 2); + assert_eq!(pub0.len(), 2); sub0.next_message().await; sub0.next_message().await; - assert_eq!(pub0.space(), 2); + assert_eq!(pub0.free_capacity(), 2); + assert_eq!(pub0.len(), 2); sub1.next_message().await; - assert_eq!(pub0.space(), 3); + assert_eq!(pub0.free_capacity(), 3); + assert_eq!(pub0.len(), 1); + sub1.next_message().await; - assert_eq!(pub0.space(), 4); + assert_eq!(pub0.free_capacity(), 4); + assert_eq!(pub0.len(), 0); } #[futures_test::test] @@ -656,29 +694,29 @@ mod tests { let mut sub0 = channel.subscriber().unwrap(); let mut sub1 = channel.subscriber().unwrap(); - assert_eq!(4, pub0.space()); + assert_eq!(4, pub0.free_capacity()); pub0.publish(1).await; pub0.publish(2).await; - assert_eq!(2, channel.space()); + assert_eq!(2, channel.free_capacity()); assert_eq!(1, sub0.try_next_message_pure().unwrap()); assert_eq!(2, sub0.try_next_message_pure().unwrap()); - assert_eq!(2, channel.space()); + assert_eq!(2, channel.free_capacity()); drop(sub0); - assert_eq!(2, channel.space()); + assert_eq!(2, channel.free_capacity()); assert_eq!(1, sub1.try_next_message_pure().unwrap()); - assert_eq!(3, channel.space()); + assert_eq!(3, channel.free_capacity()); drop(sub1); - assert_eq!(4, channel.space()); + assert_eq!(4, channel.free_capacity()); } struct CloneCallCounter(usize); diff --git a/embassy-sync/src/pubsub/publisher.rs b/embassy-sync/src/pubsub/publisher.rs index e1edc9eb9..26e2c63b7 100644 --- a/embassy-sync/src/pubsub/publisher.rs +++ b/embassy-sync/src/pubsub/publisher.rs @@ -43,12 +43,31 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Pub<'a, PSB, T> { self.channel.publish_with_context(message, None) } - /// The amount of messages that can still be published without having to wait or without having to lag the subscribers + /// Returns the maximum number of elements the ***channel*** can hold. + pub fn capacity(&self) -> usize { + self.channel.capacity() + } + + /// Returns the free capacity of the ***channel***. /// - /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. - /// So checking doesn't give any guarantees.* - pub fn space(&self) -> usize { - self.channel.space() + /// This is equivalent to `capacity() - len()` + pub fn free_capacity(&self) -> usize { + self.channel.free_capacity() + } + + /// Returns the number of elements currently in the ***channel***. + pub fn len(&self) -> usize { + self.channel.len() + } + + /// Returns whether the ***channel*** is empty. + pub fn is_empty(&self) -> bool { + self.channel.is_empty() + } + + /// Returns whether the ***channel*** is full. + pub fn is_full(&self) -> bool { + self.channel.is_full() } } @@ -124,12 +143,31 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> ImmediatePub<'a, PSB, T> { self.channel.publish_with_context(message, None) } - /// The amount of messages that can still be published without having to wait or without having to lag the subscribers + /// Returns the maximum number of elements the ***channel*** can hold. + pub fn capacity(&self) -> usize { + self.channel.capacity() + } + + /// Returns the free capacity of the ***channel***. /// - /// *Note: In the time between checking this and a publish action, other publishers may have had time to publish something. - /// So checking doesn't give any guarantees.* - pub fn space(&self) -> usize { - self.channel.space() + /// This is equivalent to `capacity() - len()` + pub fn free_capacity(&self) -> usize { + self.channel.free_capacity() + } + + /// Returns the number of elements currently in the ***channel***. + pub fn len(&self) -> usize { + self.channel.len() + } + + /// Returns whether the ***channel*** is empty. + pub fn is_empty(&self) -> bool { + self.channel.is_empty() + } + + /// Returns whether the ***channel*** is full. + pub fn is_full(&self) -> bool { + self.channel.is_full() } } diff --git a/embassy-sync/src/pubsub/subscriber.rs b/embassy-sync/src/pubsub/subscriber.rs index f420a75f0..2e2bd26a9 100644 --- a/embassy-sync/src/pubsub/subscriber.rs +++ b/embassy-sync/src/pubsub/subscriber.rs @@ -65,10 +65,39 @@ impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Sub<'a, PSB, T> { } } - /// The amount of messages this subscriber hasn't received yet + /// The amount of messages this subscriber hasn't received yet. This is like [Self::len] but specifically + /// for this subscriber. pub fn available(&self) -> u64 { self.channel.available(self.next_message_id) } + + /// Returns the maximum number of elements the ***channel*** can hold. + pub fn capacity(&self) -> usize { + self.channel.capacity() + } + + /// Returns the free capacity of the ***channel***. + /// + /// This is equivalent to `capacity() - len()` + pub fn free_capacity(&self) -> usize { + self.channel.free_capacity() + } + + /// Returns the number of elements currently in the ***channel***. + /// See [Self::available] for how many messages are available for this subscriber. + pub fn len(&self) -> usize { + self.channel.len() + } + + /// Returns whether the ***channel*** is empty. + pub fn is_empty(&self) -> bool { + self.channel.is_empty() + } + + /// Returns whether the ***channel*** is full. + pub fn is_full(&self) -> bool { + self.channel.is_full() + } } impl<'a, PSB: PubSubBehavior + ?Sized, T: Clone> Drop for Sub<'a, PSB, T> {