core::rt: Add another context switching operation to the scheduler

`switch_running_tasks_and_then` does a context switch to another
task then immediatly runs a closure.
This commit is contained in:
Brian Anderson 2013-04-18 20:16:33 -07:00
parent d261bb32d9
commit eddd817bf0

View File

@ -65,6 +65,8 @@ enum CleanupJob {
pub impl Scheduler { pub impl Scheduler {
fn in_task_context(&self) -> bool { self.current_task.is_some() }
fn new(event_loop: ~EventLoopObject) -> Scheduler { fn new(event_loop: ~EventLoopObject) -> Scheduler {
// Lazily initialize the global state, currently the scheduler TLS key // Lazily initialize the global state, currently the scheduler TLS key
@ -131,6 +133,59 @@ pub impl Scheduler {
} }
} }
// * Task-context operations
/// Called by a running task to end execution, after which it will
/// be recycled by the scheduler for reuse in a new task.
fn terminate_current_task(~self) {
let mut self = self;
assert!(self.in_task_context());
rtdebug!("ending running task");
let dead_task = self.current_task.swap_unwrap();
self.enqueue_cleanup_job(RecycleTask(dead_task));
local_sched::put(self);
let sched = unsafe { local_sched::unsafe_borrow() };
let (sched_context, last_task_context, _) = sched.get_contexts();
let last_task_context = last_task_context.unwrap();
Context::swap(last_task_context, sched_context);
// Control never reaches here
}
/// Switch directly to another task, without going through the scheduler.
/// You would want to think hard about doing this, e.g. if there are
/// pending I/O events it would be a bad idea.
fn resume_task_from_running_task_direct(~self, next_task: ~Task) {
let mut self = self;
assert!(self.in_task_context());
rtdebug!("switching tasks");
let old_running_task = self.current_task.swap_unwrap();
self.enqueue_cleanup_job(RescheduleTask(old_running_task));
self.current_task = Some(next_task);
local_sched::put(self);
unsafe {
let sched = local_sched::unsafe_borrow();
let (_, last_task_context, next_task_context) = sched.get_contexts();
let last_task_context = last_task_context.unwrap();
let next_task_context = next_task_context.unwrap();
Context::swap(last_task_context, next_task_context);
// We could be executing in a different thread now
let sched = local_sched::unsafe_borrow();
sched.run_cleanup_job();
}
}
// Core scheduling ops
fn resume_task_immediately(~self, task: ~Task) { fn resume_task_immediately(~self, task: ~Task) {
let mut self = self; let mut self = self;
assert!(!self.in_task_context()); assert!(!self.in_task_context());
@ -161,30 +216,6 @@ pub impl Scheduler {
} }
} }
// * Task-context operations
/// Called by a running task to end execution, after which it will
/// be recycled by the scheduler for reuse in a new task.
fn terminate_current_task(~self) {
let mut self = self;
assert!(self.in_task_context());
rtdebug!("ending running task");
let dead_task = self.current_task.swap_unwrap();
self.enqueue_cleanup_job(RecycleTask(dead_task));
local_sched::put(self);
let sched = unsafe { local_sched::unsafe_borrow() };
let (sched_context, last_task_context, _) = sched.get_contexts();
let last_task_context = last_task_context.unwrap();
Context::swap(last_task_context, sched_context);
// Control never reaches here
}
/// Block a running task, context switch to the scheduler, then pass the /// Block a running task, context switch to the scheduler, then pass the
/// blocked task to a closure. /// blocked task to a closure.
/// ///
@ -219,14 +250,16 @@ pub impl Scheduler {
/// Switch directly to another task, without going through the scheduler. /// Switch directly to another task, without going through the scheduler.
/// You would want to think hard about doing this, e.g. if there are /// You would want to think hard about doing this, e.g. if there are
/// pending I/O events it would be a bad idea. /// pending I/O events it would be a bad idea.
fn resume_task_from_running_task_direct(~self, next_task: ~Task) { fn switch_running_tasks_and_then(~self, next_task: ~Task, f: &fn(~Task)) {
let mut self = self; let mut self = self;
assert!(self.in_task_context()); assert!(self.in_task_context());
rtdebug!("switching tasks"); rtdebug!("switching tasks");
let old_running_task = self.current_task.swap_unwrap(); let old_running_task = self.current_task.swap_unwrap();
self.enqueue_cleanup_job(RescheduleTask(old_running_task)); let f_fake_region = unsafe { transmute::<&fn(~Task), &fn(~Task)>(f) };
let f_opaque = ClosureConverter::from_fn(f_fake_region);
self.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
self.current_task = Some(next_task); self.current_task = Some(next_task);
local_sched::put(self); local_sched::put(self);
@ -244,9 +277,9 @@ pub impl Scheduler {
} }
} }
// * Other stuff
fn in_task_context(&self) -> bool { self.current_task.is_some() }
// * Other stuff
fn enqueue_cleanup_job(&mut self, job: CleanupJob) { fn enqueue_cleanup_job(&mut self, job: CleanupJob) {
assert!(self.cleanup_job.is_none()); assert!(self.cleanup_job.is_none());
@ -422,6 +455,34 @@ fn test_swap_tasks() {
} }
} }
#[test]
fn test_swap_tasks_then() {
do run_in_bare_thread {
let mut count = 0;
let count_ptr: *mut int = &mut count;
let mut sched = ~UvEventLoop::new_scheduler();
let task1 = ~do Task::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
let mut sched = local_sched::take();
let task2 = ~do Task::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
};
// Context switch directly to the new task
do sched.switch_running_tasks_and_then(task2) |task1| {
let task1 = Cell(task1);
do local_sched::borrow |sched| {
sched.task_queue.push_front(task1.take());
}
}
unsafe { *count_ptr = *count_ptr + 1; }
};
sched.task_queue.push_back(task1);
sched.run();
assert!(count == 3);
}
}
#[bench] #[test] #[ignore(reason = "long test")] #[bench] #[test] #[ignore(reason = "long test")]
fn test_run_a_lot_of_tasks_queued() { fn test_run_a_lot_of_tasks_queued() {
do run_in_bare_thread { do run_in_bare_thread {