mirror of
https://github.com/embassy-rs/embassy.git
synced 2024-10-30 05:40:55 +00:00
Move embassy-net-driver-channel::zerocopy_channel to embassy_sync::zero_copy_channel
This commit is contained in:
parent
3295ec94e5
commit
a2656f402b
@ -14,6 +14,7 @@ use embassy_net_driver::{Capabilities, LinkState, Medium};
|
||||
use embassy_sync::blocking_mutex::raw::NoopRawMutex;
|
||||
use embassy_sync::blocking_mutex::Mutex;
|
||||
use embassy_sync::waitqueue::WakerRegistration;
|
||||
use embassy_sync::zero_copy_channel;
|
||||
|
||||
pub struct State<const MTU: usize, const N_RX: usize, const N_TX: usize> {
|
||||
rx: [PacketBuf<MTU>; N_RX],
|
||||
@ -34,8 +35,8 @@ impl<const MTU: usize, const N_RX: usize, const N_TX: usize> State<MTU, N_RX, N_
|
||||
}
|
||||
|
||||
struct StateInner<'d, const MTU: usize> {
|
||||
rx: zerocopy_channel::Channel<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
tx: zerocopy_channel::Channel<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
rx: zero_copy_channel::Channel<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
tx: zero_copy_channel::Channel<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
shared: Mutex<NoopRawMutex, RefCell<Shared>>,
|
||||
}
|
||||
|
||||
@ -47,8 +48,8 @@ struct Shared {
|
||||
}
|
||||
|
||||
pub struct Runner<'d, const MTU: usize> {
|
||||
tx_chan: zerocopy_channel::Receiver<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
rx_chan: zerocopy_channel::Sender<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
tx_chan: zero_copy_channel::Receiver<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
rx_chan: zero_copy_channel::Sender<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
shared: &'d Mutex<NoopRawMutex, RefCell<Shared>>,
|
||||
}
|
||||
|
||||
@ -58,11 +59,11 @@ pub struct StateRunner<'d> {
|
||||
}
|
||||
|
||||
pub struct RxRunner<'d, const MTU: usize> {
|
||||
rx_chan: zerocopy_channel::Sender<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
rx_chan: zero_copy_channel::Sender<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
}
|
||||
|
||||
pub struct TxRunner<'d, const MTU: usize> {
|
||||
tx_chan: zerocopy_channel::Receiver<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
tx_chan: zero_copy_channel::Receiver<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
}
|
||||
|
||||
impl<'d, const MTU: usize> Runner<'d, MTU> {
|
||||
@ -243,8 +244,8 @@ pub fn new<'d, const MTU: usize, const N_RX: usize, const N_TX: usize>(
|
||||
let state_uninit: *mut MaybeUninit<StateInner<'d, MTU>> =
|
||||
(&mut state.inner as *mut MaybeUninit<StateInner<'static, MTU>>).cast();
|
||||
let state = unsafe { &mut *state_uninit }.write(StateInner {
|
||||
rx: zerocopy_channel::Channel::new(&mut state.rx[..]),
|
||||
tx: zerocopy_channel::Channel::new(&mut state.tx[..]),
|
||||
rx: zero_copy_channel::Channel::new(&mut state.rx[..]),
|
||||
tx: zero_copy_channel::Channel::new(&mut state.tx[..]),
|
||||
shared: Mutex::new(RefCell::new(Shared {
|
||||
link_state: LinkState::Down,
|
||||
hardware_address,
|
||||
@ -282,8 +283,8 @@ impl<const MTU: usize> PacketBuf<MTU> {
|
||||
}
|
||||
|
||||
pub struct Device<'d, const MTU: usize> {
|
||||
rx: zerocopy_channel::Receiver<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
tx: zerocopy_channel::Sender<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
rx: zero_copy_channel::Receiver<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
tx: zero_copy_channel::Sender<'d, NoopRawMutex, PacketBuf<MTU>>,
|
||||
shared: &'d Mutex<NoopRawMutex, RefCell<Shared>>,
|
||||
caps: Capabilities,
|
||||
}
|
||||
@ -328,7 +329,7 @@ impl<'d, const MTU: usize> embassy_net_driver::Driver for Device<'d, MTU> {
|
||||
}
|
||||
|
||||
pub struct RxToken<'a, const MTU: usize> {
|
||||
rx: zerocopy_channel::Receiver<'a, NoopRawMutex, PacketBuf<MTU>>,
|
||||
rx: zero_copy_channel::Receiver<'a, NoopRawMutex, PacketBuf<MTU>>,
|
||||
}
|
||||
|
||||
impl<'a, const MTU: usize> embassy_net_driver::RxToken for RxToken<'a, MTU> {
|
||||
@ -345,7 +346,7 @@ impl<'a, const MTU: usize> embassy_net_driver::RxToken for RxToken<'a, MTU> {
|
||||
}
|
||||
|
||||
pub struct TxToken<'a, const MTU: usize> {
|
||||
tx: zerocopy_channel::Sender<'a, NoopRawMutex, PacketBuf<MTU>>,
|
||||
tx: zero_copy_channel::Sender<'a, NoopRawMutex, PacketBuf<MTU>>,
|
||||
}
|
||||
|
||||
impl<'a, const MTU: usize> embassy_net_driver::TxToken for TxToken<'a, MTU> {
|
||||
@ -361,215 +362,3 @@ impl<'a, const MTU: usize> embassy_net_driver::TxToken for TxToken<'a, MTU> {
|
||||
r
|
||||
}
|
||||
}
|
||||
|
||||
mod zerocopy_channel {
|
||||
use core::cell::RefCell;
|
||||
use core::future::poll_fn;
|
||||
use core::marker::PhantomData;
|
||||
use core::task::{Context, Poll};
|
||||
|
||||
use embassy_sync::blocking_mutex::raw::RawMutex;
|
||||
use embassy_sync::blocking_mutex::Mutex;
|
||||
use embassy_sync::waitqueue::WakerRegistration;
|
||||
|
||||
pub struct Channel<'a, M: RawMutex, T> {
|
||||
buf: *mut T,
|
||||
phantom: PhantomData<&'a mut T>,
|
||||
state: Mutex<M, RefCell<State>>,
|
||||
}
|
||||
|
||||
impl<'a, M: RawMutex, T> Channel<'a, M, T> {
|
||||
pub fn new(buf: &'a mut [T]) -> Self {
|
||||
let len = buf.len();
|
||||
assert!(len != 0);
|
||||
|
||||
Self {
|
||||
buf: buf.as_mut_ptr(),
|
||||
phantom: PhantomData,
|
||||
state: Mutex::new(RefCell::new(State {
|
||||
len,
|
||||
front: 0,
|
||||
back: 0,
|
||||
full: false,
|
||||
send_waker: WakerRegistration::new(),
|
||||
recv_waker: WakerRegistration::new(),
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
|
||||
(Sender { channel: self }, Receiver { channel: self })
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Sender<'a, M: RawMutex, T> {
|
||||
channel: &'a Channel<'a, M, T>,
|
||||
}
|
||||
|
||||
impl<'a, M: RawMutex, T> Sender<'a, M, T> {
|
||||
pub fn borrow(&mut self) -> Sender<'_, M, T> {
|
||||
Sender { channel: self.channel }
|
||||
}
|
||||
|
||||
pub fn try_send(&mut self) -> Option<&mut T> {
|
||||
self.channel.state.lock(|s| {
|
||||
let s = &mut *s.borrow_mut();
|
||||
match s.push_index() {
|
||||
Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
|
||||
None => None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
|
||||
self.channel.state.lock(|s| {
|
||||
let s = &mut *s.borrow_mut();
|
||||
match s.push_index() {
|
||||
Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
|
||||
None => {
|
||||
s.recv_waker.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn send(&mut self) -> &mut T {
|
||||
let i = poll_fn(|cx| {
|
||||
self.channel.state.lock(|s| {
|
||||
let s = &mut *s.borrow_mut();
|
||||
match s.push_index() {
|
||||
Some(i) => Poll::Ready(i),
|
||||
None => {
|
||||
s.recv_waker.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
.await;
|
||||
unsafe { &mut *self.channel.buf.add(i) }
|
||||
}
|
||||
|
||||
pub fn send_done(&mut self) {
|
||||
self.channel.state.lock(|s| s.borrow_mut().push_done())
|
||||
}
|
||||
}
|
||||
pub struct Receiver<'a, M: RawMutex, T> {
|
||||
channel: &'a Channel<'a, M, T>,
|
||||
}
|
||||
|
||||
impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
|
||||
pub fn borrow(&mut self) -> Receiver<'_, M, T> {
|
||||
Receiver { channel: self.channel }
|
||||
}
|
||||
|
||||
pub fn try_recv(&mut self) -> Option<&mut T> {
|
||||
self.channel.state.lock(|s| {
|
||||
let s = &mut *s.borrow_mut();
|
||||
match s.pop_index() {
|
||||
Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
|
||||
None => None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn poll_recv(&mut self, cx: &mut Context) -> Poll<&mut T> {
|
||||
self.channel.state.lock(|s| {
|
||||
let s = &mut *s.borrow_mut();
|
||||
match s.pop_index() {
|
||||
Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
|
||||
None => {
|
||||
s.send_waker.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn recv(&mut self) -> &mut T {
|
||||
let i = poll_fn(|cx| {
|
||||
self.channel.state.lock(|s| {
|
||||
let s = &mut *s.borrow_mut();
|
||||
match s.pop_index() {
|
||||
Some(i) => Poll::Ready(i),
|
||||
None => {
|
||||
s.send_waker.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
.await;
|
||||
unsafe { &mut *self.channel.buf.add(i) }
|
||||
}
|
||||
|
||||
pub fn recv_done(&mut self) {
|
||||
self.channel.state.lock(|s| s.borrow_mut().pop_done())
|
||||
}
|
||||
}
|
||||
|
||||
struct State {
|
||||
len: usize,
|
||||
|
||||
/// Front index. Always 0..=(N-1)
|
||||
front: usize,
|
||||
/// Back index. Always 0..=(N-1).
|
||||
back: usize,
|
||||
|
||||
/// Used to distinguish "empty" and "full" cases when `front == back`.
|
||||
/// May only be `true` if `front == back`, always `false` otherwise.
|
||||
full: bool,
|
||||
|
||||
send_waker: WakerRegistration,
|
||||
recv_waker: WakerRegistration,
|
||||
}
|
||||
|
||||
impl State {
|
||||
fn increment(&self, i: usize) -> usize {
|
||||
if i + 1 == self.len {
|
||||
0
|
||||
} else {
|
||||
i + 1
|
||||
}
|
||||
}
|
||||
|
||||
fn is_full(&self) -> bool {
|
||||
self.full
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
self.front == self.back && !self.full
|
||||
}
|
||||
|
||||
fn push_index(&mut self) -> Option<usize> {
|
||||
match self.is_full() {
|
||||
true => None,
|
||||
false => Some(self.back),
|
||||
}
|
||||
}
|
||||
|
||||
fn push_done(&mut self) {
|
||||
assert!(!self.is_full());
|
||||
self.back = self.increment(self.back);
|
||||
if self.back == self.front {
|
||||
self.full = true;
|
||||
}
|
||||
self.send_waker.wake();
|
||||
}
|
||||
|
||||
fn pop_index(&mut self) -> Option<usize> {
|
||||
match self.is_empty() {
|
||||
true => None,
|
||||
false => Some(self.front),
|
||||
}
|
||||
}
|
||||
|
||||
fn pop_done(&mut self) {
|
||||
assert!(!self.is_empty());
|
||||
self.front = self.increment(self.front);
|
||||
self.full = false;
|
||||
self.recv_waker.wake();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -17,3 +17,4 @@ pub mod pipe;
|
||||
pub mod pubsub;
|
||||
pub mod signal;
|
||||
pub mod waitqueue;
|
||||
pub mod zero_copy_channel;
|
||||
|
209
embassy-sync/src/zero_copy_channel.rs
Normal file
209
embassy-sync/src/zero_copy_channel.rs
Normal file
@ -0,0 +1,209 @@
|
||||
use core::cell::RefCell;
|
||||
use core::future::poll_fn;
|
||||
use core::marker::PhantomData;
|
||||
use core::task::{Context, Poll};
|
||||
|
||||
use crate::blocking_mutex::raw::RawMutex;
|
||||
use crate::blocking_mutex::Mutex;
|
||||
use crate::waitqueue::WakerRegistration;
|
||||
|
||||
pub struct Channel<'a, M: RawMutex, T> {
|
||||
buf: *mut T,
|
||||
phantom: PhantomData<&'a mut T>,
|
||||
state: Mutex<M, RefCell<State>>,
|
||||
}
|
||||
|
||||
impl<'a, M: RawMutex, T> Channel<'a, M, T> {
|
||||
pub fn new(buf: &'a mut [T]) -> Self {
|
||||
let len = buf.len();
|
||||
assert!(len != 0);
|
||||
|
||||
Self {
|
||||
buf: buf.as_mut_ptr(),
|
||||
phantom: PhantomData,
|
||||
state: Mutex::new(RefCell::new(State {
|
||||
len,
|
||||
front: 0,
|
||||
back: 0,
|
||||
full: false,
|
||||
send_waker: WakerRegistration::new(),
|
||||
recv_waker: WakerRegistration::new(),
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn split(&mut self) -> (Sender<'_, M, T>, Receiver<'_, M, T>) {
|
||||
(Sender { channel: self }, Receiver { channel: self })
|
||||
}
|
||||
}
|
||||
|
||||
pub struct Sender<'a, M: RawMutex, T> {
|
||||
channel: &'a Channel<'a, M, T>,
|
||||
}
|
||||
|
||||
impl<'a, M: RawMutex, T> Sender<'a, M, T> {
|
||||
pub fn borrow(&mut self) -> Sender<'_, M, T> {
|
||||
Sender { channel: self.channel }
|
||||
}
|
||||
|
||||
pub fn try_send(&mut self) -> Option<&mut T> {
|
||||
self.channel.state.lock(|s| {
|
||||
let s = &mut *s.borrow_mut();
|
||||
match s.push_index() {
|
||||
Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
|
||||
None => None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn poll_send(&mut self, cx: &mut Context) -> Poll<&mut T> {
|
||||
self.channel.state.lock(|s| {
|
||||
let s = &mut *s.borrow_mut();
|
||||
match s.push_index() {
|
||||
Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
|
||||
None => {
|
||||
s.recv_waker.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn send(&mut self) -> &mut T {
|
||||
let i = poll_fn(|cx| {
|
||||
self.channel.state.lock(|s| {
|
||||
let s = &mut *s.borrow_mut();
|
||||
match s.push_index() {
|
||||
Some(i) => Poll::Ready(i),
|
||||
None => {
|
||||
s.recv_waker.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
.await;
|
||||
unsafe { &mut *self.channel.buf.add(i) }
|
||||
}
|
||||
|
||||
pub fn send_done(&mut self) {
|
||||
self.channel.state.lock(|s| s.borrow_mut().push_done())
|
||||
}
|
||||
}
|
||||
pub struct Receiver<'a, M: RawMutex, T> {
|
||||
channel: &'a Channel<'a, M, T>,
|
||||
}
|
||||
|
||||
impl<'a, M: RawMutex, T> Receiver<'a, M, T> {
|
||||
pub fn borrow(&mut self) -> Receiver<'_, M, T> {
|
||||
Receiver { channel: self.channel }
|
||||
}
|
||||
|
||||
pub fn try_recv(&mut self) -> Option<&mut T> {
|
||||
self.channel.state.lock(|s| {
|
||||
let s = &mut *s.borrow_mut();
|
||||
match s.pop_index() {
|
||||
Some(i) => Some(unsafe { &mut *self.channel.buf.add(i) }),
|
||||
None => None,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn poll_recv(&mut self, cx: &mut Context) -> Poll<&mut T> {
|
||||
self.channel.state.lock(|s| {
|
||||
let s = &mut *s.borrow_mut();
|
||||
match s.pop_index() {
|
||||
Some(i) => Poll::Ready(unsafe { &mut *self.channel.buf.add(i) }),
|
||||
None => {
|
||||
s.send_waker.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn recv(&mut self) -> &mut T {
|
||||
let i = poll_fn(|cx| {
|
||||
self.channel.state.lock(|s| {
|
||||
let s = &mut *s.borrow_mut();
|
||||
match s.pop_index() {
|
||||
Some(i) => Poll::Ready(i),
|
||||
None => {
|
||||
s.send_waker.register(cx.waker());
|
||||
Poll::Pending
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
.await;
|
||||
unsafe { &mut *self.channel.buf.add(i) }
|
||||
}
|
||||
|
||||
pub fn recv_done(&mut self) {
|
||||
self.channel.state.lock(|s| s.borrow_mut().pop_done())
|
||||
}
|
||||
}
|
||||
|
||||
struct State {
|
||||
len: usize,
|
||||
|
||||
/// Front index. Always 0..=(N-1)
|
||||
front: usize,
|
||||
/// Back index. Always 0..=(N-1).
|
||||
back: usize,
|
||||
|
||||
/// Used to distinguish "empty" and "full" cases when `front == back`.
|
||||
/// May only be `true` if `front == back`, always `false` otherwise.
|
||||
full: bool,
|
||||
|
||||
send_waker: WakerRegistration,
|
||||
recv_waker: WakerRegistration,
|
||||
}
|
||||
|
||||
impl State {
|
||||
fn increment(&self, i: usize) -> usize {
|
||||
if i + 1 == self.len {
|
||||
0
|
||||
} else {
|
||||
i + 1
|
||||
}
|
||||
}
|
||||
|
||||
fn is_full(&self) -> bool {
|
||||
self.full
|
||||
}
|
||||
|
||||
fn is_empty(&self) -> bool {
|
||||
self.front == self.back && !self.full
|
||||
}
|
||||
|
||||
fn push_index(&mut self) -> Option<usize> {
|
||||
match self.is_full() {
|
||||
true => None,
|
||||
false => Some(self.back),
|
||||
}
|
||||
}
|
||||
|
||||
fn push_done(&mut self) {
|
||||
assert!(!self.is_full());
|
||||
self.back = self.increment(self.back);
|
||||
if self.back == self.front {
|
||||
self.full = true;
|
||||
}
|
||||
self.send_waker.wake();
|
||||
}
|
||||
|
||||
fn pop_index(&mut self) -> Option<usize> {
|
||||
match self.is_empty() {
|
||||
true => None,
|
||||
false => Some(self.front),
|
||||
}
|
||||
}
|
||||
|
||||
fn pop_done(&mut self) {
|
||||
assert!(!self.is_empty());
|
||||
self.front = self.increment(self.front);
|
||||
self.full = false;
|
||||
self.recv_waker.wake();
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user