diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index dfb6a63ff..cef786143 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -26,6 +26,10 @@ jobs: - package: embassy target: thumbv6m-none-eabi features: defmt + - package: embassy-std-examples + target: x86_64-unknown-linux-gnu + - package: embassy-net-examples + target: x86_64-unknown-linux-gnu - package: embassy-nrf-examples target: thumbv7em-none-eabi - package: embassy-nrf diff --git a/.gitignore b/.gitignore index 893a87368..11781dc15 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ target Cargo.lock third_party -/Cargo.toml \ No newline at end of file +/Cargo.toml diff --git a/ci.sh b/ci.sh new file mode 100755 index 000000000..f43ea5e1a --- /dev/null +++ b/ci.sh @@ -0,0 +1,16 @@ +#!/bin/bash + +set -euxo pipefail + +# build for std +(cd embassy-net; cargo build --no-default-features --features log,medium-ethernet,tcp) +(cd embassy-net; cargo build --no-default-features --features log,medium-ethernet,tcp,dhcpv4) +(cd embassy-net; cargo build --no-default-features --features log,medium-ip,tcp) +(cd embassy-net; cargo build --no-default-features --features log,medium-ethernet,medium-ip,tcp,dhcpv4) + +# build for embedded +(cd embassy-net; cargo build --target thumbv7em-none-eabi --no-default-features --features log,medium-ethernet,medium-ip,tcp,dhcpv4) +(cd embassy-net; cargo build --target thumbv7em-none-eabi --no-default-features --features defmt,smoltcp/defmt,medium-ethernet,medium-ip,tcp,dhcpv4) + +# build examples +(cd embassy-net-examples; cargo build) diff --git a/embassy-net-examples/Cargo.toml b/embassy-net-examples/Cargo.toml new file mode 100644 index 000000000..413428bf6 --- /dev/null +++ b/embassy-net-examples/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "embassy-net-examples" +version = "0.1.0" +authors = ["Dario Nieuwenhuis "] +edition = "2018" + +[dependencies] +heapless = { version = "0.5.6", default-features = false } +embassy = { version = "0.1.0", path = "../embassy", features=["std", "log"] } +embassy-std = { version = "0.1.0", path = "../embassy-std" } +embassy-net = { version = "0.1.0", path = "../embassy-net", features=["std", "log", "medium-ethernet", "tcp", "dhcpv4"] } +env_logger = "0.8.2" +log = "0.4.11" +futures = "0.3.8" +libc = "0.2.81" +async-io = "1.3.1" +smoltcp = { git = "https://github.com/smoltcp-rs/smoltcp", rev="ec59aba5e10cf91df0c9253d9c2aca4dd143d2ff", default-features = false } +clap = { version = "3.0.0-beta.2", features = ["derive"] } +rand_core = { version = "0.6.0", features = ["std"] } diff --git a/embassy-net-examples/src/main.rs b/embassy-net-examples/src/main.rs new file mode 100644 index 000000000..d1c2658dd --- /dev/null +++ b/embassy-net-examples/src/main.rs @@ -0,0 +1,102 @@ +#![feature(type_alias_impl_trait)] +#![feature(min_type_alias_impl_trait)] +#![feature(impl_trait_in_bindings)] +#![allow(incomplete_features)] + +use clap::{AppSettings, Clap}; +use embassy::executor::Spawner; +use embassy::io::AsyncWriteExt; +use embassy::util::Forever; +use embassy_net::*; +use embassy_std::Executor; +use heapless::Vec; +use log::*; + +mod tuntap; + +use crate::tuntap::TunTapDevice; + +static DEVICE: Forever = Forever::new(); +static CONFIG: Forever = Forever::new(); + +#[derive(Clap)] +#[clap(version = "1.0")] +#[clap(setting = AppSettings::ColoredHelp)] +struct Opts { + /// TAP device name + #[clap(long, default_value = "tap0")] + tap: String, +} + +#[embassy::task] +async fn net_task() { + embassy_net::run().await +} + +#[embassy::task] +async fn main_task(spawner: Spawner) { + let opts: Opts = Opts::parse(); + + // Init network device + let device = TunTapDevice::new(&opts.tap).unwrap(); + + // Static IP configuration + let config = StaticConfigurator::new(Config { + address: Ipv4Cidr::new(Ipv4Address::new(192, 168, 69, 2), 24), + dns_servers: Vec::new(), + gateway: Some(Ipv4Address::new(192, 168, 69, 1)), + }); + + // DHCP configruation + let config = DhcpConfigurator::new(); + + // Init network stack + embassy_net::init(DEVICE.put(device), CONFIG.put(config)); + + // Launch network task + spawner.spawn(net_task()).unwrap(); + + // Then we can use it! + let mut rx_buffer = [0; 4096]; + let mut tx_buffer = [0; 4096]; + let mut socket = TcpSocket::new(&mut rx_buffer, &mut tx_buffer); + + socket.set_timeout(Some(embassy_net::SmolDuration::from_secs(10))); + + let remote_endpoint = (Ipv4Address::new(192, 168, 69, 74), 8000); + info!("connecting to {:?}...", remote_endpoint); + let r = socket.connect(remote_endpoint).await; + if let Err(e) = r { + warn!("connect error: {:?}", e); + return; + } + info!("connected!"); + loop { + let r = socket.write_all(b"Hello!\n").await; + if let Err(e) = r { + warn!("write error: {:?}", e); + return; + } + } +} + +#[no_mangle] +fn _embassy_rand(buf: &mut [u8]) { + use rand_core::{OsRng, RngCore}; + OsRng.fill_bytes(buf); +} + +static EXECUTOR: Forever = Forever::new(); + +fn main() { + env_logger::builder() + .filter_level(log::LevelFilter::Debug) + .filter_module("async_io", log::LevelFilter::Info) + .format_timestamp_nanos() + .init(); + + let executor = EXECUTOR.put(Executor::new()); + executor.run(|spawner| { + spawner.spawn(main_task(spawner)).unwrap(); + }); +} diff --git a/embassy-net-examples/src/tuntap.rs b/embassy-net-examples/src/tuntap.rs new file mode 100644 index 000000000..dd453deb3 --- /dev/null +++ b/embassy-net-examples/src/tuntap.rs @@ -0,0 +1,225 @@ +use async_io::Async; +use libc; +use log::*; +use smoltcp::wire::EthernetFrame; +use std::io; +use std::io::{Read, Write}; +use std::os::unix::io::{AsRawFd, RawFd}; + +pub const SIOCGIFMTU: libc::c_ulong = 0x8921; +pub const SIOCGIFINDEX: libc::c_ulong = 0x8933; +pub const ETH_P_ALL: libc::c_short = 0x0003; +pub const TUNSETIFF: libc::c_ulong = 0x400454CA; +pub const IFF_TUN: libc::c_int = 0x0001; +pub const IFF_TAP: libc::c_int = 0x0002; +pub const IFF_NO_PI: libc::c_int = 0x1000; + +#[repr(C)] +#[derive(Debug)] +struct ifreq { + ifr_name: [libc::c_char; libc::IF_NAMESIZE], + ifr_data: libc::c_int, /* ifr_ifindex or ifr_mtu */ +} + +fn ifreq_for(name: &str) -> ifreq { + let mut ifreq = ifreq { + ifr_name: [0; libc::IF_NAMESIZE], + ifr_data: 0, + }; + for (i, byte) in name.as_bytes().iter().enumerate() { + ifreq.ifr_name[i] = *byte as libc::c_char + } + ifreq +} + +fn ifreq_ioctl( + lower: libc::c_int, + ifreq: &mut ifreq, + cmd: libc::c_ulong, +) -> io::Result { + unsafe { + let res = libc::ioctl(lower, cmd as _, ifreq as *mut ifreq); + if res == -1 { + return Err(io::Error::last_os_error()); + } + } + + Ok(ifreq.ifr_data) +} + +#[derive(Debug)] +pub struct TunTap { + fd: libc::c_int, + ifreq: ifreq, + mtu: usize, +} + +impl AsRawFd for TunTap { + fn as_raw_fd(&self) -> RawFd { + self.fd + } +} + +impl TunTap { + pub fn new(name: &str) -> io::Result { + unsafe { + let fd = libc::open( + "/dev/net/tun\0".as_ptr() as *const libc::c_char, + libc::O_RDWR | libc::O_NONBLOCK, + ); + if fd == -1 { + return Err(io::Error::last_os_error()); + } + + let mut ifreq = ifreq_for(name); + ifreq.ifr_data = IFF_TAP | IFF_NO_PI; + ifreq_ioctl(fd, &mut ifreq, TUNSETIFF)?; + + let socket = libc::socket(libc::AF_INET, libc::SOCK_DGRAM, libc::IPPROTO_IP); + if socket == -1 { + return Err(io::Error::last_os_error()); + } + + let ip_mtu = ifreq_ioctl(socket, &mut ifreq, SIOCGIFMTU); + libc::close(socket); + let ip_mtu = ip_mtu? as usize; + + // SIOCGIFMTU returns the IP MTU (typically 1500 bytes.) + // smoltcp counts the entire Ethernet packet in the MTU, so add the Ethernet header size to it. + let mtu = ip_mtu + EthernetFrame::<&[u8]>::header_len(); + + Ok(TunTap { fd, mtu, ifreq }) + } + } +} + +impl Drop for TunTap { + fn drop(&mut self) { + unsafe { + libc::close(self.fd); + } + } +} + +impl io::Read for TunTap { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let len = unsafe { libc::read(self.fd, buf.as_mut_ptr() as *mut libc::c_void, buf.len()) }; + if len == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(len as usize) + } + } +} + +impl io::Write for TunTap { + fn write(&mut self, buf: &[u8]) -> io::Result { + let len = unsafe { libc::write(self.fd, buf.as_ptr() as *mut libc::c_void, buf.len()) }; + if len == -1 { + Err(io::Error::last_os_error()) + } else { + Ok(len as usize) + } + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +pub struct TunTapDevice { + device: Async, + waker: Option, +} + +impl TunTapDevice { + pub fn new(name: &str) -> io::Result { + Ok(Self { + device: Async::new(TunTap::new(name)?)?, + waker: None, + }) + } +} + +use core::task::Waker; +use embassy_net::{DeviceCapabilities, LinkState, Packet, PacketBox, PacketBoxExt, PacketBuf}; +use std::task::Context; + +impl crate::Device for TunTapDevice { + fn is_transmit_ready(&mut self) -> bool { + true + } + + fn transmit(&mut self, pkt: PacketBuf) { + // todo handle WouldBlock + match self.device.get_mut().write(&pkt) { + Ok(_) => {} + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + info!("transmit WouldBlock"); + } + Err(e) => panic!("transmit error: {:?}", e), + } + } + + fn receive(&mut self) -> Option { + let mut pkt = PacketBox::new(Packet::new()).unwrap(); + loop { + match self.device.get_mut().read(&mut pkt[..]) { + Ok(n) => { + return Some(pkt.slice(0..n)); + } + Err(e) if e.kind() == io::ErrorKind::WouldBlock => { + let ready = if let Some(w) = self.waker.as_ref() { + let mut cx = Context::from_waker(w); + let ready = self.device.poll_readable(&mut cx).is_ready(); + ready + } else { + false + }; + if !ready { + return None; + } + } + Err(e) => panic!("read error: {:?}", e), + } + } + } + + fn register_waker(&mut self, w: &Waker) { + match self.waker { + // Optimization: If both the old and new Wakers wake the same task, we can simply + // keep the old waker, skipping the clone. (In most executor implementations, + // cloning a waker is somewhat expensive, comparable to cloning an Arc). + Some(ref w2) if (w2.will_wake(w)) => {} + _ => { + // clone the new waker and store it + if let Some(old_waker) = core::mem::replace(&mut self.waker, Some(w.clone())) { + // We had a waker registered for another task. Wake it, so the other task can + // reregister itself if it's still interested. + // + // If two tasks are waiting on the same thing concurrently, this will cause them + // to wake each other in a loop fighting over this WakerRegistration. This wastes + // CPU but things will still work. + // + // If the user wants to have two tasks waiting on the same thing they should use + // a more appropriate primitive that can store multiple wakers. + old_waker.wake() + } + } + } + } + + fn capabilities(&mut self) -> DeviceCapabilities { + let mut caps = DeviceCapabilities::default(); + caps.max_transmission_unit = self.device.get_ref().mtu; + caps + } + + fn link_state(&mut self) -> LinkState { + LinkState::Up + } + + fn ethernet_address(&mut self) -> [u8; 6] { + [0x02, 0x03, 0x04, 0x05, 0x06, 0x07] + } +} diff --git a/embassy-net/Cargo.toml b/embassy-net/Cargo.toml new file mode 100644 index 000000000..30970c371 --- /dev/null +++ b/embassy-net/Cargo.toml @@ -0,0 +1,43 @@ +[package] +name = "embassy-net" +version = "0.1.0" +authors = ["Dario Nieuwenhuis "] +edition = "2018" + +[features] +std = [] +defmt-trace = [] +defmt-debug = [] +defmt-info = [] +defmt-warn = [] +defmt-error = [] + +tcp = ["smoltcp/socket-tcp"] +dhcpv4 = ["medium-ethernet", "smoltcp/socket-dhcpv4"] +medium-ethernet = ["smoltcp/medium-ethernet"] +medium-ip = ["smoltcp/medium-ip"] + +[dependencies] + +defmt = { version = "0.2.0", optional = true } +log = { version = "0.4.11", optional = true } + +embassy = { version = "0.1.0", path = "../embassy" } + +managed = { version = "0.8.0", default-features = false, features = [ "map" ]} +heapless = { version = "0.5.6", default-features = false } +as-slice = { version = "0.1.4" } +generic-array = { version = "0.14.4", default-features = false } +stable_deref_trait = { version = "1.2.0", default-features = false } +futures = { version = "0.3.5", default-features = false, features = [ "async-await" ]} +atomic-pool = "0.2.0" + +[dependencies.smoltcp] +git = "https://github.com/smoltcp-rs/smoltcp" +rev = "ec59aba5e10cf91df0c9253d9c2aca4dd143d2ff" +default-features = false +features = [ + "proto-ipv4", + "socket", + "async", +] diff --git a/embassy-net/README.md b/embassy-net/README.md new file mode 100644 index 000000000..64f656709 --- /dev/null +++ b/embassy-net/README.md @@ -0,0 +1,34 @@ +# embassy-net + +embassy-net contains an async network API based on smoltcp and embassy, designed +for embedded systems. + +## Running the example + +First, create the tap0 interface. You only need to do this once. + +```sh +sudo ip tuntap add name tap0 mode tap user $USER +sudo ip link set tap0 up +sudo ip addr add 192.168.69.100/24 dev tap0 +sudo ip -6 addr add fe80::100/64 dev tap0 +sudo ip -6 addr add fdaa::100/64 dev tap0 +sudo ip -6 route add fe80::/64 dev tap0 +sudo ip -6 route add fdaa::/64 dev tap0 +``` + +Then, run it + +```sh +cargo run --bin embassy-net-examples +``` + +## License + +This work is licensed under either of + +- Apache License, Version 2.0 ([LICENSE-APACHE](LICENSE-APACHE) or + http://www.apache.org/licenses/LICENSE-2.0) +- MIT license ([LICENSE-MIT](LICENSE-MIT) or http://opensource.org/licenses/MIT) + +at your option. diff --git a/embassy-net/src/config/dhcp.rs b/embassy-net/src/config/dhcp.rs new file mode 100644 index 000000000..f0c144321 --- /dev/null +++ b/embassy-net/src/config/dhcp.rs @@ -0,0 +1,59 @@ +use heapless::Vec; +use smoltcp::socket::{Dhcpv4Event, Dhcpv4Socket, SocketHandle}; +use smoltcp::time::Instant; + +use super::*; +use crate::device::LinkState; +use crate::fmt::*; +use crate::{Interface, SocketSet}; + +pub struct DhcpConfigurator { + handle: Option, +} + +impl DhcpConfigurator { + pub fn new() -> Self { + Self { handle: None } + } +} + +impl Configurator for DhcpConfigurator { + fn poll( + &mut self, + iface: &mut Interface, + sockets: &mut SocketSet, + _timestamp: Instant, + ) -> Event { + if self.handle.is_none() { + let handle = sockets.add(Dhcpv4Socket::new()); + self.handle = Some(handle) + } + + let mut socket = sockets.get::(self.handle.unwrap()); + + let link_up = iface.device_mut().device.link_state() == LinkState::Up; + if !link_up { + socket.reset(); + return Event::Deconfigured; + } + + match socket.poll() { + None => Event::NoChange, + Some(Dhcpv4Event::Deconfigured) => Event::Deconfigured, + Some(Dhcpv4Event::Configured(config)) => { + let mut dns_servers = Vec::new(); + for s in &config.dns_servers { + if let Some(addr) = s { + dns_servers.push(addr.clone()).unwrap(); + } + } + + Event::Configured(Config { + address: config.address, + gateway: config.router, + dns_servers, + }) + } + } + } +} diff --git a/embassy-net/src/config/mod.rs b/embassy-net/src/config/mod.rs new file mode 100644 index 000000000..16470f7e6 --- /dev/null +++ b/embassy-net/src/config/mod.rs @@ -0,0 +1,38 @@ +use heapless::consts::*; +use heapless::Vec; +use smoltcp::time::Instant; +use smoltcp::wire::{Ipv4Address, Ipv4Cidr}; + +use crate::fmt::*; +use crate::{Interface, SocketSet}; + +mod statik; +pub use statik::StaticConfigurator; + +#[cfg(feature = "dhcpv4")] +mod dhcp; +#[cfg(feature = "dhcpv4")] +pub use dhcp::DhcpConfigurator; + +/// Return value for the `Configurator::poll` function +#[derive(Debug, Clone)] +pub enum Event { + /// No change has occured to the configuration. + NoChange, + /// Configuration has been lost (for example, DHCP lease has expired) + Deconfigured, + /// Configuration has been newly acquired, or modified. + Configured(Config), +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Config { + pub address: Ipv4Cidr, + pub gateway: Option, + pub dns_servers: Vec, +} + +pub trait Configurator { + fn poll(&mut self, iface: &mut Interface, sockets: &mut SocketSet, timestamp: Instant) + -> Event; +} diff --git a/embassy-net/src/config/statik.rs b/embassy-net/src/config/statik.rs new file mode 100644 index 000000000..912143bff --- /dev/null +++ b/embassy-net/src/config/statik.rs @@ -0,0 +1,35 @@ +use smoltcp::time::Instant; + +use super::*; +use crate::fmt::*; +use crate::{Interface, SocketSet}; + +pub struct StaticConfigurator { + config: Config, + returned: bool, +} + +impl StaticConfigurator { + pub fn new(config: Config) -> Self { + Self { + config, + returned: false, + } + } +} + +impl Configurator for StaticConfigurator { + fn poll( + &mut self, + _iface: &mut Interface, + _sockets: &mut SocketSet, + _timestamp: Instant, + ) -> Event { + if self.returned { + Event::NoChange + } else { + self.returned = true; + Event::Configured(self.config.clone()) + } + } +} diff --git a/embassy-net/src/device.rs b/embassy-net/src/device.rs new file mode 100644 index 000000000..6c06b0605 --- /dev/null +++ b/embassy-net/src/device.rs @@ -0,0 +1,105 @@ +use core::task::Waker; +use smoltcp::phy::Device as SmolDevice; +use smoltcp::phy::DeviceCapabilities; +use smoltcp::time::Instant as SmolInstant; + +use crate::fmt::*; +use crate::packet_pool::PacketBoxExt; +use crate::Result; +use crate::{Packet, PacketBox, PacketBuf}; + +#[derive(PartialEq, Eq, Clone, Copy)] +pub enum LinkState { + Down, + Up, +} + +pub trait Device { + fn is_transmit_ready(&mut self) -> bool; + fn transmit(&mut self, pkt: PacketBuf); + fn receive(&mut self) -> Option; + + fn register_waker(&mut self, waker: &Waker); + fn capabilities(&mut self) -> DeviceCapabilities; + fn link_state(&mut self) -> LinkState; + fn ethernet_address(&mut self) -> [u8; 6]; +} + +pub struct DeviceAdapter { + pub device: &'static mut dyn Device, + caps: DeviceCapabilities, +} + +impl DeviceAdapter { + pub(crate) fn new(device: &'static mut dyn Device) -> Self { + Self { + caps: device.capabilities(), + device, + } + } +} + +impl<'a> SmolDevice<'a> for DeviceAdapter { + type RxToken = RxToken; + type TxToken = TxToken<'a>; + + fn receive(&'a mut self) -> Option<(Self::RxToken, Self::TxToken)> { + let rx_pkt = self.device.receive()?; + let tx_pkt = PacketBox::new(Packet::new()).unwrap(); // TODO: not sure about unwrap + let rx_token = RxToken { pkt: rx_pkt }; + let tx_token = TxToken { + device: self.device, + pkt: tx_pkt, + }; + + Some((rx_token, tx_token)) + } + + /// Construct a transmit token. + fn transmit(&'a mut self) -> Option { + if !self.device.is_transmit_ready() { + return None; + } + + let tx_pkt = PacketBox::new(Packet::new())?; + Some(TxToken { + device: self.device, + pkt: tx_pkt, + }) + } + + /// Get a description of device capabilities. + fn capabilities(&self) -> DeviceCapabilities { + self.caps.clone() + } +} + +pub struct RxToken { + pkt: PacketBuf, +} + +impl smoltcp::phy::RxToken for RxToken { + fn consume(mut self, _timestamp: SmolInstant, f: F) -> Result + where + F: FnOnce(&mut [u8]) -> Result, + { + f(&mut self.pkt) + } +} + +pub struct TxToken<'a> { + device: &'a mut dyn Device, + pkt: PacketBox, +} + +impl<'a> smoltcp::phy::TxToken for TxToken<'a> { + fn consume(self, _timestamp: SmolInstant, len: usize, f: F) -> Result + where + F: FnOnce(&mut [u8]) -> Result, + { + let mut buf = self.pkt.slice(0..len); + let r = f(&mut buf)?; + self.device.transmit(buf); + Ok(r) + } +} diff --git a/embassy-net/src/fmt.rs b/embassy-net/src/fmt.rs new file mode 100644 index 000000000..4da69766c --- /dev/null +++ b/embassy-net/src/fmt.rs @@ -0,0 +1,118 @@ +#![macro_use] + +#[cfg(all(feature = "defmt", feature = "log"))] +compile_error!("You may not enable both `defmt` and `log` features."); + +pub use fmt::*; + +#[cfg(feature = "defmt")] +mod fmt { + pub use defmt::{ + assert, assert_eq, assert_ne, debug, debug_assert, debug_assert_eq, debug_assert_ne, error, + info, panic, todo, trace, unreachable, unwrap, warn, + }; +} + +#[cfg(feature = "log")] +mod fmt { + pub use core::{ + assert, assert_eq, assert_ne, debug_assert, debug_assert_eq, debug_assert_ne, panic, todo, + unreachable, + }; + pub use log::{debug, error, info, trace, warn}; +} + +#[cfg(not(any(feature = "defmt", feature = "log")))] +mod fmt { + #![macro_use] + + pub use core::{ + assert, assert_eq, assert_ne, debug_assert, debug_assert_eq, debug_assert_ne, panic, todo, + unreachable, + }; + + #[macro_export] + macro_rules! trace { + ($($msg:expr),+ $(,)?) => { + () + }; + } + + #[macro_export] + macro_rules! debug { + ($($msg:expr),+ $(,)?) => { + () + }; + } + + #[macro_export] + macro_rules! info { + ($($msg:expr),+ $(,)?) => { + () + }; + } + + #[macro_export] + macro_rules! warn { + ($($msg:expr),+ $(,)?) => { + () + }; + } + + #[macro_export] + macro_rules! error { + ($($msg:expr),+ $(,)?) => { + () + }; + } +} + +#[cfg(not(feature = "defmt"))] +#[macro_export] +macro_rules! unwrap { + ($arg:expr) => { + match $crate::fmt::Try::into_result($arg) { + ::core::result::Result::Ok(t) => t, + ::core::result::Result::Err(e) => { + ::core::panic!("unwrap of `{}` failed: {:?}", ::core::stringify!($arg), e); + } + } + }; + ($arg:expr, $($msg:expr),+ $(,)? ) => { + match $crate::fmt::Try::into_result($arg) { + ::core::result::Result::Ok(t) => t, + ::core::result::Result::Err(e) => { + ::core::panic!("unwrap of `{}` failed: {}: {:?}", ::core::stringify!($arg), ::core::format_args!($($msg,)*), e); + } + } + } +} + +#[derive(Debug, Copy, Clone, Eq, PartialEq)] +pub struct NoneError; + +pub trait Try { + type Ok; + type Error; + fn into_result(self) -> Result; +} + +impl Try for Option { + type Ok = T; + type Error = NoneError; + + #[inline] + fn into_result(self) -> Result { + self.ok_or(NoneError) + } +} + +impl Try for Result { + type Ok = T; + type Error = E; + + #[inline] + fn into_result(self) -> Self { + self + } +} diff --git a/embassy-net/src/lib.rs b/embassy-net/src/lib.rs new file mode 100644 index 000000000..88dcf0aa5 --- /dev/null +++ b/embassy-net/src/lib.rs @@ -0,0 +1,31 @@ +#![cfg_attr(not(feature = "std"), no_std)] + +// This mod MUST go first, so that the others see its macros. +pub(crate) mod fmt; + +mod config; +mod device; +mod packet_pool; +mod stack; + +#[cfg(feature = "dhcpv4")] +pub use config::DhcpConfigurator; +pub use config::{Config, Configurator, Event as ConfigEvent, StaticConfigurator}; + +pub use device::{Device, LinkState}; +pub use packet_pool::{Packet, PacketBox, PacketBoxExt, PacketBuf}; +pub use stack::{init, is_config_up, is_init, is_link_up, run}; + +#[cfg(feature = "tcp")] +mod tcp_socket; +#[cfg(feature = "tcp")] +pub use tcp_socket::TcpSocket; + +// smoltcp reexports +pub use smoltcp::phy::{DeviceCapabilities, Medium}; +pub use smoltcp::time::Duration as SmolDuration; +pub use smoltcp::time::Instant as SmolInstant; +pub use smoltcp::wire::{IpAddress, IpCidr, Ipv4Address, Ipv4Cidr}; +pub type Interface = smoltcp::iface::Interface<'static, device::DeviceAdapter>; +pub type SocketSet = smoltcp::socket::SocketSet<'static>; +pub use smoltcp::{Error, Result}; diff --git a/embassy-net/src/packet_pool.rs b/embassy-net/src/packet_pool.rs new file mode 100644 index 000000000..2c27d4013 --- /dev/null +++ b/embassy-net/src/packet_pool.rs @@ -0,0 +1,92 @@ +use as_slice::{AsMutSlice, AsSlice}; +use core::ops::{Deref, DerefMut, Range}; + +use atomic_pool::{pool, Box}; + +pub const MTU: usize = 1514; +pub const PACKET_POOL_SIZE: usize = 4; + +pool!(pub PacketPool: [Packet; PACKET_POOL_SIZE]); +pub type PacketBox = Box; + +pub struct Packet(pub [u8; MTU]); + +impl Packet { + pub const fn new() -> Self { + Self([0; MTU]) + } +} + +pub trait PacketBoxExt { + fn slice(self, range: Range) -> PacketBuf; +} + +impl PacketBoxExt for PacketBox { + fn slice(self, range: Range) -> PacketBuf { + PacketBuf { + packet: self, + range, + } + } +} + +impl AsSlice for Packet { + type Element = u8; + + fn as_slice(&self) -> &[Self::Element] { + &self.deref()[..] + } +} + +impl AsMutSlice for Packet { + fn as_mut_slice(&mut self) -> &mut [Self::Element] { + &mut self.deref_mut()[..] + } +} + +impl Deref for Packet { + type Target = [u8; MTU]; + + fn deref(&self) -> &[u8; MTU] { + &self.0 + } +} + +impl DerefMut for Packet { + fn deref_mut(&mut self) -> &mut [u8; MTU] { + &mut self.0 + } +} + +pub struct PacketBuf { + packet: PacketBox, + range: Range, +} + +impl AsSlice for PacketBuf { + type Element = u8; + + fn as_slice(&self) -> &[Self::Element] { + &self.packet[self.range.clone()] + } +} + +impl AsMutSlice for PacketBuf { + fn as_mut_slice(&mut self) -> &mut [Self::Element] { + &mut self.packet[self.range.clone()] + } +} + +impl Deref for PacketBuf { + type Target = [u8]; + + fn deref(&self) -> &[u8] { + &self.packet[self.range.clone()] + } +} + +impl DerefMut for PacketBuf { + fn deref_mut(&mut self) -> &mut [u8] { + &mut self.packet[self.range.clone()] + } +} diff --git a/embassy-net/src/stack.rs b/embassy-net/src/stack.rs new file mode 100644 index 000000000..e436beb1e --- /dev/null +++ b/embassy-net/src/stack.rs @@ -0,0 +1,259 @@ +use core::cell::RefCell; +use core::future::Future; +use core::task::Context; +use core::task::Poll; +use embassy::time::{Instant, Timer}; +use embassy::util::ThreadModeMutex; +use embassy::util::{Forever, WakerRegistration}; +use futures::pin_mut; +use smoltcp::iface::InterfaceBuilder; +#[cfg(feature = "medium-ethernet")] +use smoltcp::iface::{Neighbor, NeighborCache, Route, Routes}; +use smoltcp::phy::Device as _; +use smoltcp::phy::Medium; +use smoltcp::socket::SocketSetItem; +use smoltcp::time::Instant as SmolInstant; +#[cfg(feature = "medium-ethernet")] +use smoltcp::wire::EthernetAddress; +use smoltcp::wire::{IpAddress, IpCidr, Ipv4Address, Ipv4Cidr}; + +use crate::config::Configurator; +use crate::config::Event; +use crate::device::{Device, DeviceAdapter, LinkState}; +use crate::fmt::*; +use crate::{Interface, SocketSet}; + +const ADDRESSES_LEN: usize = 1; +const NEIGHBOR_CACHE_LEN: usize = 8; +const SOCKETS_LEN: usize = 2; +const LOCAL_PORT_MIN: u16 = 1025; +const LOCAL_PORT_MAX: u16 = 65535; + +struct StackResources { + addresses: [IpCidr; ADDRESSES_LEN], + sockets: [Option>; SOCKETS_LEN], + + #[cfg(feature = "medium-ethernet")] + routes: [Option<(IpCidr, Route)>; 1], + #[cfg(feature = "medium-ethernet")] + neighbor_cache: [Option<(IpAddress, Neighbor)>; NEIGHBOR_CACHE_LEN], +} + +static STACK_RESOURCES: Forever = Forever::new(); +static STACK: ThreadModeMutex>> = ThreadModeMutex::new(RefCell::new(None)); + +pub(crate) struct Stack { + iface: Interface, + pub sockets: SocketSet, + link_up: bool, + config_up: bool, + next_local_port: u16, + configurator: &'static mut dyn Configurator, + waker: WakerRegistration, +} + +impl Stack { + pub(crate) fn with(f: impl FnOnce(&mut Stack) -> R) -> R { + let mut stack = STACK.borrow().borrow_mut(); + let stack = stack.as_mut().unwrap(); + f(stack) + } + + pub fn get_local_port(&mut self) -> u16 { + let res = self.next_local_port; + self.next_local_port = if res >= LOCAL_PORT_MAX { + LOCAL_PORT_MIN + } else { + res + 1 + }; + res + } + + pub(crate) fn wake(&mut self) { + self.waker.wake() + } + + fn poll_configurator(&mut self, timestamp: SmolInstant) { + let medium = self.iface.device().capabilities().medium; + + match self + .configurator + .poll(&mut self.iface, &mut self.sockets, timestamp) + { + Event::NoChange => {} + Event::Configured(config) => { + debug!("Acquired IP configuration:"); + + debug!(" IP address: {}", config.address); + set_ipv4_addr(&mut self.iface, config.address); + + #[cfg(feature = "medium-ethernet")] + if medium == Medium::Ethernet { + if let Some(gateway) = config.gateway { + debug!(" Default gateway: {}", gateway); + self.iface + .routes_mut() + .add_default_ipv4_route(gateway) + .unwrap(); + } else { + debug!(" Default gateway: None"); + self.iface.routes_mut().remove_default_ipv4_route(); + } + } + for (i, s) in config.dns_servers.iter().enumerate() { + debug!(" DNS server {}: {}", i, s); + } + + self.config_up = true; + } + Event::Deconfigured => { + debug!("Lost IP configuration"); + set_ipv4_addr(&mut self.iface, Ipv4Cidr::new(Ipv4Address::UNSPECIFIED, 0)); + #[cfg(feature = "medium-ethernet")] + if medium == Medium::Ethernet { + self.iface.routes_mut().remove_default_ipv4_route(); + } + self.config_up = false; + } + } + } + + fn poll(&mut self, cx: &mut Context<'_>) { + self.iface.device_mut().device.register_waker(cx.waker()); + self.waker.register(cx.waker()); + + let timestamp = instant_to_smoltcp(Instant::now()); + if let Err(_) = self.iface.poll(&mut self.sockets, timestamp) { + // If poll() returns error, it may not be done yet, so poll again later. + cx.waker().wake_by_ref(); + return; + } + + // Update link up + let old_link_up = self.link_up; + self.link_up = self.iface.device_mut().device.link_state() == LinkState::Up; + + // Print when changed + if old_link_up != self.link_up { + if self.link_up { + info!("Link up!"); + } else { + info!("Link down!"); + } + } + + if old_link_up || self.link_up { + self.poll_configurator(timestamp) + } + + if let Some(poll_at) = self.iface.poll_at(&mut self.sockets, timestamp) { + let t = Timer::at(instant_from_smoltcp(poll_at)); + pin_mut!(t); + if t.poll(cx).is_ready() { + cx.waker().wake_by_ref(); + } + } + } +} + +fn set_ipv4_addr(iface: &mut Interface, cidr: Ipv4Cidr) { + iface.update_ip_addrs(|addrs| { + let dest = addrs.iter_mut().next().unwrap(); + *dest = IpCidr::Ipv4(cidr); + }); +} + +/// Initialize embassy_net. +/// This function must be called from thread mode. +pub fn init(device: &'static mut dyn Device, configurator: &'static mut dyn Configurator) { + const NONE_SOCKET: Option> = None; + let res = STACK_RESOURCES.put(StackResources { + addresses: [IpCidr::new(Ipv4Address::UNSPECIFIED.into(), 32)], + sockets: [NONE_SOCKET; SOCKETS_LEN], + + #[cfg(feature = "medium-ethernet")] + routes: [None; 1], + #[cfg(feature = "medium-ethernet")] + neighbor_cache: [None; NEIGHBOR_CACHE_LEN], + }); + + let medium = device.capabilities().medium; + + #[cfg(feature = "medium-ethernet")] + let ethernet_addr = if medium == Medium::Ethernet { + device.ethernet_address() + } else { + [0, 0, 0, 0, 0, 0] + }; + + let mut b = InterfaceBuilder::new(DeviceAdapter::new(device)); + b = b.ip_addrs(&mut res.addresses[..]); + + #[cfg(feature = "medium-ethernet")] + if medium == Medium::Ethernet { + b = b.ethernet_addr(EthernetAddress(ethernet_addr)); + b = b.neighbor_cache(NeighborCache::new(&mut res.neighbor_cache[..])); + b = b.routes(Routes::new(&mut res.routes[..])); + } + + let iface = b.finalize(); + + let sockets = SocketSet::new(&mut res.sockets[..]); + + let local_port = loop { + let mut res = [0u8; 2]; + rand(&mut res); + let port = u16::from_le_bytes(res); + if port >= LOCAL_PORT_MIN && port <= LOCAL_PORT_MAX { + break port; + } + }; + + let stack = Stack { + iface, + sockets, + link_up: false, + config_up: false, + configurator, + next_local_port: local_port, + waker: WakerRegistration::new(), + }; + + *STACK.borrow().borrow_mut() = Some(stack); +} + +pub fn is_init() -> bool { + STACK.borrow().borrow().is_some() +} + +pub fn is_link_up() -> bool { + STACK.borrow().borrow().as_ref().unwrap().link_up +} + +pub fn is_config_up() -> bool { + STACK.borrow().borrow().as_ref().unwrap().config_up +} + +pub async fn run() { + futures::future::poll_fn(|cx| { + Stack::with(|stack| stack.poll(cx)); + Poll::<()>::Pending + }) + .await +} + +fn instant_to_smoltcp(instant: Instant) -> SmolInstant { + SmolInstant::from_millis(instant.as_millis() as i64) +} + +fn instant_from_smoltcp(instant: SmolInstant) -> Instant { + Instant::from_millis(instant.total_millis() as u64) +} + +extern "Rust" { + fn _embassy_rand(buf: &mut [u8]); +} + +fn rand(buf: &mut [u8]) { + unsafe { _embassy_rand(buf) } +} diff --git a/embassy-net/src/tcp_socket.rs b/embassy-net/src/tcp_socket.rs new file mode 100644 index 000000000..4f43bc611 --- /dev/null +++ b/embassy-net/src/tcp_socket.rs @@ -0,0 +1,177 @@ +use core::marker::PhantomData; +use core::mem; +use core::pin::Pin; +use core::task::{Context, Poll}; +use embassy::io; +use embassy::io::{AsyncBufRead, AsyncWrite}; +use smoltcp::socket::SocketHandle; +use smoltcp::socket::TcpSocket as SyncTcpSocket; +use smoltcp::socket::{TcpSocketBuffer, TcpState}; +use smoltcp::time::Duration; +use smoltcp::wire::IpEndpoint; + +use super::stack::Stack; +use crate::fmt::*; +use crate::{Error, Result}; + +pub struct TcpSocket<'a> { + handle: SocketHandle, + ghost: PhantomData<&'a mut [u8]>, +} + +impl<'a> Unpin for TcpSocket<'a> {} + +impl<'a> TcpSocket<'a> { + pub fn new(rx_buffer: &'a mut [u8], tx_buffer: &'a mut [u8]) -> Self { + let handle = Stack::with(|stack| { + let rx_buffer: &'static mut [u8] = unsafe { mem::transmute(rx_buffer) }; + let tx_buffer: &'static mut [u8] = unsafe { mem::transmute(tx_buffer) }; + stack.sockets.add(SyncTcpSocket::new( + TcpSocketBuffer::new(rx_buffer), + TcpSocketBuffer::new(tx_buffer), + )) + }); + + Self { + handle, + ghost: PhantomData, + } + } + + pub async fn connect(&mut self, remote_endpoint: T) -> Result<()> + where + T: Into, + { + let local_port = Stack::with(|stack| stack.get_local_port()); + self.with(|s| s.connect(remote_endpoint, local_port))?; + + futures::future::poll_fn(|cx| { + self.with(|s| match s.state() { + TcpState::Closed | TcpState::TimeWait => Poll::Ready(Err(Error::Unaddressable)), + TcpState::Listen => Poll::Ready(Err(Error::Illegal)), + TcpState::SynSent | TcpState::SynReceived => { + s.register_send_waker(cx.waker()); + Poll::Pending + } + _ => Poll::Ready(Ok(())), + }) + }) + .await + } + + pub fn set_timeout(&mut self, duration: Option) { + self.with(|s| s.set_timeout(duration)) + } + + pub fn set_keep_alive(&mut self, interval: Option) { + self.with(|s| s.set_keep_alive(interval)) + } + + pub fn set_hop_limit(&mut self, hop_limit: Option) { + self.with(|s| s.set_hop_limit(hop_limit)) + } + + pub fn local_endpoint(&self) -> IpEndpoint { + self.with(|s| s.local_endpoint()) + } + + pub fn remote_endpoint(&self) -> IpEndpoint { + self.with(|s| s.remote_endpoint()) + } + + pub fn state(&self) -> TcpState { + self.with(|s| s.state()) + } + + pub fn close(&mut self) { + self.with(|s| s.close()) + } + + pub fn abort(&mut self) { + self.with(|s| s.abort()) + } + + pub fn may_send(&self) -> bool { + self.with(|s| s.may_send()) + } + + pub fn may_recv(&self) -> bool { + self.with(|s| s.may_recv()) + } + + fn with(&self, f: impl FnOnce(&mut SyncTcpSocket) -> R) -> R { + Stack::with(|stack| { + let res = { + let mut s = stack.sockets.get::(self.handle); + f(&mut *s) + }; + stack.wake(); + res + }) + } +} + +fn to_ioerr(_err: Error) -> io::Error { + // todo + io::Error::Other +} + +impl<'a> Drop for TcpSocket<'a> { + fn drop(&mut self) { + Stack::with(|stack| { + stack.sockets.remove(self.handle); + }) + } +} + +impl<'a> AsyncBufRead for TcpSocket<'a> { + fn poll_fill_buf<'z>( + self: Pin<&'z mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { + self.with(|socket| match socket.peek(1 << 30) { + // No data ready + Ok(buf) if buf.len() == 0 => { + socket.register_recv_waker(cx.waker()); + Poll::Pending + } + // Data ready! + Ok(buf) => { + // Safety: + // - User can't touch the inner TcpSocket directly at all. + // - The socket itself won't touch these bytes until consume() is called, which + // requires the user to release this borrow. + let buf: &'z [u8] = unsafe { core::mem::transmute(&*buf) }; + Poll::Ready(Ok(buf)) + } + // EOF + Err(Error::Finished) => Poll::Ready(Ok(&[][..])), + // Error + Err(e) => Poll::Ready(Err(to_ioerr(e))), + }) + } + + fn consume(self: Pin<&mut Self>, amt: usize) { + self.with(|s| s.recv(|_| (amt, ()))).unwrap() + } +} + +impl<'a> AsyncWrite for TcpSocket<'a> { + fn poll_write( + self: Pin<&mut Self>, + cx: &mut Context<'_>, + buf: &[u8], + ) -> Poll> { + self.with(|s| match s.send_slice(buf) { + // Not ready to send (no space in the tx buffer) + Ok(0) => { + s.register_send_waker(cx.waker()); + Poll::Pending + } + // Some data sent + Ok(n) => Poll::Ready(Ok(n)), + // Error + Err(e) => Poll::Ready(Err(to_ioerr(e))), + }) + } +}