From b1ec460b9af131ef80fcafd79a7f63aa326aaf94 Mon Sep 17 00:00:00 2001 From: Ruben De Smet Date: Fri, 11 Aug 2023 11:30:29 +0200 Subject: [PATCH] Implement Channel::poll_receive(..) -> Poll --- embassy-stm32-wpan/src/mac/driver.rs | 2 +- embassy-sync/src/channel.rs | 43 +++++++++++++++++++++++++--- 2 files changed, 40 insertions(+), 5 deletions(-) diff --git a/embassy-stm32-wpan/src/mac/driver.rs b/embassy-stm32-wpan/src/mac/driver.rs index a58ee231b..93898d888 100644 --- a/embassy-stm32-wpan/src/mac/driver.rs +++ b/embassy-stm32-wpan/src/mac/driver.rs @@ -46,7 +46,7 @@ impl<'d> embassy_net_driver::Driver for Driver<'d> { } fn transmit(&mut self, cx: &mut Context) -> Option> { - if self.runner.tx_buf_channel.poll_ready_to_receive(cx) { + if self.runner.tx_buf_channel.poll_ready_to_receive(cx).is_ready() { Some(TxToken { tx: &self.runner.tx_channel, tx_buf: &self.runner.tx_buf_channel, diff --git a/embassy-sync/src/channel.rs b/embassy-sync/src/channel.rs index e7224856c..dc727fb10 100644 --- a/embassy-sync/src/channel.rs +++ b/embassy-sync/src/channel.rs @@ -165,6 +165,13 @@ where pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { self.channel.poll_ready_to_receive(cx) } + + /// Poll the channel for the next item + /// + /// See [`Channel::poll_receive()`] + pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { + self.channel.poll_receive(cx) + } } /// Receive-only access to a [`Channel`] without knowing channel size. @@ -201,6 +208,13 @@ impl<'ch, T> DynamicReceiver<'ch, T> { pub fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { self.channel.poll_ready_to_receive(cx) } + + /// Poll the channel for the next item + /// + /// See [`Channel::poll_receive()`] + pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { + self.channel.poll_receive(cx) + } } impl<'ch, M, T, const N: usize> From> for DynamicReceiver<'ch, T> @@ -228,10 +242,7 @@ where type Output = T; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.channel.try_recv_with_context(Some(cx)) { - Ok(v) => Poll::Ready(v), - Err(TryRecvError::Empty) => Poll::Pending, - } + self.channel.poll_receive(cx) } } @@ -317,6 +328,8 @@ trait DynamicChannel { fn poll_ready_to_send(&self, cx: &mut Context<'_>) -> Poll<()>; fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()>; + + fn poll_receive(&self, cx: &mut Context<'_>) -> Poll; } /// Error returned by [`try_recv`](Channel::try_recv). @@ -370,6 +383,19 @@ impl ChannelState { } } + fn poll_receive(&mut self, cx: &mut Context<'_>) -> Poll { + if self.queue.is_full() { + self.senders_waker.wake(); + } + + if let Some(message) = self.queue.pop_front() { + Poll::Ready(message) + } else { + self.receiver_waker.register(cx.waker()); + Poll::Pending + } + } + fn poll_ready_to_receive(&mut self, cx: &mut Context<'_>) -> Poll<()> { self.receiver_waker.register(cx.waker()); @@ -452,6 +478,11 @@ where self.lock(|c| c.try_recv_with_context(cx)) } + /// Poll the channel for the next message + pub fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { + self.lock(|c| c.poll_receive(cx)) + } + fn try_send_with_context(&self, m: T, cx: Option<&mut Context<'_>>) -> Result<(), TrySendError> { self.lock(|c| c.try_send_with_context(m, cx)) } @@ -539,6 +570,10 @@ where fn poll_ready_to_receive(&self, cx: &mut Context<'_>) -> Poll<()> { Channel::poll_ready_to_receive(self, cx) } + + fn poll_receive(&self, cx: &mut Context<'_>) -> Poll { + Channel::poll_receive(self, cx) + } } #[cfg(test)]