From e05e5d33f0ab832b2ea1c48674c99b41581118be Mon Sep 17 00:00:00 2001 From: rafael Date: Sun, 28 Jul 2024 00:19:54 +0200 Subject: [PATCH] review comments --- examples/rp/src/bin/orchestrate_tasks.rs | 66 +++++++++++++++++++++--- 1 file changed, 59 insertions(+), 7 deletions(-) diff --git a/examples/rp/src/bin/orchestrate_tasks.rs b/examples/rp/src/bin/orchestrate_tasks.rs index b3c37de4c..0e21d5833 100644 --- a/examples/rp/src/bin/orchestrate_tasks.rs +++ b/examples/rp/src/bin/orchestrate_tasks.rs @@ -14,6 +14,7 @@ //! - a task that generates random numbers in intervals of 90s //! - a task that notifies about being attached/disattached from usb power //! - a task that measures vsys voltage in intervals of 30s +//! - a task that consumes the state information and reacts to it #![no_std] #![no_main] @@ -57,6 +58,7 @@ enum Events { FirstRandomSeed(u32), SecondRandomSeed(u32), ThirdRandomSeed(u32), + ResetFirstRandomSeed, } /// This is the type of Commands that we will send from the orchestrating task to the worker tasks. @@ -103,6 +105,11 @@ static EVENT_CHANNEL: channel::Channel = ch /// Signal for stopping the first random signal task. We use a signal here, because we need no queue. It is suffiient to have one signal active. static STOP_FIRST_RANDOM_SIGNAL: signal::Signal = signal::Signal::new(); +/// Channel for the state that we want the consumer task to react to. We use a channel here, because we want to have a queue of state changes, although +/// we want the queue to be of size 1, because we want to finish rwacting to the state change before the next one comes in. This is just a design choice +/// and depends on your use case. +static CONSUMER_CHANNEL: channel::Channel = channel::Channel::new(); + // And now we can put all this into use /// This is the main task, that will not do very much besides spawning the other tasks. This is a design choice, you could do the @@ -116,11 +123,11 @@ async fn main(spawner: Spawner) { // spawn the tasks spawner.spawn(orchestrate(spawner)).unwrap(); - spawner.spawn(random_30s(spawner)).unwrap(); spawner.spawn(random_60s(spawner)).unwrap(); spawner.spawn(random_90s(spawner)).unwrap(); spawner.spawn(usb_power(spawner, r.vbus)).unwrap(); spawner.spawn(vsys_voltage(spawner, r.vsys)).unwrap(); + spawner.spawn(consumer(spawner)).unwrap(); } /// This is the task handling the system state and orchestrating the other tasks. WEe can regard this as the "main loop" of the system. @@ -131,6 +138,9 @@ async fn orchestrate(_spawner: Spawner) { // we need to have a receiver for the events let receiver = EVENT_CHANNEL.receiver(); + // and we need a sender for the consumer task + let state_sender = CONSUMER_CHANNEL.sender(); + loop { // we await on the receiver, this will block until a new event is available // as an alternative to this, we could also await on multiple channels, this would block until at least one of the channels has an event @@ -172,23 +182,65 @@ async fn orchestrate(_spawner: Spawner) { state.third_random_seed = seed; info!("Third random seed: {}", seed); } + Events::ResetFirstRandomSeed => { + // update the state and/or react to the event here + state.times_we_got_first_random_seed = 0; + state.first_random_seed = 0; + info!("Resetting the first random seed counter"); + } } // we now have an altered state // there is a crate for detecting field changes on crates.io (https://crates.io/crates/fieldset) that might be useful here // for now we just keep it simple - info!("State: {:?}", &state); - // here we react to the state, in this case here we want to stop the first random seed task after we got it a defined number of times - if state.times_we_got_first_random_seed == state.maximum_times_we_want_first_random_seed { - info!("Stopping the first random signal task"); - // we send a command to the task - STOP_FIRST_RANDOM_SIGNAL.signal(Commands::Stop); + // we send the state to the consumer task + // since the channel has a size of 1, this will block until the consumer task has received the state, which is what we want here in this example + // **Note:** It is bad design to send too much data between tasks, with no clear definition of what "too much" is. In this example we send the + // whole state, in a real world application you might want to send only the data, that is relevant to the consumer task AND only when it has changed. + // We keep it simple here. + state_sender.send(state.clone()).await; + } +} + +/// This task will consume the state information and react to it. This is a simple example, in a real world application this would be more complex +/// and we could have multiple consumer tasks, each reacting to different parts of the state. +#[embassy_executor::task] +async fn consumer(spawner: Spawner) { + // we need to have a receiver for the state + let receiver = CONSUMER_CHANNEL.receiver(); + let sender = EVENT_CHANNEL.sender(); + loop { + // we await on the receiver, this will block until a new state is available + let state = receiver.receive().await; + // react to the state, in this case here we just log it + info!("The consumer has reveived this state: {:?}", &state); + + // here we react to the state, in this case here we want to start or stop the first random signal task depending on the state of the system + match state.times_we_got_first_random_seed { + max if max == state.maximum_times_we_want_first_random_seed => { + info!("Stopping the first random signal task"); + // we send a command to the task + STOP_FIRST_RANDOM_SIGNAL.signal(Commands::Stop); + // we notify the orchestrator that we have sent the command + sender.send(Events::ResetFirstRandomSeed).await; + } + 0 => { + // we start the task, which presents us with an interesting problem, because we may return here before the task has started + // here we just try and log if the task has started, in a real world application you might want to handle this more gracefully + info!("Starting the first random signal task"); + match spawner.spawn(random_30s(spawner)) { + Ok(_) => info!("Successfully spawned random_30s task"), + Err(e) => info!("Failed to spawn random_30s task: {:?}", e), + } + } + _ => {} } } } /// This task will generate random numbers in intervals of 30s /// The task will terminate after it has received a command signal to stop, see the orchestrate task for that. +/// Note that we are not spawning this task from main, as we will show how such a task can be spawned and closed dynamically. #[embassy_executor::task] async fn random_30s(_spawner: Spawner) { let mut rng = RoscRng;