diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs index 6336b1cbe2e..d7b44469177 100644 --- a/src/libstd/rt/comm.rs +++ b/src/libstd/rt/comm.rs @@ -118,6 +118,17 @@ impl ChanOne { rtassert!(!rt::in_sched_context()); } + // In order to prevent starvation of other tasks in situations + // where a task sends repeatedly without ever receiving, we + // occassionally yield instead of doing a send immediately. + // Only doing this if we're doing a rescheduling send, + // otherwise the caller is expecting not to context switch. + if do_resched { + // XXX: This TLS hit should be combined with other uses of the scheduler below + let sched: ~Scheduler = Local::take(); + sched.maybe_yield(); + } + let mut this = self; let mut recvr_active = true; let packet = this.packet(); diff --git a/src/libstd/rt/sched.rs b/src/libstd/rt/sched.rs index c3841760de3..a62d3335644 100644 --- a/src/libstd/rt/sched.rs +++ b/src/libstd/rt/sched.rs @@ -26,7 +26,7 @@ use rt::local::Local; use rt::rtio::{RemoteCallback, PausibleIdleCallback}; use borrow::{to_uint}; use cell::Cell; -use rand::{XorShiftRng, Rng}; +use rand::{XorShiftRng, Rng, Rand}; use iter::range; use vec::{OwnedVector}; @@ -78,7 +78,14 @@ pub struct Scheduler { /// A fast XorShift rng for scheduler use rng: XorShiftRng, /// A toggleable idle callback - idle_callback: Option<~PausibleIdleCallback> + idle_callback: Option<~PausibleIdleCallback>, + /// A countdown that starts at a random value and is decremented + /// every time a yield check is performed. When it hits 0 a task + /// will yield. + yield_check_count: uint, + /// A flag to tell the scheduler loop it needs to do some stealing + /// in order to introduce randomness as part of a yield + steal_for_yield: bool } /// An indication of how hard to work on a given operation, the difference @@ -89,6 +96,13 @@ enum EffortLevel { GiveItYourBest } +static MAX_YIELD_CHECKS: uint = 200; + +fn reset_yield_check(rng: &mut XorShiftRng) -> uint { + let r: uint = Rand::rand(rng); + r % MAX_YIELD_CHECKS + 1 +} + impl Scheduler { // * Initialization Functions @@ -113,7 +127,7 @@ impl Scheduler { friend: Option) -> Scheduler { - Scheduler { + let mut sched = Scheduler { sleeper_list: sleeper_list, message_queue: MessageQueue::new(), sleepy: false, @@ -127,8 +141,14 @@ impl Scheduler { run_anything: run_anything, friend_handle: friend, rng: XorShiftRng::new(), - idle_callback: None - } + idle_callback: None, + yield_check_count: 0, + steal_for_yield: false + }; + + sched.yield_check_count = reset_yield_check(&mut sched.rng); + + return sched; } // XXX: This may eventually need to be refactored so that @@ -307,8 +327,7 @@ impl Scheduler { } Some(TaskFromFriend(task)) => { rtdebug!("got a task from a friend. lovely!"); - this.process_task(task, - Scheduler::resume_task_immediately_cl).map_move(Local::put); + this.process_task(task, Scheduler::resume_task_immediately_cl); return None; } Some(Wake) => { @@ -352,8 +371,8 @@ impl Scheduler { match this.find_work() { Some(task) => { rtdebug!("found some work! processing the task"); - return this.process_task(task, - Scheduler::resume_task_immediately_cl); + this.process_task(task, Scheduler::resume_task_immediately_cl); + return None; } None => { rtdebug!("no work was found, returning the scheduler struct"); @@ -373,14 +392,35 @@ impl Scheduler { // there, trying to steal from the remote work queues. fn find_work(&mut self) -> Option<~Task> { rtdebug!("scheduler looking for work"); - match self.work_queue.pop() { - Some(task) => { - rtdebug!("found a task locally"); - return Some(task) + if !self.steal_for_yield { + match self.work_queue.pop() { + Some(task) => { + rtdebug!("found a task locally"); + return Some(task) + } + None => { + rtdebug!("scheduler trying to steal"); + return self.try_steals(); + } } - None => { - rtdebug!("scheduler trying to steal"); - return self.try_steals(); + } else { + // During execution of the last task, it performed a 'yield', + // so we're doing some work stealing in order to introduce some + // scheduling randomness. Otherwise we would just end up popping + // that same task again. This is pretty lame and is to work around + // the problem that work stealing is not designed for 'non-strict' + // (non-fork-join) task parallelism. + self.steal_for_yield = false; + match self.try_steals() { + Some(task) => { + rtdebug!("stole a task after yielding"); + return Some(task); + } + None => { + rtdebug!("did not steal a task after yielding"); + // Back to business + return self.find_work(); + } } } } @@ -409,7 +449,7 @@ impl Scheduler { // place. fn process_task(~self, task: ~Task, - schedule_fn: SchedulingFn) -> Option<~Scheduler> { + schedule_fn: SchedulingFn) { let mut this = self; let mut task = task; @@ -422,23 +462,23 @@ impl Scheduler { rtdebug!("sending task home"); task.give_home(Sched(home_handle)); Scheduler::send_task_home(task); - return Some(this); + Local::put(this); } else { rtdebug!("running task here"); task.give_home(Sched(home_handle)); - return schedule_fn(this, task); + schedule_fn(this, task); } } AnySched if this.run_anything => { rtdebug!("running anysched task here"); task.give_home(AnySched); - return schedule_fn(this, task); + schedule_fn(this, task); } AnySched => { rtdebug!("sending task to friend"); task.give_home(AnySched); this.send_to_friend(task); - return Some(this); + Local::put(this); } } } @@ -607,15 +647,14 @@ impl Scheduler { // * Context Swapping Helpers - Here be ugliness! - pub fn resume_task_immediately(~self, task: ~Task) -> Option<~Scheduler> { + pub fn resume_task_immediately(~self, task: ~Task) { do self.change_task_context(task) |sched, stask| { sched.sched_task = Some(stask); } - return None; } fn resume_task_immediately_cl(sched: ~Scheduler, - task: ~Task) -> Option<~Scheduler> { + task: ~Task) { sched.resume_task_immediately(task) } @@ -662,11 +701,10 @@ impl Scheduler { } } - fn switch_task(sched: ~Scheduler, task: ~Task) -> Option<~Scheduler> { + fn switch_task(sched: ~Scheduler, task: ~Task) { do sched.switch_running_tasks_and_then(task) |sched, last_task| { sched.enqueue_blocked_task(last_task); }; - return None; } // * Task Context Helpers @@ -686,7 +724,7 @@ impl Scheduler { pub fn run_task(task: ~Task) { let sched: ~Scheduler = Local::take(); - sched.process_task(task, Scheduler::switch_task).map_move(Local::put); + sched.process_task(task, Scheduler::switch_task); } pub fn run_task_later(next_task: ~Task) { @@ -696,6 +734,33 @@ impl Scheduler { }; } + /// Yield control to the scheduler, executing another task. This is guaranteed + /// to introduce some amount of randomness to the scheduler. Currently the + /// randomness is a result of performing a round of work stealing (which + /// may end up stealing from the current scheduler). + pub fn yield_now(~self) { + let mut this = self; + this.yield_check_count = reset_yield_check(&mut this.rng); + // Tell the scheduler to start stealing on the next iteration + this.steal_for_yield = true; + do this.deschedule_running_task_and_then |sched, task| { + sched.enqueue_blocked_task(task); + } + } + + pub fn maybe_yield(~self) { + // The number of times to do the yield check before yielding, chosen arbitrarily. + let mut this = self; + rtassert!(this.yield_check_count > 0); + this.yield_check_count -= 1; + if this.yield_check_count == 0 { + this.yield_now(); + } else { + Local::put(this); + } + } + + // * Utility Functions pub fn sched_id(&self) -> uint { to_uint(self) } @@ -718,7 +783,7 @@ impl Scheduler { // Supporting types -type SchedulingFn = ~fn(~Scheduler, ~Task) -> Option<~Scheduler>; +type SchedulingFn = ~fn(~Scheduler, ~Task); pub enum SchedMessage { Wake, @@ -1231,4 +1296,40 @@ mod test { } } } + + #[test] + fn dont_starve_2() { + use rt::comm::oneshot; + + do stress_factor().times { + do run_in_newsched_task { + let (port, chan) = oneshot(); + let (_port2, chan2) = stream(); + + // This task should not be able to starve the other task. + // The sends should eventually yield. + do spawntask { + while !port.peek() { + chan2.send(()); + } + } + + chan.send(()); + } + } + } + + // Regression test for a logic bug that would cause single-threaded schedulers + // to sleep forever after yielding and stealing another task. + #[test] + fn single_threaded_yield() { + use task::{spawn, spawn_sched, SingleThreaded, deschedule}; + use num::Times; + + do spawn_sched(SingleThreaded) { + do 5.times { deschedule(); } + } + do spawn { } + do spawn { } + } } diff --git a/src/libstd/task/mod.rs b/src/libstd/task/mod.rs index 1dbc644c8e5..e6f6536956c 100644 --- a/src/libstd/task/mod.rs +++ b/src/libstd/task/mod.rs @@ -542,12 +542,9 @@ pub fn deschedule() { use rt::local::Local; use rt::sched::Scheduler; - // FIXME #6842: What does yield really mean in newsched? // FIXME(#7544): Optimize this, since we know we won't block. let sched: ~Scheduler = Local::take(); - do sched.deschedule_running_task_and_then |sched, task| { - sched.enqueue_blocked_task(task); - } + sched.yield_now(); } pub fn failing() -> bool {