Moved the logic for a pausible idle callback into a new type - PausibleIdleCallback and placed the appropriate signatures in rtio and implementation into uvio.

This commit is contained in:
toddaaro 2013-08-16 13:41:30 -07:00
parent 88d8baa76b
commit f83835b0e7
3 changed files with 100 additions and 93 deletions

View File

@ -24,10 +24,12 @@ pub type RtioTcpStreamObject = uvio::UvTcpStream;
pub type RtioTcpListenerObject = uvio::UvTcpListener;
pub type RtioUdpSocketObject = uvio::UvUdpSocket;
pub type RtioTimerObject = uvio::UvTimer;
pub type PausibleIdleCallback = uvio::UvPausibleIdleCallback;
pub trait EventLoop {
fn run(&mut self);
fn callback(&mut self, ~fn());
fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback;
fn callback_ms(&mut self, ms: u64, ~fn());
fn remote_callback(&mut self, ~fn()) -> ~RemoteCallbackObject;
/// The asynchronous I/O services. Not all event loops may provide one
@ -35,11 +37,12 @@ pub trait EventLoop {
}
pub trait RemoteCallback {
/// Trigger the remote callback. Note that the number of times the callback
/// is run is not guaranteed. All that is guaranteed is that, after calling 'fire',
/// the callback will be called at least once, but multiple callbacks may be coalesced
/// and callbacks may be called more often requested. Destruction also triggers the
/// callback.
/// Trigger the remote callback. Note that the number of times the
/// callback is run is not guaranteed. All that is guaranteed is
/// that, after calling 'fire', the callback will be called at
/// least once, but multiple callbacks may be coalesced and
/// callbacks may be called more often requested. Destruction also
/// triggers the callback.
fn fire(&mut self);
}

View File

@ -23,14 +23,13 @@ use super::message_queue::MessageQueue;
use rt::kill::BlockedTask;
use rt::local_ptr;
use rt::local::Local;
use rt::rtio::RemoteCallback;
use rt::rtio::{RemoteCallback, PausibleIdleCallback};
use rt::metrics::SchedMetrics;
use borrow::{to_uint};
use cell::Cell;
use rand::{XorShiftRng, RngUtil};
use iterator::{range};
use vec::{OwnedVector};
use rt::uv::idle::IdleWatcher;
/// The Scheduler is responsible for coordinating execution of Coroutines
/// on a single thread. When the scheduler is running it is owned by
@ -78,10 +77,13 @@ pub struct Scheduler {
friend_handle: Option<SchedHandle>,
/// A fast XorShift rng for scheduler use
rng: XorShiftRng,
/// An IdleWatcher
idle_watcher: Option<IdleWatcher>,
/// A flag to indicate whether or not the idle callback is active.
idle_flag: bool
/// A toggleable idle callback
idle_callback: ~PausibleIdleCallback
}
enum CleanupJob {
DoNothing,
GiveTask(~Task, UnsafeTaskReceiver)
}
pub struct SchedHandle {
@ -97,11 +99,6 @@ pub enum SchedMessage {
TaskFromFriend(~Task)
}
enum CleanupJob {
DoNothing,
GiveTask(~Task, UnsafeTaskReceiver)
}
impl Scheduler {
pub fn sched_id(&self) -> uint { to_uint(self) }
@ -126,7 +123,10 @@ impl Scheduler {
sleeper_list: SleeperList,
run_anything: bool,
friend: Option<SchedHandle>)
-> Scheduler {
-> Scheduler {
let mut event_loop = event_loop;
let idle_callback = event_loop.pausible_idle_callback();
Scheduler {
sleeper_list: sleeper_list,
@ -142,9 +142,8 @@ impl Scheduler {
metrics: SchedMetrics::new(),
run_anything: run_anything,
friend_handle: friend,
rng: XorShiftRng::new(),
idle_watcher: None,
idle_flag: false
rng: XorShiftRng::new(),
idle_callback: idle_callback
}
}
@ -172,7 +171,7 @@ impl Scheduler {
// Before starting our first task, make sure the idle callback
// is active. As we do not start in the sleep state this is
// important.
this.activate_idle();
this.idle_callback.start(Scheduler::run_sched_once);
// Now, as far as all the scheduler state is concerned, we are
// inside the "scheduler" context. So we can act like the
@ -194,7 +193,7 @@ impl Scheduler {
// cleaning up the memory it uses. As we didn't actually call
// task.run() on the scheduler task we never get through all
// the cleanup code it runs.
let mut stask = Local::take::<Task>();
let mut stask = Local::take::<Task>();
rtdebug!("stopping scheduler %u", stask.sched.get_ref().sched_id());
@ -202,6 +201,9 @@ impl Scheduler {
let message = stask.sched.get_mut_ref().message_queue.pop();
assert!(message.is_none());
// Close the idle callback.
stask.sched.get_mut_ref().idle_callback.close();
stask.destroyed = true;
}
@ -211,11 +213,6 @@ impl Scheduler {
let mut self_sched = self;
// Always run through the scheduler loop at least once so that
// we enter the sleep state and can then be woken up by other
// schedulers.
// self_sched.event_loop.callback(Scheduler::run_sched_once);
// This is unsafe because we need to place the scheduler, with
// the event_loop inside, inside our task. But we still need a
// mutable reference to the event_loop to give it the "run"
@ -252,7 +249,7 @@ impl Scheduler {
// Assume that we need to continue idling unless we reach the
// end of this function without performing an action.
sched.activate_idle();
sched.idle_callback.resume();
// Our first task is to read mail to see if we have important
// messages.
@ -300,12 +297,12 @@ impl Scheduler {
let handle = sched.make_handle();
sched.sleeper_list.push(handle);
// Since we are sleeping, deactivate the idle callback.
sched.pause_idle();
sched.idle_callback.pause();
} else {
rtdebug!("not sleeping, already doing so or no_sleep set");
// We may not be sleeping, but we still need to deactivate
// the idle callback.
sched.pause_idle();
sched.idle_callback.pause();
}
// Finished a cycle without using the Scheduler. Place it back
@ -313,46 +310,6 @@ impl Scheduler {
Local::put(sched);
}
fn activate_idle(&mut self) {
rtdebug!("activating idle");
if self.idle_flag {
rtassert!(self.idle_watcher.is_some());
rtdebug!("idle flag already set, not reactivating idle watcher");
} else {
rtdebug!("idle flag was false, reactivating idle watcher");
self.idle_flag = true;
if self.idle_watcher.is_none() {
// There's no idle handle yet. Create one
let mut idle_watcher = IdleWatcher::new(self.event_loop.uvio.uv_loop());
do idle_watcher.start |_idle_watcher, _status| {
Scheduler::run_sched_once();
}
self.idle_watcher = Some(idle_watcher);
} else {
self.idle_watcher.get_mut_ref().restart();
}
}
}
fn pause_idle(&mut self) {
rtassert!(self.idle_watcher.is_some());
rtassert!(self.idle_flag);
rtdebug!("stopping idle watcher");
self.idle_flag = false;
if !self.no_sleep {
self.idle_watcher.get_mut_ref().stop();
} else {
rtdebug!("closing idle watcher");
// The scheduler is trying to exit. Destroy the idle watcher
// to drop the reference to the event loop.
let mut idle_watcher = self.idle_watcher.take_unwrap();
idle_watcher.stop();
idle_watcher.close(||());
}
}
pub fn make_handle(&mut self) -> SchedHandle {
let remote = self.event_loop.remote_callback(Scheduler::run_sched_once);
@ -376,10 +333,9 @@ impl Scheduler {
// We push the task onto our local queue clone.
this.work_queue.push(task);
// this.event_loop.callback(Scheduler::run_sched_once);
// There is definitely work to be done later. Make sure we wake up for it.
this.activate_idle();
this.idle_callback.resume();
// We've made work available. Notify a
// sleeping scheduler.
@ -420,28 +376,23 @@ impl Scheduler {
let mut this = self;
match this.message_queue.pop() {
Some(PinnedTask(task)) => {
// this.event_loop.callback(Scheduler::run_sched_once);
let mut task = task;
task.give_home(Sched(this.make_handle()));
this.resume_task_immediately(task);
return None;
}
Some(TaskFromFriend(task)) => {
// this.event_loop.callback(Scheduler::run_sched_once);
rtdebug!("got a task from a friend. lovely!");
this.sched_schedule_task(task).map_move(Local::put);
return None;
}
Some(Wake) => {
// this.event_loop.callback(Scheduler::run_sched_once);
this.sleepy = false;
Local::put(this);
return None;
// return Some(this);
}
Some(Shutdown) => {
rtdebug!("shutting down");
// this.event_loop.callback(Scheduler::run_sched_once);
if this.sleepy {
// There may be an outstanding handle on the
// sleeper list. Pop them all to make sure that's
@ -463,7 +414,6 @@ impl Scheduler {
Local::put(this);
return None;
// return Some(this);
}
None => {
return Some(this);

View File

@ -116,6 +116,15 @@ impl EventLoop for UvEventLoop {
}
}
fn pausible_idle_callback(&mut self) -> ~PausibleIdleCallback {
let idle_watcher = IdleWatcher::new(self.uvio.uv_loop());
return ~UvPausibleIdleCallback {
watcher: idle_watcher,
idle_flag: false,
closed: false
};
}
fn callback_ms(&mut self, ms: u64, f: ~fn()) {
let mut timer = TimerWatcher::new(self.uvio.uv_loop());
do timer.start(ms, 0) |timer, status| {
@ -134,6 +143,44 @@ impl EventLoop for UvEventLoop {
}
}
pub struct UvPausibleIdleCallback {
watcher: IdleWatcher,
idle_flag: bool,
closed: bool
}
impl UvPausibleIdleCallback {
#[inline]
pub fn start(&mut self, f: ~fn()) {
do self.watcher.start |_idle_watcher, _status| {
f();
};
self.idle_flag = true;
}
#[inline]
pub fn pause(&mut self) {
if self.idle_flag == true {
self.watcher.stop();
self.idle_flag = false;
}
}
#[inline]
pub fn resume(&mut self) {
if self.idle_flag == false {
self.watcher.restart();
self.idle_flag = true;
}
}
#[inline]
pub fn close(&mut self) {
self.pause();
if !self.closed {
self.closed = true;
self.watcher.close(||());
}
}
}
#[test]
fn test_callback_run_once() {
do run_in_bare_thread {
@ -163,24 +210,31 @@ impl UvRemoteCallback {
let async = do AsyncWatcher::new(loop_) |watcher, status| {
assert!(status.is_none());
// The synchronization logic here is subtle. To review, the uv async handle
// type promises that, after it is triggered the remote callback is definitely
// called at least once. UvRemoteCallback needs to maintain those semantics
// while also shutting down cleanly from the dtor. In our case that means that,
// when the UvRemoteCallback dtor calls `async.send()`, here `f` is always called
// later.
// The synchronization logic here is subtle. To review,
// the uv async handle type promises that, after it is
// triggered the remote callback is definitely called at
// least once. UvRemoteCallback needs to maintain those
// semantics while also shutting down cleanly from the
// dtor. In our case that means that, when the
// UvRemoteCallback dtor calls `async.send()`, here `f` is
// always called later.
// In the dtor both the exit flag is set and the async callback fired under a lock.
// Here, before calling `f`, we take the lock and check the flag. Because we are
// checking the flag before calling `f`, and the flag is set under the same lock
// as the send, then if the flag is set then we're guaranteed to call `f` after
// the final send.
// In the dtor both the exit flag is set and the async
// callback fired under a lock. Here, before calling `f`,
// we take the lock and check the flag. Because we are
// checking the flag before calling `f`, and the flag is
// set under the same lock as the send, then if the flag
// is set then we're guaranteed to call `f` after the
// final send.
// If the check was done after `f()` then there would be a period between that call
// and the check where the dtor could be called in the other thread, missing the
// final callback while still destroying the handle.
// If the check was done after `f()` then there would be a
// period between that call and the check where the dtor
// could be called in the other thread, missing the final
// callback while still destroying the handle.
let should_exit = unsafe { exit_flag_clone.with_imm(|&should_exit| should_exit) };
let should_exit = unsafe {
exit_flag_clone.with_imm(|&should_exit| should_exit)
};
f();