From c2404ee8ca6200d9037f096eee3f0eab98711778 Mon Sep 17 00:00:00 2001 From: ivmarkov Date: Tue, 6 Sep 2022 21:39:23 +0300 Subject: [PATCH] Initial generic timer queue impl --- embassy-time/Cargo.toml | 5 + embassy-time/src/lib.rs | 1 + embassy-time/src/queue.rs | 197 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 203 insertions(+) create mode 100644 embassy-time/src/queue.rs diff --git a/embassy-time/Cargo.toml b/embassy-time/Cargo.toml index c51a71d01..0e3391d1f 100644 --- a/embassy-time/Cargo.toml +++ b/embassy-time/Cargo.toml @@ -26,6 +26,9 @@ unstable-traits = ["embedded-hal-1"] # To use this you must have a time driver provided. defmt-timestamp-uptime = ["defmt"] +# TODO: Doc +generic-queue = [] + # Set the `embassy_time` tick rate. # # At most 1 `tick-*` feature can be enabled. If none is enabled, a default of 1MHz is used. @@ -111,9 +114,11 @@ embedded-hal-async = { version = "=0.1.0-alpha.2", optional = true} futures-util = { version = "0.3.17", default-features = false } embassy-macros = { version = "0.1.0", path = "../embassy-macros"} +embassy-sync = { version = "0.1", path = "../embassy-sync" } atomic-polyfill = "1.0.1" critical-section = "1.1" cfg-if = "1.0.0" +heapless = "0.7" # WASM dependencies wasm-bindgen = { version = "0.2.81", optional = true } diff --git a/embassy-time/src/lib.rs b/embassy-time/src/lib.rs index 4edc883fe..0457a6571 100644 --- a/embassy-time/src/lib.rs +++ b/embassy-time/src/lib.rs @@ -11,6 +11,7 @@ mod delay; pub mod driver; mod duration; mod instant; +pub mod queue; mod tick; mod timer; diff --git a/embassy-time/src/queue.rs b/embassy-time/src/queue.rs new file mode 100644 index 000000000..7e84090b1 --- /dev/null +++ b/embassy-time/src/queue.rs @@ -0,0 +1,197 @@ +//! Generic timer queue implementation +use core::cell::RefCell; +use core::cmp::Ordering; +use core::task::Waker; + +use atomic_polyfill::{AtomicBool, Ordering as AtomicOrdering}; +use embassy_sync::blocking_mutex::raw::{CriticalSectionRawMutex, RawMutex}; +use embassy_sync::blocking_mutex::Mutex; +use heapless::sorted_linked_list::{LinkedIndexU8, Min, SortedLinkedList}; + +use crate::driver::{allocate_alarm, set_alarm, set_alarm_callback, AlarmHandle}; +use crate::Instant; + +#[derive(Debug)] +struct Timer { + at: Instant, + waker: Waker, +} + +impl PartialEq for Timer { + fn eq(&self, other: &Self) -> bool { + self.at == other.at + } +} + +impl Eq for Timer {} + +impl PartialOrd for Timer { + fn partial_cmp(&self, other: &Self) -> Option { + self.at.partial_cmp(&other.at) + } +} + +impl Ord for Timer { + fn cmp(&self, other: &Self) -> Ordering { + self.at.cmp(&other.at) + } +} + +struct InnerQueue { + queue: SortedLinkedList, + alarm_at: Instant, + alarm: Option, +} + +impl InnerQueue { + const fn new() -> Self { + Self { + queue: SortedLinkedList::new_u8(), + alarm_at: Instant::MAX, + alarm: None, + } + } + + fn schedule(&mut self, at: Instant, waker: &Waker) { + self.queue + .find_mut(|timer| timer.waker.will_wake(waker)) + .map(|mut timer| { + timer.waker = waker.clone(); + timer.at = at; + + timer.finish(); + }) + .unwrap_or_else(|| { + let mut timer = Timer { + waker: waker.clone(), + at, + }; + + loop { + match self.queue.push(timer) { + Ok(()) => break, + Err(e) => timer = e, + } + + self.queue.pop().unwrap().waker.wake(); + } + }); + + // Don't wait for the alarm callback to trigger and directly + // dispatch all timers that are already due + // + // Then update the alarm if necessary + self.dispatch(); + } + + fn dispatch(&mut self) { + let now = Instant::now(); + + while self.queue.peek().filter(|timer| timer.at <= now).is_some() { + self.queue.pop().unwrap().waker.wake(); + } + + self.update_alarm(); + } + + fn update_alarm(&mut self) { + if let Some(timer) = self.queue.peek() { + let new_at = timer.at; + + if self.alarm_at != new_at { + self.alarm_at = new_at; + set_alarm(self.alarm.unwrap(), new_at.as_ticks()); + } + } else { + self.alarm_at = Instant::MAX; + } + } + + fn handle_alarm(&mut self) { + self.alarm_at = Instant::MAX; + + self.dispatch(); + } +} + +/// TODO: Doc +pub struct Queue { + initialized: AtomicBool, + inner: Mutex>>, +} + +impl Queue { + /// TODO: Doc + pub const fn new() -> Self { + Self { + initialized: AtomicBool::new(false), + inner: Mutex::new(RefCell::new(InnerQueue::::new())), + } + } + + /// TODO: Doc + pub unsafe fn initialize(&'static self) { + if self.initialized.load(AtomicOrdering::SeqCst) { + panic!("Queue already initialized"); + } + + let handle = allocate_alarm().unwrap(); + self.inner.lock(|inner| inner.borrow_mut().alarm = Some(handle)); + + set_alarm_callback(handle, Self::handle_alarm, self as *const _ as _); + + self.initialized.store(true, AtomicOrdering::SeqCst); + } + + /// TODO: Doc + pub fn schedule(&'static self, at: Instant, waker: &Waker) { + self.check_initialized(); + + self.inner.lock(|inner| inner.borrow_mut().schedule(at, waker)); + } + + fn check_initialized(&self) { + if !self.initialized.load(AtomicOrdering::SeqCst) { + panic!("Queue is not initialized"); + } + } + + fn handle_alarm(ctx: *mut ()) { + let this = unsafe { (ctx as *const Self).as_ref().unwrap() }; + + this.check_initialized(); + this.inner.lock(|inner| inner.borrow_mut().handle_alarm()); + } +} + +/// TODO: Doc +pub unsafe fn initialize() { + extern "Rust" { + fn _embassy_time_generic_queue_initialize(); + } + + _embassy_time_generic_queue_initialize(); +} + +/// TODO: Doc +#[macro_export] +macro_rules! generic_queue { + (static $name:ident: $t: ty = $val:expr) => { + static $name: $t = $val; + + #[no_mangle] + fn _embassy_time_generic_queue_initialize() { + unsafe { + $crate::queue::Queue::initialize(&$name); + } + } + + #[no_mangle] + fn _embassy_time_schedule_wake(at: $crate::Instant, waker: &core::task::Waker) { + $crate::queue::Queue::schedule(&$name, at, waker); + } + }; +} + +#[cfg(feature = "generic-queue")] +generic_queue!(static QUEUE: Queue = Queue::new());