core::rt: Convert users of local_sched to Local trait

This commit is contained in:
Brian Anderson 2013-05-19 15:45:39 -07:00
parent 18fab45aab
commit 06f1a64bd6
10 changed files with 143 additions and 145 deletions

View File

@ -20,7 +20,8 @@ use cast;
use util; use util;
use ops::Drop; use ops::Drop;
use kinds::Owned; use kinds::Owned;
use rt::sched::Coroutine; use rt::sched::{Scheduler, Coroutine};
use rt::local::Local;
use rt::local_sched; use rt::local_sched;
use unstable::intrinsics::{atomic_xchg, atomic_load}; use unstable::intrinsics::{atomic_xchg, atomic_load};
use util::Void; use util::Void;
@ -127,7 +128,7 @@ impl<T> ChanOne<T> {
task_as_state => { task_as_state => {
// Port is blocked. Wake it up. // Port is blocked. Wake it up.
let recvr: ~Coroutine = cast::transmute(task_as_state); let recvr: ~Coroutine = cast::transmute(task_as_state);
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
sched.schedule_task(recvr); sched.schedule_task(recvr);
} }
} }
@ -157,7 +158,7 @@ impl<T> PortOne<T> {
// XXX: Optimize this to not require the two context switches when data is available // XXX: Optimize this to not require the two context switches when data is available
// Switch to the scheduler to put the ~Task into the Packet state. // Switch to the scheduler to put the ~Task into the Packet state.
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| { do sched.deschedule_running_task_and_then |task| {
unsafe { unsafe {
// Atomically swap the task pointer into the Packet state, issuing // Atomically swap the task pointer into the Packet state, issuing
@ -173,7 +174,7 @@ impl<T> PortOne<T> {
STATE_ONE => { STATE_ONE => {
// Channel is closed. Switch back and check the data. // Channel is closed. Switch back and check the data.
let task: ~Coroutine = cast::transmute(task_as_state); let task: ~Coroutine = cast::transmute(task_as_state);
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task); sched.resume_task_immediately(task);
} }
_ => util::unreachable() _ => util::unreachable()
@ -239,7 +240,7 @@ impl<T> Drop for ChanOneHack<T> {
// The port is blocked waiting for a message we will never send. Wake it. // The port is blocked waiting for a message we will never send. Wake it.
assert!((*this.packet()).payload.is_none()); assert!((*this.packet()).payload.is_none());
let recvr: ~Coroutine = cast::transmute(task_as_state); let recvr: ~Coroutine = cast::transmute(task_as_state);
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
sched.schedule_task(recvr); sched.schedule_task(recvr);
} }
} }

View File

@ -12,17 +12,51 @@ use rt::sched::Scheduler;
use rt::local_ptr; use rt::local_ptr;
pub trait Local { pub trait Local {
fn put_local(value: ~Self); fn put(value: ~Self);
fn take_local() -> ~Self; fn take() -> ~Self;
fn exists_local() -> bool; fn exists() -> bool;
fn borrow_local(f: &fn(&mut Self)); fn borrow(f: &fn(&mut Self));
unsafe fn unsafe_borrow_local() -> *mut Self; unsafe fn unsafe_borrow() -> *mut Self;
} }
impl Local for Scheduler { impl Local for Scheduler {
fn put_local(value: ~Scheduler) { unsafe { local_ptr::put(value) }} fn put(value: ~Scheduler) { unsafe { local_ptr::put(value) }}
fn take_local() -> ~Scheduler { unsafe { local_ptr::take() } } fn take() -> ~Scheduler { unsafe { local_ptr::take() } }
fn exists_local() -> bool { local_ptr::exists() } fn exists() -> bool { local_ptr::exists() }
fn borrow_local(f: &fn(&mut Scheduler)) { unsafe { local_ptr::borrow(f) } } fn borrow(f: &fn(&mut Scheduler)) { unsafe { local_ptr::borrow(f) } }
unsafe fn unsafe_borrow_local() -> *mut Scheduler { local_ptr::unsafe_borrow() } unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() }
}
#[cfg(test)]
mod test {
use rt::sched::Scheduler;
use rt::uv::uvio::UvEventLoop;
use super::*;
#[test]
fn thread_local_scheduler_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}
#[test]
fn thread_local_scheduler_two_instances() {
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}
#[test]
fn borrow_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
unsafe {
let _scheduler: *mut Scheduler = Local::unsafe_borrow();
}
let _scheduler: ~Scheduler = Local::take();
}
} }

View File

