From bef9b7a8539c3dddb1cf6ab46db161f1ca56b1a1 Mon Sep 17 00:00:00 2001
From: Dario Nieuwenhuis
Date: Tue, 14 Nov 2023 22:32:48 +0100
Subject: [PATCH] executor: remove atomic-polyfill.

---
 embassy-executor/CHANGELOG.md                 |  5 +
 embassy-executor/Cargo.toml                   |  7 +-
 embassy-executor/src/arch/cortex_m.rs         | 21 ++---
 embassy-executor/src/arch/riscv32.rs          |  2 +-
 embassy-executor/src/raw/mod.rs               | 58 +++---------
 .../{run_queue.rs => run_queue_atomics.rs}    |  3 +-
 .../src/raw/run_queue_critical_section.rs     | 75 +++++++++++++++
 embassy-executor/src/raw/state_atomics.rs     | 73 +++++++++++++++
 .../src/raw/state_critical_section.rs         | 93 +++++++++++++++++++
 embassy-executor/src/raw/timer_queue.rs       | 10 +-
 10 files changed, 280 insertions(+), 67 deletions(-)
 rename embassy-executor/src/raw/{run_queue.rs => run_queue_atomics.rs} (98%)
 create mode 100644 embassy-executor/src/raw/run_queue_critical_section.rs
 create mode 100644 embassy-executor/src/raw/state_atomics.rs
 create mode 100644 embassy-executor/src/raw/state_critical_section.rs

diff --git a/embassy-executor/CHANGELOG.md b/embassy-executor/CHANGELOG.md
index e7914cbc0..f45b27700 100644
--- a/embassy-executor/CHANGELOG.md
+++ b/embassy-executor/CHANGELOG.md
@@ -5,6 +5,11 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## Unreleased
+
+- Add `main` macro reexport for Xtensa arch.
+- Remove use of `atomic-polyfill`. The executor now has multiple implementations of its internal data structures, selected at build time depending on whether the target supports atomics.
+
 ## 0.3.2 - 2023-11-06
 
 - Use `atomic-polyfill` for `riscv32`
diff --git a/embassy-executor/Cargo.toml b/embassy-executor/Cargo.toml
index a85c5dffe..4945455d3 100644
--- a/embassy-executor/Cargo.toml
+++ b/embassy-executor/Cargo.toml
@@ -34,7 +34,7 @@ _arch = [] # some arch was picked
 arch-std = ["_arch", "critical-section/std"]
 arch-cortex-m = ["_arch", "dep:cortex-m"]
 arch-xtensa = ["_arch"]
-arch-riscv32 = ["_arch"]
+arch-riscv32 = ["_arch", "dep:portable-atomic"]
 arch-wasm = ["_arch", "dep:wasm-bindgen", "dep:js-sys"]
 
 # Enable the thread-mode executor (using WFE/SEV in Cortex-M, WFI in other embedded archs)
@@ -59,9 +59,12 @@ rtos-trace = { version = "0.1.2", optional = true }
 embassy-macros = { version = "0.2.1", path = "../embassy-macros" }
 embassy-time = { version = "0.1.5", path = "../embassy-time", optional = true}
 
-atomic-polyfill = "1.0.1"
 critical-section = "1.1"
 
+# needed for riscv
+# remove when https://github.com/rust-lang/rust/pull/114499 is merged
+portable-atomic = { version = "1.5", optional = true }
+
 # arch-cortex-m dependencies
 cortex-m = { version = "0.7.6", optional = true }
 
diff --git a/embassy-executor/src/arch/cortex_m.rs b/embassy-executor/src/arch/cortex_m.rs
index fde862f3c..55299c94f 100644
--- a/embassy-executor/src/arch/cortex_m.rs
+++ b/embassy-executor/src/arch/cortex_m.rs
@@ -115,12 +115,12 @@ mod thread {
 pub use interrupt::*;
 #[cfg(feature = "executor-interrupt")]
 mod interrupt {
-    use core::cell::UnsafeCell;
+    use core::cell::{Cell, UnsafeCell};
     use core::mem::MaybeUninit;
 
-    use atomic_polyfill::{AtomicBool, Ordering};
     use cortex_m::interrupt::InterruptNumber;
     use cortex_m::peripheral::NVIC;
+    use critical_section::Mutex;
 
     use crate::raw;
 
@@ -146,7 +146,7 @@ mod interrupt {
     /// It is somewhat more complex to use, it's recommended to use the thread-mode
     /// [`Executor`] instead, if it works for your use case.
     pub struct InterruptExecutor {
-        started: AtomicBool,
+        started: Mutex<Cell<bool>>,
         executor: UnsafeCell<MaybeUninit<raw::Executor>>,
     }
 
@@ -158,7 +158,7 @@ mod interrupt {
         #[inline]
         pub const fn new() -> Self {
             Self {
-                started: AtomicBool::new(false),
+                started: Mutex::new(Cell::new(false)),
                 executor: UnsafeCell::new(MaybeUninit::uninit()),
             }
         }
@@ -167,7 +167,8 @@ mod interrupt {
         ///
         /// # Safety
        ///
-        /// You MUST call this from the interrupt handler, and from nowhere else.
+        /// - You MUST call this from the interrupt handler, and from nowhere else.
+        /// - You must not call this before calling `start()`.
         pub unsafe fn on_interrupt(&'static self) {
             let executor = unsafe { (&*self.executor.get()).assume_init_ref() };
             executor.poll();
@@ -196,11 +197,7 @@ mod interrupt {
         /// do it after.
         ///
         pub fn start(&'static self, irq: impl InterruptNumber) -> crate::SendSpawner {
-            if self
-                .started
-                .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
-                .is_err()
-            {
+            if critical_section::with(|cs| self.started.borrow(cs).replace(true)) {
                 panic!("InterruptExecutor::start() called multiple times on the same executor.");
             }
 
@@ -222,10 +219,10 @@ mod interrupt {
         /// This returns a [`SendSpawner`] you can use to spawn tasks on this
         /// executor.
         ///
-        /// This MUST only be called on an executor that has already been spawned.
+        /// This MUST only be called on an executor that has already been started.
         /// The function will panic otherwise.
         pub fn spawner(&'static self) -> crate::SendSpawner {
-            if !self.started.load(Ordering::Acquire) {
+            if !critical_section::with(|cs| self.started.borrow(cs).get()) {
                 panic!("InterruptExecutor::spawner() called on uninitialized executor.");
             }
             let executor = unsafe { (&*self.executor.get()).assume_init_ref() };
diff --git a/embassy-executor/src/arch/riscv32.rs b/embassy-executor/src/arch/riscv32.rs
index e5c0ff2ec..6814e7844 100644
--- a/embassy-executor/src/arch/riscv32.rs
+++ b/embassy-executor/src/arch/riscv32.rs
@@ -7,9 +7,9 @@ pub use thread::*;
 
 mod thread {
     use core::marker::PhantomData;
 
-    use atomic_polyfill::{AtomicBool, Ordering};
     #[cfg(feature = "nightly")]
     pub use embassy_macros::main_riscv as main;
+    use portable_atomic::{AtomicBool, Ordering};
 
     use crate::{raw, Spawner};
diff --git a/embassy-executor/src/raw/mod.rs b/embassy-executor/src/raw/mod.rs
index 6d2c1c18a..ed0bedd25 100644
--- a/embassy-executor/src/raw/mod.rs
+++ b/embassy-executor/src/raw/mod.rs
@@ -7,7 +7,14 @@
 //! Using this module requires respecting subtle safety contracts. If you can, prefer using the safe
 //! [executor wrappers](crate::Executor) and the [`embassy_executor::task`](embassy_macros::task) macro, which are fully safe.
 
+#[cfg_attr(target_has_atomic = "ptr", path = "run_queue_atomics.rs")]
+#[cfg_attr(not(target_has_atomic = "ptr"), path = "run_queue_critical_section.rs")]
 mod run_queue;
+
+#[cfg_attr(target_has_atomic = "8", path = "state_atomics.rs")]
+#[cfg_attr(not(target_has_atomic = "8"), path = "state_critical_section.rs")]
+mod state;
+
 #[cfg(feature = "integrated-timers")]
 mod timer_queue;
 pub(crate) mod util;
@@ -21,7 +28,6 @@ use core::pin::Pin;
 use core::ptr::NonNull;
 use core::task::{Context, Poll};
 
-use atomic_polyfill::{AtomicU32, Ordering};
 #[cfg(feature = "integrated-timers")]
 use embassy_time::driver::{self, AlarmHandle};
 #[cfg(feature = "integrated-timers")]
@@ -30,21 +36,14 @@ use embassy_time::Instant;
 use rtos_trace::trace;
 
 use self::run_queue::{RunQueue, RunQueueItem};
+use self::state::State;
 use self::util::{SyncUnsafeCell, UninitCell};
 pub use self::waker::task_from_waker;
 use super::SpawnToken;
 
-/// Task is spawned (has a future)
-pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
-/// Task is in the executor run queue
-pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
-/// Task is in the executor timer queue
-#[cfg(feature = "integrated-timers")]
-pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
-
 /// Raw task header for use in task pointers.
 pub(crate) struct TaskHeader {
-    pub(crate) state: AtomicU32,
+    pub(crate) state: State,
     pub(crate) run_queue_item: RunQueueItem,
     pub(crate) executor: SyncUnsafeCell<Option<&'static SyncExecutor>>,
     poll_fn: SyncUnsafeCell<Option<unsafe fn(TaskRef)>>,
@@ -116,7 +115,7 @@ impl<F: Future + 'static> TaskStorage<F> {
     pub const fn new() -> Self {
         Self {
             raw: TaskHeader {
-                state: AtomicU32::new(0),
+                state: State::new(),
                 run_queue_item: RunQueueItem::new(),
                 executor: SyncUnsafeCell::new(None),
                 // Note: this is lazily initialized so that a static `TaskStorage` will go in `.bss`
@@ -161,7 +160,7 @@ impl<F: Future + 'static> TaskStorage<F> {
         match future.poll(&mut cx) {
             Poll::Ready(_) => {
                 this.future.drop_in_place();
-                this.raw.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
+                this.raw.state.despawn();
 
                 #[cfg(feature = "integrated-timers")]
                 this.raw.expires_at.set(Instant::MAX);
@@ -193,11 +192,7 @@ impl<F: Future + 'static> AvailableTask<F> {
     ///
     /// This function returns `None` if a task has already been spawned and has not finished running.
     pub fn claim(task: &'static TaskStorage<F>) -> Option<Self> {
-        task.raw
-            .state
-            .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
-            .ok()
-            .map(|_| Self { task })
+        task.raw.state.spawn().then(|| Self { task })
     }
 
     fn initialize_impl<S>(self, future: impl FnOnce() -> F) -> SpawnToken<S> {
@@ -394,8 +389,7 @@ impl SyncExecutor {
             #[cfg(feature = "integrated-timers")]
             task.expires_at.set(Instant::MAX);
 
-            let state = task.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
-            if state & STATE_SPAWNED == 0 {
+            if !task.state.run_dequeue() {
                 // If task is not running, ignore it. This can happen in the following scenario:
                 //   - Task gets dequeued, poll starts
                 //   - While task is being polled, it gets woken. It gets placed in the queue.
@@ -546,18 +540,7 @@ impl Executor {
 /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
 pub fn wake_task(task: TaskRef) {
     let header = task.header();
-
-    let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
-        // If already scheduled, or if not started,
-        if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
-            None
-        } else {
-            // Mark it as scheduled
-            Some(state | STATE_RUN_QUEUED)
-        }
-    });
-
-    if res.is_ok() {
+    if header.state.run_enqueue() {
         // We have just marked the task as scheduled, so enqueue it.
         unsafe {
             let executor = header.executor.get().unwrap_unchecked();
@@ -571,18 +554,7 @@ pub fn wake_task(task: TaskRef) {
 /// You can obtain a `TaskRef` from a `Waker` using [`task_from_waker`].
 pub fn wake_task_no_pend(task: TaskRef) {
     let header = task.header();
-
-    let res = header.state.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
-        // If already scheduled, or if not started,
-        if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
-            None
-        } else {
-            // Mark it as scheduled
-            Some(state | STATE_RUN_QUEUED)
-        }
-    });
-
-    if res.is_ok() {
+    if header.state.run_enqueue() {
         // We have just marked the task as scheduled, so enqueue it.
         unsafe {
             let executor = header.executor.get().unwrap_unchecked();
diff --git a/embassy-executor/src/raw/run_queue.rs b/embassy-executor/src/raw/run_queue_atomics.rs
similarity index 98%
rename from embassy-executor/src/raw/run_queue.rs
rename to embassy-executor/src/raw/run_queue_atomics.rs
index f1ec19ac1..90907cfda 100644
--- a/embassy-executor/src/raw/run_queue.rs
+++ b/embassy-executor/src/raw/run_queue_atomics.rs
@@ -1,7 +1,6 @@
 use core::ptr;
 use core::ptr::NonNull;
-
-use atomic_polyfill::{AtomicPtr, Ordering};
+use core::sync::atomic::{AtomicPtr, Ordering};
 
 use super::{TaskHeader, TaskRef};
 use crate::raw::util::SyncUnsafeCell;
diff --git a/embassy-executor/src/raw/run_queue_critical_section.rs b/embassy-executor/src/raw/run_queue_critical_section.rs
new file mode 100644
index 000000000..ba59c8f29
--- /dev/null
+++ b/embassy-executor/src/raw/run_queue_critical_section.rs
@@ -0,0 +1,75 @@
+use core::cell::Cell;
+
+use critical_section::{CriticalSection, Mutex};
+
+use super::TaskRef;
+
+pub(crate) struct RunQueueItem {
+    next: Mutex<Cell<Option<TaskRef>>>,
+}
+
+impl RunQueueItem {
+    pub const fn new() -> Self {
+        Self {
+            next: Mutex::new(Cell::new(None)),
+        }
+    }
+}
+
+/// Task queue based on a very simple intrusive linked list, protected by a critical section:
+///
+/// To enqueue a task, task.next is set to the old head, and head is then set to task.
+///
+/// Dequeuing is done in batches: the queue is emptied by atomically replacing head with
+/// null. Then the batch is iterated following the next pointers until null is reached.
+///
+/// Note that batches will be iterated in the reverse order from that in which they were enqueued. This is OK
+/// for our purposes: it can't create fairness problems since the next batch won't run until the
+/// current batch is completely processed, so even a task that enqueues itself instantly (for example
+/// by waking its own waker) can't prevent other tasks from running.
+pub(crate) struct RunQueue {
+    head: Mutex<Cell<Option<TaskRef>>>,
+}
+
+impl RunQueue {
+    pub const fn new() -> Self {
+        Self {
+            head: Mutex::new(Cell::new(None)),
+        }
+    }
+
+    /// Enqueues an item. Returns true if the queue was empty.
+    ///
+    /// # Safety
+    ///
+    /// `item` must NOT be already enqueued in any queue.
+    #[inline(always)]
+    pub(crate) unsafe fn enqueue(&self, task: TaskRef) -> bool {
+        critical_section::with(|cs| {
+            let prev = self.head.borrow(cs).replace(Some(task));
+            task.header().run_queue_item.next.borrow(cs).set(prev);
+
+            prev.is_none()
+        })
+    }
+
+    /// Empty the queue, then call `on_task` for each task that was in the queue.
+    /// NOTE: It is OK for `on_task` to enqueue more tasks. In this case they're left in the queue
+    /// and will be processed by the *next* call to `dequeue_all`, *not* the current one.
+    pub(crate) fn dequeue_all(&self, on_task: impl Fn(TaskRef)) {
+        // Atomically empty the queue.
+        let mut next = critical_section::with(|cs| self.head.borrow(cs).take());
+
+        // Iterate the linked list of tasks that were previously in the queue.
+        while let Some(task) = next {
+            // If the task re-enqueues itself, the `next` pointer will get overwritten.
+            // Therefore, first read the next pointer, and only then process the task.
+
+            // safety: we know if the task is enqueued, no one else will touch the `next` pointer.
+            let cs = unsafe { CriticalSection::new() };
+            next = task.header().run_queue_item.next.borrow(cs).get();
+
+            on_task(task);
+        }
+    }
+}
diff --git a/embassy-executor/src/raw/state_atomics.rs b/embassy-executor/src/raw/state_atomics.rs
new file mode 100644
index 000000000..e1279ac0b
--- /dev/null
+++ b/embassy-executor/src/raw/state_atomics.rs
@@ -0,0 +1,73 @@
+use core::sync::atomic::{AtomicU32, Ordering};
+
+/// Task is spawned (has a future)
+pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
+/// Task is in the executor run queue
+pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
+/// Task is in the executor timer queue
+#[cfg(feature = "integrated-timers")]
+pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
+
+pub(crate) struct State {
+    state: AtomicU32,
+}
+
+impl State {
+    pub const fn new() -> State {
+        Self {
+            state: AtomicU32::new(0),
+        }
+    }
+
+    /// If task is idle, mark it as spawned + run_queued and return true.
+    #[inline(always)]
+    pub fn spawn(&self) -> bool {
+        self.state
+            .compare_exchange(0, STATE_SPAWNED | STATE_RUN_QUEUED, Ordering::AcqRel, Ordering::Acquire)
+            .is_ok()
+    }
+
+    /// Unmark the task as spawned.
+    #[inline(always)]
+    pub fn despawn(&self) {
+        self.state.fetch_and(!STATE_SPAWNED, Ordering::AcqRel);
+    }
+
+    /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
+    #[inline(always)]
+    pub fn run_enqueue(&self) -> bool {
+        self.state
+            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |state| {
+                // If already scheduled, or if not started,
+                if (state & STATE_RUN_QUEUED != 0) || (state & STATE_SPAWNED == 0) {
+                    None
+                } else {
+                    // Mark it as scheduled
+                    Some(state | STATE_RUN_QUEUED)
+                }
+            })
+            .is_ok()
+    }
+
+    /// Unmark the task as run-queued. Return whether the task is spawned.
+    #[inline(always)]
+    pub fn run_dequeue(&self) -> bool {
+        let state = self.state.fetch_and(!STATE_RUN_QUEUED, Ordering::AcqRel);
+        state & STATE_SPAWNED != 0
+    }
+
+    /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
+    #[cfg(feature = "integrated-timers")]
+    #[inline(always)]
+    pub fn timer_enqueue(&self) -> bool {
+        let old_state = self.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
+        old_state & STATE_TIMER_QUEUED == 0
+    }
+
+    /// Unmark the task as timer-queued.
+    #[cfg(feature = "integrated-timers")]
+    #[inline(always)]
+    pub fn timer_dequeue(&self) {
+        self.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
+    }
+}
diff --git a/embassy-executor/src/raw/state_critical_section.rs b/embassy-executor/src/raw/state_critical_section.rs
new file mode 100644
index 000000000..c3cc1b0b7
--- /dev/null
+++ b/embassy-executor/src/raw/state_critical_section.rs
@@ -0,0 +1,93 @@
+use core::cell::Cell;
+
+use critical_section::Mutex;
+
+/// Task is spawned (has a future)
+pub(crate) const STATE_SPAWNED: u32 = 1 << 0;
+/// Task is in the executor run queue
+pub(crate) const STATE_RUN_QUEUED: u32 = 1 << 1;
+/// Task is in the executor timer queue
+#[cfg(feature = "integrated-timers")]
+pub(crate) const STATE_TIMER_QUEUED: u32 = 1 << 2;
+
+pub(crate) struct State {
+    state: Mutex<Cell<u32>>,
+}
+
+impl State {
+    pub const fn new() -> State {
+        Self {
+            state: Mutex::new(Cell::new(0)),
+        }
+    }
+
+    fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R {
+        critical_section::with(|cs| {
+            let s = self.state.borrow(cs);
+            let mut val = s.get();
+            let r = f(&mut val);
+            s.set(val);
+            r
+        })
+    }
+
+    /// If task is idle, mark it as spawned + run_queued and return true.
+    #[inline(always)]
+    pub fn spawn(&self) -> bool {
+        self.update(|s| {
+            if *s == 0 {
+                *s = STATE_SPAWNED | STATE_RUN_QUEUED;
+                true
+            } else {
+                false
+            }
+        })
+    }
+
+    /// Unmark the task as spawned.
+    #[inline(always)]
+    pub fn despawn(&self) {
+        self.update(|s| *s &= !STATE_SPAWNED);
+    }
+
+    /// Mark the task as run-queued if it's spawned and isn't already run-queued. Return true on success.
+    #[inline(always)]
+    pub fn run_enqueue(&self) -> bool {
+        self.update(|s| {
+            if (*s & STATE_RUN_QUEUED != 0) || (*s & STATE_SPAWNED == 0) {
+                false
+            } else {
+                *s |= STATE_RUN_QUEUED;
+                true
+            }
+        })
+    }
+
+    /// Unmark the task as run-queued. Return whether the task is spawned.
+    #[inline(always)]
+    pub fn run_dequeue(&self) -> bool {
+        self.update(|s| {
+            let ok = *s & STATE_SPAWNED != 0;
+            *s &= !STATE_RUN_QUEUED;
+            ok
+        })
+    }
+
+    /// Mark the task as timer-queued. Return whether it was newly queued (i.e. not queued before)
+    #[cfg(feature = "integrated-timers")]
+    #[inline(always)]
+    pub fn timer_enqueue(&self) -> bool {
+        self.update(|s| {
+            let ok = *s & STATE_TIMER_QUEUED == 0;
+            *s |= STATE_TIMER_QUEUED;
+            ok
+        })
+    }
+
+    /// Unmark the task as timer-queued.
+    #[cfg(feature = "integrated-timers")]
+    #[inline(always)]
+    pub fn timer_dequeue(&self) {
+        self.update(|s| *s &= !STATE_TIMER_QUEUED);
+    }
+}
diff --git a/embassy-executor/src/raw/timer_queue.rs b/embassy-executor/src/raw/timer_queue.rs
index dc71c95b1..59a3b43f5 100644
--- a/embassy-executor/src/raw/timer_queue.rs
+++ b/embassy-executor/src/raw/timer_queue.rs
@@ -1,9 +1,8 @@
 use core::cmp::min;
 
-use atomic_polyfill::Ordering;
 use embassy_time::Instant;
 
-use super::{TaskRef, STATE_TIMER_QUEUED};
+use super::TaskRef;
 use crate::raw::util::SyncUnsafeCell;
 
 pub(crate) struct TimerQueueItem {
@@ -32,10 +31,7 @@ impl TimerQueue {
     pub(crate) unsafe fn update(&self, p: TaskRef) {
         let task = p.header();
         if task.expires_at.get() != Instant::MAX {
-            let old_state = task.state.fetch_or(STATE_TIMER_QUEUED, Ordering::AcqRel);
-            let is_new = old_state & STATE_TIMER_QUEUED == 0;
-
-            if is_new {
+            if task.state.timer_enqueue() {
                 task.timer_queue_item.next.set(self.head.get());
                 self.head.set(Some(p));
             }
@@ -75,7 +71,7 @@ impl TimerQueue {
             } else {
                 // Remove it
                 prev.set(task.timer_queue_item.next.get());
-                task.state.fetch_and(!STATE_TIMER_QUEUED, Ordering::AcqRel);
+                task.state.timer_dequeue();
             }
         }
     }
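
For reference, here is a minimal standalone sketch of the `critical_section::Mutex<Cell<u32>>` read-modify-write pattern that the new `state_critical_section.rs` relies on where the target has no atomics. It is not part of the patch: the `Flags` type and flag names are hypothetical, and it assumes a host build with `critical-section = { version = "1.1", features = ["std"] }` in Cargo.toml so that a critical-section implementation is available to run against.

```rust
use core::cell::Cell;

use critical_section::Mutex;

// Hypothetical flag bits, mirroring the kind of bitfield the executor keeps per task.
const FLAG_SPAWNED: u32 = 1 << 0;
const FLAG_RUN_QUEUED: u32 = 1 << 1;

struct Flags(Mutex<Cell<u32>>);

impl Flags {
    const fn new() -> Self {
        Flags(Mutex::new(Cell::new(0)))
    }

    // The whole read-modify-write runs inside one critical section, which is what
    // makes it behave like an atomic fetch_update on targets that do have atomics.
    fn update<R>(&self, f: impl FnOnce(&mut u32) -> R) -> R {
        critical_section::with(|cs| {
            let cell = self.0.borrow(cs);
            let mut val = cell.get();
            let r = f(&mut val);
            cell.set(val);
            r
        })
    }
}

static FLAGS: Flags = Flags::new();

fn try_claim() -> bool {
    // Only succeeds when the state is completely idle, like State::spawn().
    FLAGS.update(|s| {
        if *s == 0 {
            *s = FLAG_SPAWNED | FLAG_RUN_QUEUED;
            true
        } else {
            false
        }
    })
}

fn main() {
    assert!(try_claim());
    assert!(!try_claim()); // a second claim fails until the flags are cleared
}
```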
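And a small sketch of why `arch/riscv32.rs` and Cargo.toml switch to `portable-atomic`: on riscv32 targets without the A extension, not all of `core::sync::atomic` is available, while `portable-atomic` exposes the same API. The `SIGNALED` flag below only mirrors the kind of usage in the arch code and is hypothetical; it runs as-is on a host with native atomics, and a bare-metal riscv32 build would additionally need one of `portable-atomic`'s fallback features (for example `critical-section`), which is an assumption to check against the crate docs.

```rust
use portable_atomic::{AtomicBool, Ordering};

static SIGNALED: AtomicBool = AtomicBool::new(false);

fn signal() {
    SIGNALED.store(true, Ordering::SeqCst);
}

fn take_signal() -> bool {
    // On a host this is a native atomic swap; on targets without the needed
    // instructions, portable-atomic falls back to a critical section when the
    // corresponding crate feature is enabled.
    SIGNALED.swap(false, Ordering::SeqCst)
}

fn main() {
    assert!(!take_signal());
    signal();
    assert!(take_signal());
}
```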