mirror of
https://github.com/rust-lang/rust.git
synced 2025-02-23 20:34:06 +00:00
std::rt: Implement task yielding. Fix a starvation problem
This commit is contained in:
parent
00db6f6e7b
commit
d209717ddd
@ -118,6 +118,17 @@ impl<T> ChanOne<T> {
|
||||
rtassert!(!rt::in_sched_context());
|
||||
}
|
||||
|
||||
// In order to prevent starvation of other tasks in situations
|
||||
// where a task sends repeatedly without ever receiving, we
|
||||
// occassionally yield instead of doing a send immediately.
|
||||
// Only doing this if we're doing a rescheduling send,
|
||||
// otherwise the caller is expecting not to context switch.
|
||||
if do_resched {
|
||||
// XXX: This TLS hit should be combined with other uses of the scheduler below
|
||||
let sched: ~Scheduler = Local::take();
|
||||
sched.maybe_yield();
|
||||
}
|
||||
|
||||
let mut this = self;
|
||||
let mut recvr_active = true;
|
||||
let packet = this.packet();
|
||||
|
@ -26,7 +26,7 @@ use rt::local::Local;
|
||||
use rt::rtio::{RemoteCallback, PausibleIdleCallback};
|
||||
use borrow::{to_uint};
|
||||
use cell::Cell;
|
||||
use rand::{XorShiftRng, Rng};
|
||||
use rand::{XorShiftRng, Rng, Rand};
|
||||
use iter::range;
|
||||
use vec::{OwnedVector};
|
||||
|
||||
@ -78,7 +78,14 @@ pub struct Scheduler {
|
||||
/// A fast XorShift rng for scheduler use
|
||||
rng: XorShiftRng,
|
||||
/// A toggleable idle callback
|
||||
idle_callback: Option<~PausibleIdleCallback>
|
||||
idle_callback: Option<~PausibleIdleCallback>,
|
||||
/// A countdown that starts at a random value and is decremented
|
||||
/// every time a yield check is performed. When it hits 0 a task
|
||||
/// will yield.
|
||||
yield_check_count: uint,
|
||||
/// A flag to tell the scheduler loop it needs to do some stealing
|
||||
/// in order to introduce randomness as part of a yield
|
||||
steal_for_yield: bool
|
||||
}
|
||||
|
||||
/// An indication of how hard to work on a given operation, the difference
|
||||
@ -89,6 +96,13 @@ enum EffortLevel {
|
||||
GiveItYourBest
|
||||
}
|
||||
|
||||
static MAX_YIELD_CHECKS: uint = 200;
|
||||
|
||||
fn reset_yield_check(rng: &mut XorShiftRng) -> uint {
|
||||
let r: uint = Rand::rand(rng);
|
||||
r % MAX_YIELD_CHECKS + 1
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
|
||||
// * Initialization Functions
|
||||
@ -113,7 +127,7 @@ impl Scheduler {
|
||||
friend: Option<SchedHandle>)
|
||||
-> Scheduler {
|
||||
|
||||
Scheduler {
|
||||
let mut sched = Scheduler {
|
||||
sleeper_list: sleeper_list,
|
||||
message_queue: MessageQueue::new(),
|
||||
sleepy: false,
|
||||
@ -127,8 +141,14 @@ impl Scheduler {
|
||||
run_anything: run_anything,
|
||||
friend_handle: friend,
|
||||
rng: XorShiftRng::new(),
|
||||
idle_callback: None
|
||||
}
|
||||
idle_callback: None,
|
||||
yield_check_count: 0,
|
||||
steal_for_yield: false
|
||||
};
|
||||
|
||||
sched.yield_check_count = reset_yield_check(&mut sched.rng);
|
||||
|
||||
return sched;
|
||||
}
|
||||
|
||||
// XXX: This may eventually need to be refactored so that
|
||||
@ -307,8 +327,7 @@ impl Scheduler {
|
||||
}
|
||||
Some(TaskFromFriend(task)) => {
|
||||
rtdebug!("got a task from a friend. lovely!");
|
||||
this.process_task(task,
|
||||
Scheduler::resume_task_immediately_cl).map_move(Local::put);
|
||||
this.process_task(task, Scheduler::resume_task_immediately_cl);
|
||||
return None;
|
||||
}
|
||||
Some(Wake) => {
|
||||
@ -352,8 +371,8 @@ impl Scheduler {
|
||||
match this.find_work() {
|
||||
Some(task) => {
|
||||
rtdebug!("found some work! processing the task");
|
||||
return this.process_task(task,
|
||||
Scheduler::resume_task_immediately_cl);
|
||||
this.process_task(task, Scheduler::resume_task_immediately_cl);
|
||||
return None;
|
||||
}
|
||||
None => {
|
||||
rtdebug!("no work was found, returning the scheduler struct");
|
||||
@ -373,14 +392,35 @@ impl Scheduler {
|
||||
// there, trying to steal from the remote work queues.
|
||||
fn find_work(&mut self) -> Option<~Task> {
|
||||
rtdebug!("scheduler looking for work");
|
||||
match self.work_queue.pop() {
|
||||
Some(task) => {
|
||||
rtdebug!("found a task locally");
|
||||
return Some(task)
|
||||
if !self.steal_for_yield {
|
||||
match self.work_queue.pop() {
|
||||
Some(task) => {
|
||||
rtdebug!("found a task locally");
|
||||
return Some(task)
|
||||
}
|
||||
None => {
|
||||
rtdebug!("scheduler trying to steal");
|
||||
return self.try_steals();
|
||||
}
|
||||
}
|
||||
None => {
|
||||
rtdebug!("scheduler trying to steal");
|
||||
return self.try_steals();
|
||||
} else {
|
||||
// During execution of the last task, it performed a 'yield',
|
||||
// so we're doing some work stealing in order to introduce some
|
||||
// scheduling randomness. Otherwise we would just end up popping
|
||||
// that same task again. This is pretty lame and is to work around
|
||||
// the problem that work stealing is not designed for 'non-strict'
|
||||
// (non-fork-join) task parallelism.
|
||||
self.steal_for_yield = false;
|
||||
match self.try_steals() {
|
||||
Some(task) => {
|
||||
rtdebug!("stole a task after yielding");
|
||||
return Some(task);
|
||||
}
|
||||
None => {
|
||||
rtdebug!("did not steal a task after yielding");
|
||||
// Back to business
|
||||
return self.find_work();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -409,7 +449,7 @@ impl Scheduler {
|
||||
// place.
|
||||
|
||||
fn process_task(~self, task: ~Task,
|
||||
schedule_fn: SchedulingFn) -> Option<~Scheduler> {
|
||||
schedule_fn: SchedulingFn) {
|
||||
let mut this = self;
|
||||
let mut task = task;
|
||||
|
||||
@ -422,23 +462,23 @@ impl Scheduler {
|
||||
rtdebug!("sending task home");
|
||||
task.give_home(Sched(home_handle));
|
||||
Scheduler::send_task_home(task);
|
||||
return Some(this);
|
||||
Local::put(this);
|
||||
} else {
|
||||
rtdebug!("running task here");
|
||||
task.give_home(Sched(home_handle));
|
||||
return schedule_fn(this, task);
|
||||
schedule_fn(this, task);
|
||||
}
|
||||
}
|
||||
AnySched if this.run_anything => {
|
||||
rtdebug!("running anysched task here");
|
||||
task.give_home(AnySched);
|
||||
return schedule_fn(this, task);
|
||||
schedule_fn(this, task);
|
||||
}
|
||||
AnySched => {
|
||||
rtdebug!("sending task to friend");
|
||||
task.give_home(AnySched);
|
||||
this.send_to_friend(task);
|
||||
return Some(this);
|
||||
Local::put(this);
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -607,15 +647,14 @@ impl Scheduler {
|
||||
|
||||
// * Context Swapping Helpers - Here be ugliness!
|
||||
|
||||
pub fn resume_task_immediately(~self, task: ~Task) -> Option<~Scheduler> {
|
||||
pub fn resume_task_immediately(~self, task: ~Task) {
|
||||
do self.change_task_context(task) |sched, stask| {
|
||||
sched.sched_task = Some(stask);
|
||||
}
|
||||
return None;
|
||||
}
|
||||
|
||||
fn resume_task_immediately_cl(sched: ~Scheduler,
|
||||
task: ~Task) -> Option<~Scheduler> {
|
||||
task: ~Task) {
|
||||
sched.resume_task_immediately(task)
|
||||
}
|
||||
|
||||
@ -662,11 +701,10 @@ impl Scheduler {
|
||||
}
|
||||
}
|
||||
|
||||
fn switch_task(sched: ~Scheduler, task: ~Task) -> Option<~Scheduler> {
|
||||
fn switch_task(sched: ~Scheduler, task: ~Task) {
|
||||
do sched.switch_running_tasks_and_then(task) |sched, last_task| {
|
||||
sched.enqueue_blocked_task(last_task);
|
||||
};
|
||||
return None;
|
||||
}
|
||||
|
||||
// * Task Context Helpers
|
||||
@ -686,7 +724,7 @@ impl Scheduler {
|
||||
|
||||
pub fn run_task(task: ~Task) {
|
||||
let sched: ~Scheduler = Local::take();
|
||||
sched.process_task(task, Scheduler::switch_task).map_move(Local::put);
|
||||
sched.process_task(task, Scheduler::switch_task);
|
||||
}
|
||||
|
||||
pub fn run_task_later(next_task: ~Task) {
|
||||
@ -696,6 +734,33 @@ impl Scheduler {
|
||||
};
|
||||
}
|
||||
|
||||
/// Yield control to the scheduler, executing another task. This is guaranteed
|
||||
/// to introduce some amount of randomness to the scheduler. Currently the
|
||||
/// randomness is a result of performing a round of work stealing (which
|
||||
/// may end up stealing from the current scheduler).
|
||||
pub fn yield_now(~self) {
|
||||
let mut this = self;
|
||||
this.yield_check_count = reset_yield_check(&mut this.rng);
|
||||
// Tell the scheduler to start stealing on the next iteration
|
||||
this.steal_for_yield = true;
|
||||
do this.deschedule_running_task_and_then |sched, task| {
|
||||
sched.enqueue_blocked_task(task);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn maybe_yield(~self) {
|
||||
// The number of times to do the yield check before yielding, chosen arbitrarily.
|
||||
let mut this = self;
|
||||
rtassert!(this.yield_check_count > 0);
|
||||
this.yield_check_count -= 1;
|
||||
if this.yield_check_count == 0 {
|
||||
this.yield_now();
|
||||
} else {
|
||||
Local::put(this);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// * Utility Functions
|
||||
|
||||
pub fn sched_id(&self) -> uint { to_uint(self) }
|
||||
@ -718,7 +783,7 @@ impl Scheduler {
|
||||
|
||||
// Supporting types
|
||||
|
||||
type SchedulingFn = ~fn(~Scheduler, ~Task) -> Option<~Scheduler>;
|
||||
type SchedulingFn = ~fn(~Scheduler, ~Task);
|
||||
|
||||
pub enum SchedMessage {
|
||||
Wake,
|
||||
@ -1231,4 +1296,40 @@ mod test {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn dont_starve_2() {
|
||||
use rt::comm::oneshot;
|
||||
|
||||
do stress_factor().times {
|
||||
do run_in_newsched_task {
|
||||
let (port, chan) = oneshot();
|
||||
let (_port2, chan2) = stream();
|
||||
|
||||
// This task should not be able to starve the other task.
|
||||
// The sends should eventually yield.
|
||||
do spawntask {
|
||||
while !port.peek() {
|
||||
chan2.send(());
|
||||
}
|
||||
}
|
||||
|
||||
chan.send(());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Regression test for a logic bug that would cause single-threaded schedulers
|
||||
// to sleep forever after yielding and stealing another task.
|
||||
#[test]
|
||||
fn single_threaded_yield() {
|
||||
use task::{spawn, spawn_sched, SingleThreaded, deschedule};
|
||||
use num::Times;
|
||||
|
||||
do spawn_sched(SingleThreaded) {
|
||||
do 5.times { deschedule(); }
|
||||
}
|
||||
do spawn { }
|
||||
do spawn { }
|
||||
}
|
||||
}
|
||||
|
@ -542,12 +542,9 @@ pub fn deschedule() {
|
||||
use rt::local::Local;
|
||||
use rt::sched::Scheduler;
|
||||
|
||||
// FIXME #6842: What does yield really mean in newsched?
|
||||
// FIXME(#7544): Optimize this, since we know we won't block.
|
||||
let sched: ~Scheduler = Local::take();
|
||||
do sched.deschedule_running_task_and_then |sched, task| {
|
||||
sched.enqueue_blocked_task(task);
|
||||
}
|
||||
sched.yield_now();
|
||||
}
|
||||
|
||||
pub fn failing() -> bool {
|
||||
|
Loading…
Reference in New Issue
Block a user