@ -25,56 +25,8 @@ use rt::local::Local;
#[cfg(test)] use rt::uv::uvio::UvEventLoop; #[cfg(test)] use rt::uv::uvio::UvEventLoop;
/// Give the Scheduler to thread-local storage
pub fn put(sched: ~Scheduler) { Local::put_local(sched) }
/// Take ownership of the Scheduler from thread-local storage
pub fn take() -> ~Scheduler { Local::take_local() }
/// Check whether there is a thread-local Scheduler attached to the running thread
pub fn exists() -> bool { Local::exists_local::<Scheduler>() }
/// Borrow the thread-local scheduler from thread-local storage.
/// While the scheduler is borrowed it is not available in TLS.
pub fn borrow(f: &fn(&mut Scheduler)) { Local::borrow_local(f) }
/// Borrow a mutable reference to the thread-local Scheduler
///
/// # Safety Note
///
/// Because this leaves the Scheduler in thread-local storage it is possible
/// For the Scheduler pointer to be aliased
pub unsafe fn unsafe_borrow() -> *mut Scheduler { Local::unsafe_borrow_local() }
pub unsafe fn unsafe_borrow_io() -> *mut IoFactoryObject { pub unsafe fn unsafe_borrow_io() -> *mut IoFactoryObject {
let sched = unsafe_borrow(); let sched = Local::unsafe_borrow::<Scheduler>();
let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap(); let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap();
return io; return io;
} }
#[test]
fn thread_local_scheduler_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
put(scheduler);
let _scheduler = take();
}
#[test]
fn thread_local_scheduler_two_instances() {
let scheduler = ~UvEventLoop::new_scheduler();
put(scheduler);
let _scheduler = take();
let scheduler = ~UvEventLoop::new_scheduler();
put(scheduler);
let _scheduler = take();
}
#[test]
fn borrow_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
put(scheduler);
unsafe {
let _scheduler = unsafe_borrow();
}
let _scheduler = take();
}

View File

