From 3778f55d80f70b336f6ca846f365cf619032a685 Mon Sep 17 00:00:00 2001 From: huntc Date: Thu, 15 Jul 2021 12:08:35 +1000 Subject: [PATCH] Provides a cleaner construction of the channel with the common "new" naming --- embassy/src/util/mpsc.rs | 122 ++++++++++------------------------- embassy/src/util/mutex.rs | 14 ++++ examples/nrf/src/bin/mpsc.rs | 3 +- 3 files changed, 51 insertions(+), 88 deletions(-) diff --git a/embassy/src/util/mpsc.rs b/embassy/src/util/mpsc.rs index 246bd27e4..cc9e2a5dd 100644 --- a/embassy/src/util/mpsc.rs +++ b/embassy/src/util/mpsc.rs @@ -581,75 +581,27 @@ where pub type WithCriticalSections = CriticalSectionMutex<()>; -impl Channel { - /// Establish a new bounded channel using critical sections. Critical sections - /// should be used only single core targets where communication is required - /// from exception mode e.g. interrupt handlers. To create one: - /// - /// ``` - /// use embassy::util::mpsc; - /// use embassy::util::mpsc::{Channel, WithCriticalSections}; - /// - /// // Declare a bounded channel of 3 u32s. - /// let mut channel = Channel::::with_critical_sections(); - /// // once we have a channel, obtain its sender and receiver - /// let (sender, receiver) = mpsc::split(&mut channel); - /// ``` - pub const fn with_critical_sections() -> Self { - let mutex = CriticalSectionMutex::new(()); - let state = ChannelState::new(); - let channel_cell = ChannelCell { mutex, state }; - Channel { - channel_cell: UnsafeCell::new(channel_cell), - receiver_consumed: PhantomData, - } - } -} - pub type WithThreadModeOnly = ThreadModeMutex<()>; -impl Channel { - /// Establish a new bounded channel for use in Cortex-M thread mode. Thread - /// mode is intended for application threads on a single core, not interrupts. - /// As such, only one task at a time can acquire a resource and so this - /// channel avoids all locks. To create one: - /// - /// ``` no_run - /// use embassy::util::mpsc; - /// use embassy::util::mpsc::{Channel, WithThreadModeOnly}; - /// - /// // Declare a bounded channel of 3 u32s. - /// let mut channel = Channel::::with_thread_mode_only(); - /// // once we have a channel, obtain its sender and receiver - /// let (sender, receiver) = mpsc::split(&mut channel); - /// ``` - pub const fn with_thread_mode_only() -> Self { - let mutex = ThreadModeMutex::new(()); - let state = ChannelState::new(); - let channel_cell = ChannelCell { mutex, state }; - Channel { - channel_cell: UnsafeCell::new(channel_cell), - receiver_consumed: PhantomData, - } - } -} - pub type WithNoThreads = NoopMutex<()>; -impl Channel { - /// Establish a new bounded channel for within a single thread. To create one: +impl Channel +where + M: Mutex, +{ + /// Establish a new bounded channel. For example, to create one with a NoopMutex: /// /// ``` /// use embassy::util::mpsc; /// use embassy::util::mpsc::{Channel, WithNoThreads}; /// /// // Declare a bounded channel of 3 u32s. - /// let mut channel = Channel::::with_no_threads(); + /// let mut channel = Channel::::new(); /// // once we have a channel, obtain its sender and receiver /// let (sender, receiver) = mpsc::split(&mut channel); /// ``` - pub const fn with_no_threads() -> Self { - let mutex = NoopMutex::new(()); + pub fn new() -> Self { + let mutex = M::new(()); let state = ChannelState::new(); let channel_cell = ChannelCell { mutex, state }; Channel { @@ -657,12 +609,7 @@ impl Channel { receiver_consumed: PhantomData, } } -} -impl Channel -where - M: Mutex, -{ fn lock( channel_cell: &UnsafeCell>, f: impl FnOnce(&mut ChannelState) -> R, @@ -684,6 +631,8 @@ mod tests { use futures_executor::ThreadPool; use futures_timer::Delay; + use crate::util::Forever; + use super::*; fn capacity(c: &ChannelState) -> usize { @@ -758,7 +707,7 @@ mod tests { #[test] fn simple_send_and_receive() { - let mut c = Channel::::with_no_threads(); + let mut c = Channel::::new(); let (s, r) = split(&mut c); assert!(s.clone().try_send(1).is_ok()); assert_eq!(r.try_recv().unwrap(), 1); @@ -766,7 +715,7 @@ mod tests { #[test] fn should_close_without_sender() { - let mut c = Channel::::with_no_threads(); + let mut c = Channel::::new(); let (s, r) = split(&mut c); drop(s); match r.try_recv() { @@ -777,7 +726,7 @@ mod tests { #[test] fn should_close_once_drained() { - let mut c = Channel::::with_no_threads(); + let mut c = Channel::::new(); let (s, r) = split(&mut c); assert!(s.try_send(1).is_ok()); drop(s); @@ -790,7 +739,7 @@ mod tests { #[test] fn should_reject_send_when_receiver_dropped() { - let mut c = Channel::::with_no_threads(); + let mut c = Channel::::new(); let (s, r) = split(&mut c); drop(r); match s.try_send(1) { @@ -801,7 +750,7 @@ mod tests { #[test] fn should_reject_send_when_channel_closed() { - let mut c = Channel::::with_no_threads(); + let mut c = Channel::::new(); let (s, mut r) = split(&mut c); assert!(s.try_send(1).is_ok()); r.close(); @@ -817,9 +766,9 @@ mod tests { async fn receiver_closes_when_sender_dropped_async() { let executor = ThreadPool::new().unwrap(); - static mut CHANNEL: Channel = - Channel::with_critical_sections(); - let (s, mut r) = split(unsafe { &mut CHANNEL }); + static CHANNEL: Forever> = Forever::new(); + let c = CHANNEL.put(Channel::new()); + let (s, mut r) = split(c); assert!(executor .spawn(async move { drop(s); @@ -832,9 +781,9 @@ mod tests { async fn receiver_receives_given_try_send_async() { let executor = ThreadPool::new().unwrap(); - static mut CHANNEL: Channel = - Channel::with_critical_sections(); - let (s, mut r) = split(unsafe { &mut CHANNEL }); + static CHANNEL: Forever> = Forever::new(); + let c = CHANNEL.put(Channel::new()); + let (s, mut r) = split(c); assert!(executor .spawn(async move { assert!(s.try_send(1).is_ok()); @@ -845,18 +794,17 @@ mod tests { #[futures_test::test] async fn sender_send_completes_if_capacity() { - static mut CHANNEL: Channel = - Channel::with_critical_sections(); - let (s, mut r) = split(unsafe { &mut CHANNEL }); + let mut c = Channel::::new(); + let (s, mut r) = split(&mut c); assert!(s.send(1).await.is_ok()); assert_eq!(r.recv().await, Some(1)); } #[futures_test::test] async fn sender_send_completes_if_closed() { - static mut CHANNEL: Channel = - Channel::with_critical_sections(); - let (s, r) = split(unsafe { &mut CHANNEL }); + static CHANNEL: Forever> = Forever::new(); + let c = CHANNEL.put(Channel::new()); + let (s, r) = split(c); drop(r); match s.send(1).await { Err(SendError(1)) => assert!(true), @@ -868,9 +816,9 @@ mod tests { async fn senders_sends_wait_until_capacity() { let executor = ThreadPool::new().unwrap(); - static mut CHANNEL: Channel = - Channel::with_critical_sections(); - let (s0, mut r) = split(unsafe { &mut CHANNEL }); + static CHANNEL: Forever> = Forever::new(); + let c = CHANNEL.put(Channel::new()); + let (s0, mut r) = split(c); assert!(s0.try_send(1).is_ok()); let s1 = s0.clone(); let send_task_1 = executor.spawn_with_handle(async move { s0.send(2).await }); @@ -888,18 +836,18 @@ mod tests { #[futures_test::test] async fn sender_close_completes_if_closing() { - static mut CHANNEL: Channel = - Channel::with_critical_sections(); - let (s, mut r) = split(unsafe { &mut CHANNEL }); + static CHANNEL: Forever> = Forever::new(); + let c = CHANNEL.put(Channel::new()); + let (s, mut r) = split(c); r.close(); s.closed().await; } #[futures_test::test] async fn sender_close_completes_if_closed() { - static mut CHANNEL: Channel = - Channel::with_critical_sections(); - let (s, r) = split(unsafe { &mut CHANNEL }); + static CHANNEL: Forever> = Forever::new(); + let c = CHANNEL.put(Channel::new()); + let (s, r) = split(c); drop(r); s.closed().await; } diff --git a/embassy/src/util/mutex.rs b/embassy/src/util/mutex.rs index c8fe84026..0506ffe6f 100644 --- a/embassy/src/util/mutex.rs +++ b/embassy/src/util/mutex.rs @@ -8,6 +8,8 @@ pub trait Mutex { /// Data protected by the mutex. type Data; + fn new(data: Self::Data) -> Self; + /// Creates a critical section and grants temporary access to the protected data. fn lock(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R; } @@ -47,6 +49,10 @@ impl CriticalSectionMutex { impl Mutex for CriticalSectionMutex { type Data = T; + fn new(data: T) -> Self { + Self::new(data) + } + fn lock(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { critical_section::with(|cs| f(self.borrow(cs))) } @@ -92,6 +98,10 @@ impl ThreadModeMutex { impl Mutex for ThreadModeMutex { type Data = T; + fn new(data: T) -> Self { + Self::new(data) + } + fn lock(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { f(self.borrow()) } @@ -126,6 +136,10 @@ impl NoopMutex { impl Mutex for NoopMutex { type Data = T; + fn new(data: T) -> Self { + Self::new(data) + } + fn lock(&mut self, f: impl FnOnce(&Self::Data) -> R) -> R { f(self.borrow()) } diff --git a/examples/nrf/src/bin/mpsc.rs b/examples/nrf/src/bin/mpsc.rs index c2cb107e1..443955239 100644 --- a/examples/nrf/src/bin/mpsc.rs +++ b/examples/nrf/src/bin/mpsc.rs @@ -37,9 +37,10 @@ async fn my_task(sender: Sender<'static, WithNoThreads, LedState, 1>) { #[embassy::main] async fn main(spawner: Spawner, p: Peripherals) { + let mut led = Output::new(p.P0_13, Level::Low, OutputDrive::Standard); - let channel = CHANNEL.put(Channel::with_no_threads()); + let channel = CHANNEL.put(Channel::new()); let (sender, mut receiver) = mpsc::split(channel); spawner.spawn(my_task(sender)).unwrap();