Implement embedded-nal-async traits for embassy-net

This commit is contained in:
Ulf Lilleengen 2022-08-08 16:51:34 +02:00
parent b7b4c84067
commit 18671b94ba
3 changed files with 171 additions and 1 deletions

View File

@ -31,6 +31,7 @@ pool-16 = []
pool-32 = []
pool-64 = []
pool-128 = []
unstable-traits = []
[dependencies]
@ -48,6 +49,8 @@ generic-array = { version = "0.14.4", default-features = false }
stable_deref_trait = { version = "1.2.0", default-features = false }
futures = { version = "0.3.17", default-features = false, features = [ "async-await" ] }
atomic-pool = "0.2.1"
atomic-polyfill = "0.1.5"
embedded-nal-async = "0.2.0"
[dependencies.smoltcp]
version = "0.8.0"

View File

@ -328,3 +328,170 @@ impl<'d> embedded_io::asynch::Write for TcpWriter<'d> {
self.io.flush()
}
}
#[cfg(feature = "unstable-traits")]
pub mod client {
use core::mem::MaybeUninit;
use core::ptr::NonNull;
use atomic_polyfill::{AtomicBool, Ordering};
use embedded_nal_async::IpAddr;
use super::*;
pub struct TcpClient<'d, D: Device, const N: usize, const TX_SZ: usize = 1024, const RX_SZ: usize = 1024> {
stack: &'d Stack<D>,
tx: &'d BufferPool<TX_SZ, N>,
rx: &'d BufferPool<RX_SZ, N>,
}
impl<'d, D: Device, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpClient<'d, D, N, TX_SZ, RX_SZ> {
pub fn new(stack: &'d Stack<D>, tx: &'d BufferPool<TX_SZ, N>, rx: &'d BufferPool<RX_SZ, N>) -> Self {
Self { stack, tx, rx }
}
}
impl<'d, D: Device, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_nal_async::TcpConnect
for TcpClient<'d, D, N, TX_SZ, RX_SZ>
{
type Error = Error;
type Connection<'m> = TcpConnection<'m, N, TX_SZ, RX_SZ> where Self: 'm;
type ConnectFuture<'m> = impl Future<Output = Result<Self::Connection<'m>, Self::Error>> + 'm
where
Self: 'm;
fn connect<'m>(&'m self, remote: embedded_nal_async::SocketAddr) -> Self::ConnectFuture<'m> {
async move {
let addr: crate::IpAddress = match remote.ip() {
IpAddr::V4(addr) => crate::IpAddress::Ipv4(crate::Ipv4Address::from_bytes(&addr.octets())),
#[cfg(feature = "proto-ipv6")]
IpAddr::V6(addr) => crate::IpAddress::Ipv6(crate::Ipv6Address::from_bytes(&addr.octets())),
#[cfg(not(feature = "proto-ipv6"))]
IpAddr::V6(_) => panic!("ipv6 support not enabled"),
};
let remote_endpoint = (addr, remote.port());
let mut socket = TcpConnection::new(&self.stack, self.tx, self.rx)?;
socket
.socket
.connect(remote_endpoint)
.await
.map_err(|_| Error::ConnectionReset)?;
Ok(socket)
}
}
}
pub struct TcpConnection<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> {
socket: TcpSocket<'d>,
tx: &'d BufferPool<TX_SZ, N>,
rx: &'d BufferPool<RX_SZ, N>,
txb: NonNull<[u8; TX_SZ]>,
rxb: NonNull<[u8; RX_SZ]>,
}
impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpConnection<'d, N, TX_SZ, RX_SZ> {
fn new<D: Device>(
stack: &'d Stack<D>,
tx: &'d BufferPool<TX_SZ, N>,
rx: &'d BufferPool<RX_SZ, N>,
) -> Result<Self, Error> {
let mut txb = tx.alloc().ok_or(Error::ConnectionReset)?;
let mut rxb = rx.alloc().ok_or(Error::ConnectionReset)?;
Ok(Self {
socket: unsafe { TcpSocket::new(stack, rxb.as_mut(), txb.as_mut()) },
tx,
rx,
txb,
rxb,
})
}
}
impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> Drop for TcpConnection<'d, N, TX_SZ, RX_SZ> {
fn drop(&mut self) {
unsafe {
self.socket.close();
self.rx.free(self.rxb);
self.tx.free(self.txb);
}
}
}
impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_io::Io
for TcpConnection<'d, N, TX_SZ, RX_SZ>
{
type Error = Error;
}
impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_io::asynch::Read
for TcpConnection<'d, N, TX_SZ, RX_SZ>
{
type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
where
Self: 'a;
fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
self.socket.read(buf)
}
}
impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_io::asynch::Write
for TcpConnection<'d, N, TX_SZ, RX_SZ>
{
type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
where
Self: 'a;
fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
self.socket.write(buf)
}
type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
where
Self: 'a;
fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
self.socket.flush()
}
}
pub type BufferPool<const BUFSZ: usize, const N: usize> = Pool<[u8; BUFSZ], N>;
pub struct Pool<T, const N: usize> {
used: [AtomicBool; N],
data: [UnsafeCell<MaybeUninit<T>>; N],
}
impl<T, const N: usize> Pool<T, N> {
const VALUE: AtomicBool = AtomicBool::new(false);
const UNINIT: UnsafeCell<MaybeUninit<T>> = UnsafeCell::new(MaybeUninit::uninit());
pub const fn new() -> Self {
Self {
used: [Self::VALUE; N],
data: [Self::UNINIT; N],
}
}
}
impl<T, const N: usize> Pool<T, N> {
fn alloc(&self) -> Option<NonNull<T>> {
for n in 0..N {
if self.used[n].swap(true, Ordering::SeqCst) == false {
let p = self.data[n].get() as *mut T;
return Some(unsafe { NonNull::new_unchecked(p) });
}
}
None
}
/// safety: p must be a pointer obtained from self.alloc that hasn't been freed yet.
unsafe fn free(&self, p: NonNull<T>) {
let origin = self.data.as_ptr() as *mut T;
let n = p.as_ptr().offset_from(origin);
assert!(n >= 0);
assert!((n as usize) < N);
self.used[n as usize].store(false, Ordering::SeqCst);
}
}
}

View File

@ -7,7 +7,7 @@ version = "0.1.0"
embassy-util = { version = "0.1.0", path = "../../embassy-util", features = ["defmt"] }
embassy-executor = { version = "0.1.0", path = "../../embassy-executor", features = ["defmt", "defmt-timestamp-uptime", "unstable-traits", "time-tick-32768hz"] }
embassy-stm32 = { version = "0.1.0", path = "../../embassy-stm32", features = ["nightly", "defmt", "stm32h743bi", "net", "time-driver-any", "exti", "unstable-pac", "unstable-traits"] }
embassy-net = { path = "../../embassy-net", features = ["defmt", "tcp", "dhcpv4", "medium-ethernet", "pool-16"] }
embassy-net = { path = "../../embassy-net", features = ["defmt", "tcp", "dhcpv4", "medium-ethernet", "pool-16", "unstable-traits"] }
embedded-io = { version = "0.3.0", features = ["async"] }
defmt = "0.3"