@ -188,16 +188,17 @@ pub enum RuntimeContext {
pub fn context() -> RuntimeContext { pub fn context() -> RuntimeContext {
use task::rt::rust_task; use task::rt::rust_task;
use self::sched::local_sched; use self::local::Local;
use self::sched::{local_sched, Scheduler};
// XXX: Hitting TLS twice to check if the scheduler exists // XXX: Hitting TLS twice to check if the scheduler exists
// then to check for the task is not good for perf // then to check for the task is not good for perf
if unsafe { rust_try_get_task().is_not_null() } { if unsafe { rust_try_get_task().is_not_null() } {
return OldTaskContext; return OldTaskContext;
} else { } else {
if local_sched::exists() { if Local::exists::<Scheduler>() {
let context = ::cell::empty_cell(); let context = ::cell::empty_cell();
do local_sched::borrow |sched| { do Local::borrow::<Scheduler> |sched| {
if sched.in_task_context() { if sched.in_task_context() {
context.put_back(TaskContext); context.put_back(TaskContext);
} else { } else {
@ -219,9 +220,10 @@ pub fn context() -> RuntimeContext {
#[test] #[test]
fn test_context() { fn test_context() {
use unstable::run_in_bare_thread; use unstable::run_in_bare_thread;
use self::sched::{local_sched, Coroutine}; use self::sched::{local_sched, Scheduler, Coroutine};
use rt::uv::uvio::UvEventLoop; use rt::uv::uvio::UvEventLoop;
use cell::Cell; use cell::Cell;
use rt::local::Local;
assert_eq!(context(), OldTaskContext); assert_eq!(context(), OldTaskContext);
do run_in_bare_thread { do run_in_bare_thread {
@ -229,11 +231,11 @@ fn test_context() {
let mut sched = ~UvEventLoop::new_scheduler(); let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Coroutine::new(&mut sched.stack_pool) { let task = ~do Coroutine::new(&mut sched.stack_pool) {
assert_eq!(context(), TaskContext); assert_eq!(context(), TaskContext);
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then() |task| { do sched.deschedule_running_task_and_then() |task| {
assert_eq!(context(), SchedulerContext); assert_eq!(context(), SchedulerContext);
let task = Cell(task); let task = Cell(task);
do local_sched::borrow |sched| { do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task.take()); sched.enqueue_task(task.take());
} }
} }

View File

@ -19,6 +19,7 @@ use super::rtio::{EventLoop, EventLoopObject};
use super::context::Context; use super::context::Context;
use super::task::Task; use super::task::Task;
use rt::local_ptr; use rt::local_ptr;
use rt::local::Local;
// A more convenient name for external callers, e.g. `local_sched::take()` // A more convenient name for external callers, e.g. `local_sched::take()`
pub mod local_sched; pub mod local_sched;
@ -94,12 +95,12 @@ pub impl Scheduler {
}; };
// Give ownership of the scheduler (self) to the thread // Give ownership of the scheduler (self) to the thread
local_sched::put(self_sched); Local::put(self_sched);
(*event_loop).run(); (*event_loop).run();
} }
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
assert!(sched.work_queue.is_empty()); assert!(sched.work_queue.is_empty());
return sched; return sched;
} }
@ -114,7 +115,7 @@ pub impl Scheduler {
self.event_loop.callback(resume_task_from_queue); self.event_loop.callback(resume_task_from_queue);
fn resume_task_from_queue() { fn resume_task_from_queue() {
let scheduler = local_sched::take(); let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_from_queue(); scheduler.resume_task_from_queue();
} }
} }
@ -134,7 +135,7 @@ pub impl Scheduler {
} }
None => { None => {
rtdebug!("no tasks in queue"); rtdebug!("no tasks in queue");
local_sched::put(this); Local::put(this);
} }
} }
} }
@ -150,7 +151,7 @@ pub impl Scheduler {
do self.deschedule_running_task_and_then |dead_task| { do self.deschedule_running_task_and_then |dead_task| {
let dead_task = Cell(dead_task); let dead_task = Cell(dead_task);
do local_sched::borrow |sched| { do Local::borrow::<Scheduler> |sched| {
dead_task.take().recycle(&mut sched.stack_pool); dead_task.take().recycle(&mut sched.stack_pool);
} }
} }
@ -163,7 +164,7 @@ pub impl Scheduler {
do self.switch_running_tasks_and_then(task) |last_task| { do self.switch_running_tasks_and_then(task) |last_task| {
let last_task = Cell(last_task); let last_task = Cell(last_task);
do local_sched::borrow |sched| { do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(last_task.take()); sched.enqueue_task(last_task.take());
} }
} }
@ -174,7 +175,7 @@ pub impl Scheduler {
do self.switch_running_tasks_and_then(task) |last_task| { do self.switch_running_tasks_and_then(task) |last_task| {
let last_task = Cell(last_task); let last_task = Cell(last_task);
do local_sched::borrow |sched| { do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(last_task.take()); sched.enqueue_task(last_task.take());
} }
} }
@ -192,18 +193,18 @@ pub impl Scheduler {
this.current_task = Some(task); this.current_task = Some(task);
this.enqueue_cleanup_job(DoNothing); this.enqueue_cleanup_job(DoNothing);
local_sched::put(this); Local::put(this);
// Take pointers to both the task and scheduler's saved registers. // Take pointers to both the task and scheduler's saved registers.
unsafe { unsafe {
let sched = local_sched::unsafe_borrow(); let sched = Local::unsafe_borrow::<Scheduler>();
let (sched_context, _, next_task_context) = (*sched).get_contexts(); let (sched_context, _, next_task_context) = (*sched).get_contexts();
let next_task_context = next_task_context.unwrap(); let next_task_context = next_task_context.unwrap();
// Context switch to the task, restoring it's registers // Context switch to the task, restoring it's registers
// and saving the scheduler's // and saving the scheduler's
Context::swap(sched_context, next_task_context); Context::swap(sched_context, next_task_context);
let sched = local_sched::unsafe_borrow(); let sched = Local::unsafe_borrow::<Scheduler>();
// The running task should have passed ownership elsewhere // The running task should have passed ownership elsewhere
assert!((*sched).current_task.is_none()); assert!((*sched).current_task.is_none());
@ -233,16 +234,16 @@ pub impl Scheduler {
this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
} }
local_sched::put(this); Local::put(this);
unsafe { unsafe {
let sched = local_sched::unsafe_borrow(); let sched = Local::unsafe_borrow::<Scheduler>();
let (sched_context, last_task_context, _) = (*sched).get_contexts(); let (sched_context, last_task_context, _) = (*sched).get_contexts();
let last_task_context = last_task_context.unwrap(); let last_task_context = last_task_context.unwrap();
Context::swap(last_task_context, sched_context); Context::swap(last_task_context, sched_context);
// We could be executing in a different thread now // We could be executing in a different thread now
let sched = local_sched::unsafe_borrow(); let sched = Local::unsafe_borrow::<Scheduler>();
(*sched).run_cleanup_job(); (*sched).run_cleanup_job();
} }
} }
@ -262,17 +263,17 @@ pub impl Scheduler {
this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque)); this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
this.current_task = Some(next_task); this.current_task = Some(next_task);
local_sched::put(this); Local::put(this);
unsafe { unsafe {
let sched = local_sched::unsafe_borrow(); let sched = Local::unsafe_borrow::<Scheduler>();
let (_, last_task_context, next_task_context) = (*sched).get_contexts(); let (_, last_task_context, next_task_context) = (*sched).get_contexts();
let last_task_context = last_task_context.unwrap(); let last_task_context = last_task_context.unwrap();
let next_task_context = next_task_context.unwrap(); let next_task_context = next_task_context.unwrap();
Context::swap(last_task_context, next_task_context); Context::swap(last_task_context, next_task_context);
// We could be executing in a different thread now // We could be executing in a different thread now
let sched = local_sched::unsafe_borrow(); let sched = Local::unsafe_borrow::<Scheduler>();
(*sched).run_cleanup_job(); (*sched).run_cleanup_job();
} }
} }
@ -377,16 +378,16 @@ pub impl Coroutine {
// context switch to the task. The previous context may // context switch to the task. The previous context may
// have asked us to do some cleanup. // have asked us to do some cleanup.
unsafe { unsafe {
let sched = local_sched::unsafe_borrow(); let sched = Local::unsafe_borrow::<Scheduler>();
(*sched).run_cleanup_job(); (*sched).run_cleanup_job();
let sched = local_sched::unsafe_borrow(); let sched = Local::unsafe_borrow::<Scheduler>();
let task = (*sched).current_task.get_mut_ref(); let task = (*sched).current_task.get_mut_ref();
// FIXME #6141: shouldn't neet to put `start()` in another closure // FIXME #6141: shouldn't neet to put `start()` in another closure
task.task.run(||start()); task.task.run(||start());
} }
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
sched.terminate_current_task(); sched.terminate_current_task();
}; };
return wrapper; return wrapper;
@ -409,6 +410,7 @@ mod test {
use rt::uv::uvio::UvEventLoop; use rt::uv::uvio::UvEventLoop;
use unstable::run_in_bare_thread; use unstable::run_in_bare_thread;
use task::spawn; use task::spawn;
use rt::local::Local;
use rt::test::*; use rt::test::*;
use super::*; use super::*;
@ -456,14 +458,14 @@ mod test {
let mut sched = ~UvEventLoop::new_scheduler(); let mut sched = ~UvEventLoop::new_scheduler();
let task1 = ~do Coroutine::new(&mut sched.stack_pool) { let task1 = ~do Coroutine::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; } unsafe { *count_ptr = *count_ptr + 1; }
let mut sched = local_sched::take(); let mut sched = Local::take::<Scheduler>();
let task2 = ~do Coroutine::new(&mut sched.stack_pool) { let task2 = ~do Coroutine::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; } unsafe { *count_ptr = *count_ptr + 1; }
}; };
// Context switch directly to the new task // Context switch directly to the new task
do sched.switch_running_tasks_and_then(task2) |task1| { do sched.switch_running_tasks_and_then(task2) |task1| {
let task1 = Cell(task1); let task1 = Cell(task1);
do local_sched::borrow |sched| { do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task1.take()); sched.enqueue_task(task1.take());
} }
} }
@ -493,7 +495,7 @@ mod test {
assert_eq!(count, MAX); assert_eq!(count, MAX);
fn run_task(count_ptr: *mut int) { fn run_task(count_ptr: *mut int) {
do local_sched::borrow |sched| { do Local::borrow::<Scheduler> |sched| {
let task = ~do Coroutine::new(&mut sched.stack_pool) { let task = ~do Coroutine::new(&mut sched.stack_pool) {
unsafe { unsafe {
*count_ptr = *count_ptr + 1; *count_ptr = *count_ptr + 1;
@ -513,11 +515,11 @@ mod test {
do run_in_bare_thread { do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler(); let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Coroutine::new(&mut sched.stack_pool) { let task = ~do Coroutine::new(&mut sched.stack_pool) {
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
assert!(sched.in_task_context()); assert!(sched.in_task_context());
do sched.deschedule_running_task_and_then() |task| { do sched.deschedule_running_task_and_then() |task| {
let task = Cell(task); let task = Cell(task);
do local_sched::borrow |sched| { do Local::borrow::<Scheduler> |sched| {
assert!(!sched.in_task_context()); assert!(!sched.in_task_context());
sched.enqueue_task(task.take()); sched.enqueue_task(task.take());
} }
@ -536,17 +538,17 @@ mod test {
// exit before emptying the work queue // exit before emptying the work queue
do run_in_newsched_task { do run_in_newsched_task {
do spawn { do spawn {
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| { do sched.deschedule_running_task_and_then |task| {
let mut sched = local_sched::take(); let mut sched = Local::take::<Scheduler>();
let task = Cell(task); let task = Cell(task);
do sched.event_loop.callback_ms(10) { do sched.event_loop.callback_ms(10) {
rtdebug!("in callback"); rtdebug!("in callback");
let mut sched = local_sched::take(); let mut sched = Local::take::<Scheduler>();
sched.enqueue_task(task.take()); sched.enqueue_task(task.take());
local_sched::put(sched); Local::put(sched);
} }
local_sched::put(sched); Local::put(sched);
} }
} }
} }

View File

@ -16,7 +16,8 @@
use prelude::*; use prelude::*;
use libc::{c_void, uintptr_t}; use libc::{c_void, uintptr_t};
use cast::transmute; use cast::transmute;
use super::sched::local_sched; use super::sched::{Scheduler, local_sched};
use rt::local::Local;
use super::local_heap::LocalHeap; use super::local_heap::LocalHeap;
use rt::logging::StdErrLogger; use rt::logging::StdErrLogger;
@ -152,7 +153,7 @@ impl Unwinder {
/// Borrow a pointer to the installed local services. /// Borrow a pointer to the installed local services.
/// Fails (likely aborting the process) if local services are not available. /// Fails (likely aborting the process) if local services are not available.
pub fn borrow_local_task(f: &fn(&mut Task)) { pub fn borrow_local_task(f: &fn(&mut Task)) {
do local_sched::borrow |sched| { do Local::borrow::<Scheduler> |sched| {
match sched.current_task { match sched.current_task {
Some(~ref mut task) => { Some(~ref mut task) => {
f(&mut *task.task) f(&mut *task.task)
@ -165,7 +166,7 @@ pub fn borrow_local_task(f: &fn(&mut Task)) {
} }
pub unsafe fn unsafe_borrow_local_task() -> *mut Task { pub unsafe fn unsafe_borrow_local_task() -> *mut Task {
match (*local_sched::unsafe_borrow()).current_task { match (*Local::unsafe_borrow::<Scheduler>()).current_task {
Some(~ref mut task) => { Some(~ref mut task) => {
let s: *mut Task = &mut *task.task; let s: *mut Task = &mut *task.task;
return s; return s;
@ -178,7 +179,7 @@ pub unsafe fn unsafe_borrow_local_task() -> *mut Task {
} }
pub unsafe fn unsafe_try_borrow_local_task() -> Option<*mut Task> { pub unsafe fn unsafe_try_borrow_local_task() -> Option<*mut Task> {
if local_sched::exists() { if Local::exists::<Scheduler>() {
Some(unsafe_borrow_local_task()) Some(unsafe_borrow_local_task())
} else { } else {
None None

View File

@ -15,6 +15,7 @@ use result::{Result, Ok, Err};
use super::io::net::ip::{IpAddr, Ipv4}; use super::io::net::ip::{IpAddr, Ipv4};
use rt::task::Task; use rt::task::Task;
use rt::thread::Thread; use rt::thread::Thread;
use rt::local::Local;
/// Creates a new scheduler in a new thread and runs a task in it, /// Creates a new scheduler in a new thread and runs a task in it,
/// then waits for the scheduler to exit. Failure of the task /// then waits for the scheduler to exit. Failure of the task
@ -40,13 +41,13 @@ pub fn run_in_newsched_task(f: ~fn()) {
pub fn spawntask(f: ~fn()) { pub fn spawntask(f: ~fn()) {
use super::sched::*; use super::sched::*;
let mut sched = local_sched::take(); let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool, let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(), ~Task::without_unwinding(),
f); f);
do sched.switch_running_tasks_and_then(task) |task| { do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell(task); let task = Cell(task);
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
sched.schedule_new_task(task.take()); sched.schedule_new_task(task.take());
} }
} }
@ -55,13 +56,13 @@ pub fn spawntask(f: ~fn()) {
pub fn spawntask_immediately(f: ~fn()) { pub fn spawntask_immediately(f: ~fn()) {
use super::sched::*; use super::sched::*;
let mut sched = local_sched::take(); let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool, let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(), ~Task::without_unwinding(),
f); f);
do sched.switch_running_tasks_and_then(task) |task| { do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell(task); let task = Cell(task);
do local_sched::borrow |sched| { do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task.take()); sched.enqueue_task(task.take());
} }
} }
@ -71,13 +72,13 @@ pub fn spawntask_immediately(f: ~fn()) {
pub fn spawntask_later(f: ~fn()) { pub fn spawntask_later(f: ~fn()) {
use super::sched::*; use super::sched::*;
let mut sched = local_sched::take(); let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool, let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(), ~Task::without_unwinding(),
f); f);
sched.enqueue_task(task); sched.enqueue_task(task);
local_sched::put(sched); Local::put(sched);
} }
/// Spawn a task and either run it immediately or run it later /// Spawn a task and either run it immediately or run it later
@ -88,7 +89,7 @@ pub fn spawntask_random(f: ~fn()) {
let mut rng = rng(); let mut rng = rng();
let run_now: bool = Rand::rand(&mut rng); let run_now: bool = Rand::rand(&mut rng);
let mut sched = local_sched::take(); let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool, let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(), ~Task::without_unwinding(),
f); f);
@ -96,13 +97,13 @@ pub fn spawntask_random(f: ~fn()) {
if run_now { if run_now {
do sched.switch_running_tasks_and_then(task) |task| { do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell(task); let task = Cell(task);
do local_sched::borrow |sched| { do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task.take()); sched.enqueue_task(task.take());
} }
} }
} else { } else {
sched.enqueue_task(task); sched.enqueue_task(task);
local_sched::put(sched); Local::put(sched);
} }
} }
@ -120,21 +121,21 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
// Switch to the scheduler // Switch to the scheduler
let f = Cell(Cell(f)); let f = Cell(Cell(f));
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then() |old_task| { do sched.deschedule_running_task_and_then() |old_task| {
let old_task = Cell(old_task); let old_task = Cell(old_task);
let f = f.take(); let f = f.take();
let mut sched = local_sched::take(); let mut sched = Local::take::<Scheduler>();
let new_task = ~do Coroutine::new(&mut sched.stack_pool) { let new_task = ~do Coroutine::new(&mut sched.stack_pool) {
do (|| { do (|| {
(f.take())() (f.take())()
}).finally { }).finally {
// Check for failure then resume the parent task // Check for failure then resume the parent task
unsafe { *failed_ptr = task::failing(); } unsafe { *failed_ptr = task::failing(); }
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
do sched.switch_running_tasks_and_then(old_task.take()) |new_task| { do sched.switch_running_tasks_and_then(old_task.take()) |new_task| {
let new_task = Cell(new_task); let new_task = Cell(new_task);
do local_sched::borrow |sched| { do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(new_task.take()); sched.enqueue_task(new_task.take());
} }
} }

View File

@ -16,9 +16,10 @@
use option::*; use option::*;
use clone::Clone; use clone::Clone;
use super::rc::RC; use super::rc::RC;
use rt::sched::Coroutine; use rt::sched::{Scheduler, Coroutine};
use rt::{context, TaskContext, SchedulerContext}; use rt::{context, TaskContext, SchedulerContext};
use rt::local_sched; use rt::local_sched;
use rt::local::Local;
use vec::OwnedVector; use vec::OwnedVector;
use container::Container; use container::Container;
@ -53,7 +54,7 @@ impl<T> Tube<T> {
// There's a waiting task. Wake it up // There's a waiting task. Wake it up
rtdebug!("waking blocked tube"); rtdebug!("waking blocked tube");
let task = (*state).blocked_task.swap_unwrap(); let task = (*state).blocked_task.swap_unwrap();
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task); sched.resume_task_immediately(task);
} }
} }
@ -71,7 +72,7 @@ impl<T> Tube<T> {
rtdebug!("blocking on tube recv"); rtdebug!("blocking on tube recv");
assert!(self.p.refcount() > 1); // There better be somebody to wake us up assert!(self.p.refcount() > 1); // There better be somebody to wake us up
assert!((*state).blocked_task.is_none()); assert!((*state).blocked_task.is_none());
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| { do sched.deschedule_running_task_and_then |task| {
(*state).blocked_task = Some(task); (*state).blocked_task = Some(task);
} }
@ -97,6 +98,8 @@ mod test {
use rt::local_sched; use rt::local_sched;
use rt::test::*; use rt::test::*;
use rt::rtio::EventLoop; use rt::rtio::EventLoop;
use rt::sched::Scheduler;
use rt::local::Local;
use super::*; use super::*;
#[test] #[test]
@ -105,11 +108,11 @@ mod test {
let mut tube: Tube<int> = Tube::new(); let mut tube: Tube<int> = Tube::new();
let tube_clone = tube.clone(); let tube_clone = tube.clone();
let tube_clone_cell = Cell(tube_clone); let tube_clone_cell = Cell(tube_clone);
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| { do sched.deschedule_running_task_and_then |task| {
let mut tube_clone = tube_clone_cell.take(); let mut tube_clone = tube_clone_cell.take();
tube_clone.send(1); tube_clone.send(1);
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task); sched.resume_task_immediately(task);
} }
@ -123,10 +126,10 @@ mod test {
let mut tube: Tube<int> = Tube::new(); let mut tube: Tube<int> = Tube::new();
let tube_clone = tube.clone(); let tube_clone = tube.clone();
let tube_clone = Cell(Cell(Cell(tube_clone))); let tube_clone = Cell(Cell(Cell(tube_clone)));
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| { do sched.deschedule_running_task_and_then |task| {
let tube_clone = tube_clone.take(); let tube_clone = tube_clone.take();
do local_sched::borrow |sched| { do Local::borrow::<Scheduler> |sched| {
let tube_clone = tube_clone.take(); let tube_clone = tube_clone.take();
do sched.event_loop.callback { do sched.event_loop.callback {
let mut tube_clone = tube_clone.take(); let mut tube_clone = tube_clone.take();
@ -135,7 +138,7 @@ mod test {
tube_clone.send(1); tube_clone.send(1);
} }
} }
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task); sched.resume_task_immediately(task);
} }
@ -151,7 +154,7 @@ mod test {
let mut tube: Tube<int> = Tube::new(); let mut tube: Tube<int> = Tube::new();
let tube_clone = tube.clone(); let tube_clone = tube.clone();
let tube_clone = Cell(tube_clone); let tube_clone = Cell(tube_clone);
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| { do sched.deschedule_running_task_and_then |task| {
callback_send(tube_clone.take(), 0); callback_send(tube_clone.take(), 0);
@ -159,7 +162,7 @@ mod test {
if i == 100 { return; } if i == 100 { return; }
let tube = Cell(Cell(tube)); let tube = Cell(Cell(tube));
do local_sched::borrow |sched| { do Local::borrow::<Scheduler> |sched| {
let tube = tube.take(); let tube = tube.take();
do sched.event_loop.callback { do sched.event_loop.callback {
let mut tube = tube.take(); let mut tube = tube.take();
@ -171,7 +174,7 @@ mod test {
} }
} }
let sched = local_sched::take(); let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task); sched.resume_task_immediately(task);
} }

View File

@ -22,6 +22,7 @@ use rt::rtio::*;
use rt::sched::{Scheduler, local_sched}; use rt::sched::{Scheduler, local_sched};
use rt::io::{standard_error, OtherIoError}; use rt::io::{standard_error, OtherIoError};
use rt::tube::Tube; use rt::tube::Tube;
use rt::local::Local;
#[cfg(test)] use container::Container; #[cfg(test)] use container::Container;
#[cfg(test)] use uint; #[cfg(test)] use uint;
@ -118,14 +119,14 @@ impl IoFactory for UvIoFactory {
let result_cell = empty_cell(); let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell; let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
let scheduler = local_sched::take(); let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context()); assert!(scheduler.in_task_context());
// Block this task and take ownership, switch to scheduler context // Block this task and take ownership, switch to scheduler context
do scheduler.deschedule_running_task_and_then |task| { do scheduler.deschedule_running_task_and_then |task| {
rtdebug!("connect: entered scheduler context"); rtdebug!("connect: entered scheduler context");
do local_sched::borrow |scheduler| { do Local::borrow::<Scheduler> |scheduler| {
assert!(!scheduler.in_task_context()); assert!(!scheduler.in_task_context());
} }
let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
@ -142,7 +143,7 @@ impl IoFactory for UvIoFactory {
unsafe { (*result_cell_ptr).put_back(res); } unsafe { (*result_cell_ptr).put_back(res); }
// Context switch // Context switch
let scheduler = local_sched::take(); let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take()); scheduler.resume_task_immediately(task_cell.take());
} else { } else {
rtdebug!("status is some"); rtdebug!("status is some");
@ -150,7 +151,7 @@ impl IoFactory for UvIoFactory {
do stream_watcher.close { do stream_watcher.close {
let res = Err(uv_error_to_io_error(status.get())); let res = Err(uv_error_to_io_error(status.get()));
unsafe { (*result_cell_ptr).put_back(res); } unsafe { (*result_cell_ptr).put_back(res); }
let scheduler = local_sched::take(); let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take()); scheduler.resume_task_immediately(task_cell.take());
} }
}; };
@ -166,11 +167,11 @@ impl IoFactory for UvIoFactory {
match watcher.bind(addr) { match watcher.bind(addr) {
Ok(_) => Ok(~UvTcpListener::new(watcher)), Ok(_) => Ok(~UvTcpListener::new(watcher)),
Err(uverr) => { Err(uverr) => {
let scheduler = local_sched::take(); let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |task| { do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell(task); let task_cell = Cell(task);
do watcher.as_stream().close { do watcher.as_stream().close {
let scheduler = local_sched::take(); let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take()); scheduler.resume_task_immediately(task_cell.take());
} }
} }
@ -202,11 +203,11 @@ impl UvTcpListener {
impl Drop for UvTcpListener { impl Drop for UvTcpListener {
fn finalize(&self) { fn finalize(&self) {
let watcher = self.watcher(); let watcher = self.watcher();
let scheduler = local_sched::take(); let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |task| { do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell(task); let task_cell = Cell(task);
do watcher.as_stream().close { do watcher.as_stream().close {
let scheduler = local_sched::take(); let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take()); scheduler.resume_task_immediately(task_cell.take());
} }
} }
@ -264,11 +265,11 @@ impl Drop for UvTcpStream {
fn finalize(&self) { fn finalize(&self) {
rtdebug!("closing tcp stream"); rtdebug!("closing tcp stream");
let watcher = self.watcher(); let watcher = self.watcher();
let scheduler = local_sched::take(); let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |task| { do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell(task); let task_cell = Cell(task);
do watcher.close { do watcher.close {
let scheduler = local_sched::take(); let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take()); scheduler.resume_task_immediately(task_cell.take());
} }
} }
@ -280,13 +281,13 @@ impl RtioTcpStream for UvTcpStream {
let result_cell = empty_cell(); let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell; let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
let scheduler = local_sched::take(); let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context()); assert!(scheduler.in_task_context());
let watcher = self.watcher(); let watcher = self.watcher();
let buf_ptr: *&mut [u8] = &buf; let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |task| { do scheduler.deschedule_running_task_and_then |task| {
rtdebug!("read: entered scheduler context"); rtdebug!("read: entered scheduler context");
do local_sched::borrow |scheduler| { do Local::borrow::<Scheduler> |scheduler| {
assert!(!scheduler.in_task_context()); assert!(!scheduler.in_task_context());
} }
let mut watcher = watcher; let mut watcher = watcher;
@ -314,7 +315,7 @@ impl RtioTcpStream for UvTcpStream {
unsafe { (*result_cell_ptr).put_back(result); } unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = local_sched::take(); let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take()); scheduler.resume_task_immediately(task_cell.take());
} }
} }
@ -326,7 +327,7 @@ impl RtioTcpStream for UvTcpStream {
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> { fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let result_cell = empty_cell(); let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell; let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let scheduler = local_sched::take(); let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context()); assert!(scheduler.in_task_context());
let watcher = self.watcher(); let watcher = self.watcher();
let buf_ptr: *&[u8] = &buf; let buf_ptr: *&[u8] = &buf;
@ -343,7 +344,7 @@ impl RtioTcpStream for UvTcpStream {
unsafe { (*result_cell_ptr).put_back(result); } unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = local_sched::take(); let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take()); scheduler.resume_task_immediately(task_cell.take());
} }
} }
@ -420,13 +421,13 @@ fn test_read_and_block() {
} }
reads += 1; reads += 1;
let scheduler = local_sched::take(); let scheduler = Local::take::<Scheduler>();
// Yield to the other task in hopes that it // Yield to the other task in hopes that it
// will trigger a read callback while we are // will trigger a read callback while we are
// not ready for it // not ready for it
do scheduler.deschedule_running_task_and_then |task| { do scheduler.deschedule_running_task_and_then |task| {
let task = Cell(task); let task = Cell(task);
do local_sched::borrow |scheduler| { do Local::borrow::<Scheduler> |scheduler| {
scheduler.enqueue_task(task.take()); scheduler.enqueue_task(task.take());
} }
} }

View File

@ -90,6 +90,7 @@ use task::unkillable;
use uint; use uint;
use util; use util;
use unstable::sync::{Exclusive, exclusive}; use unstable::sync::{Exclusive, exclusive};
use rt::local::Local;
#[cfg(test)] use task::default_task_opts; #[cfg(test)] use task::default_task_opts;
@ -575,7 +576,7 @@ pub fn spawn_raw(opts: TaskOpts, f: ~fn()) {
fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) { fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) {
use rt::sched::*; use rt::sched::*;
let mut sched = local_sched::take(); let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::new(&mut sched.stack_pool, f); let task = ~Coroutine::new(&mut sched.stack_pool, f);
sched.schedule_new_task(task); sched.schedule_new_task(task);
} }