From 8e966216c7c59c1b1de08561502f0b78589db59a Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Thu, 11 Apr 2013 17:34:52 -0700 Subject: [PATCH 01/14] core::rt: Restructure task_from_last_cleanup_job to borrow correctly We need a number of mutable references to contexts so name it `get_contexts` and return a tuple of all of them. --- src/libcore/rt/sched.rs | 113 +++++++++++++++++++++++----------------- 1 file changed, 64 insertions(+), 49 deletions(-) diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index 46ea5713e2a..eff25d22882 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -151,7 +151,16 @@ pub impl Scheduler { // Store the task in the scheduler so it can be grabbed later self.current_task = Some(task); - self.swap_in_task(); + + // Take pointers to both the task and scheduler's saved registers. + { + let (sched_context, _, next_task_context) = self.get_contexts(); + let next_task_context = next_task_context.unwrap(); + // Context switch to the task, restoring it's registers + // and saving the scheduler's + Context::swap(sched_context, next_task_context); + } + // The running task should have passed ownership elsewhere assert!(self.current_task.is_none()); @@ -171,8 +180,11 @@ pub impl Scheduler { let dead_task = self.current_task.swap_unwrap(); self.enqueue_cleanup_job(RecycleTask(dead_task)); - let dead_task = self.task_from_last_cleanup_job(); - self.swap_out_task(dead_task); + { + let (sched_context, last_task_context, _) = self.get_contexts(); + let last_task_context = last_task_context.unwrap(); + Context::swap(last_task_context, sched_context); + } } /// Block a running task, context switch to the scheduler, then pass the @@ -194,9 +206,13 @@ pub impl Scheduler { }; let f_opaque = HackAroundBorrowCk::from_fn(f_fake_region); self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); - let blocked_task = self.task_from_last_cleanup_job(); + { + let (sched_context, last_task_context, _) = self.get_contexts(); + let last_task_context = last_task_context.unwrap(); + Context::swap(last_task_context, sched_context); + } - self.swap_out_task(blocked_task); + // XXX: Should probably run cleanup jobs } /// Switch directly to another task, without going through the scheduler. @@ -209,43 +225,17 @@ pub impl Scheduler { let old_running_task = self.current_task.swap_unwrap(); self.enqueue_cleanup_job(RescheduleTask(old_running_task)); - let old_running_task = self.task_from_last_cleanup_job(); - self.current_task = Some(next_task); - self.swap_in_task_from_running_task(old_running_task); + { + let (_, last_task_context, next_task_context) = self.get_contexts(); + let last_task_context = last_task_context.unwrap(); + let next_task_context = next_task_context.unwrap(); + Context::swap(last_task_context, next_task_context); + } + + // XXX: Should probably run cleanup jobs } - - // * Context switching - - // NB: When switching to a task callers are expected to first set - // self.running_task. When switching away from a task likewise move - // out of the self.running_task - - priv fn swap_in_task(&mut self) { - // Take pointers to both the task and scheduler's saved registers. - let running_task: &~Task = self.current_task.get_ref(); - let task_context = &running_task.saved_context; - let scheduler_context = &mut self.saved_context; - - // Context switch to the task, restoring it's registers - // and saving the scheduler's - Context::swap(scheduler_context, task_context); - } - - priv fn swap_out_task(&mut self, running_task: &mut Task) { - let task_context = &mut running_task.saved_context; - let scheduler_context = &self.saved_context; - Context::swap(task_context, scheduler_context); - } - - priv fn swap_in_task_from_running_task(&mut self, running_task: &mut Task) { - let running_task_context = &mut running_task.saved_context; - let next_context = &self.current_task.get_ref().saved_context; - Context::swap(running_task_context, next_context); - } - - // * Other stuff fn in_task_context(&self) -> bool { self.current_task.is_some() } @@ -270,20 +260,42 @@ pub impl Scheduler { } } - // XXX: Hack. This should return &'self mut but I don't know how to - // make the borrowcheck happy - fn task_from_last_cleanup_job(&mut self) -> &mut Task { - assert!(!self.cleanup_jobs.is_empty()); - let last_job: &'self mut CleanupJob = &mut self.cleanup_jobs[0]; - let last_task: &'self Task = match last_job { - &RescheduleTask(~ref task) => task, - &RecycleTask(~ref task) => task, - &GiveTask(~ref task, _) => task, + /// Get mutable references to all the contexts that may be involved in a + /// context switch. + /// + /// Returns (the scheduler context, the optional context of the + /// task in the cleanup list, the optional context of the task in + /// the current task slot). When context switching to a task, + /// callers should first arrange for that task to be located in the + /// Scheduler's current_task slot and set up the + /// post-context-switch cleanup job. + fn get_contexts(&mut self) -> (&'self mut Context, + Option<&'self mut Context>, + Option<&'self mut Context>) { + let last_task = if !self.cleanup_jobs.is_empty() { + let last_job: &'self mut CleanupJob = &mut self.cleanup_jobs[0]; + let last_task: &'self Task = match last_job { + &RescheduleTask(~ref task) => task, + &RecycleTask(~ref task) => task, + &GiveTask(~ref task, _) => task, + }; + Some(last_task) + } else { + None }; // XXX: Pattern matching mutable pointers above doesn't work // because borrowck thinks the three patterns are conflicting // borrows - return unsafe { transmute::<&Task, &mut Task>(last_task) }; + let last_task = unsafe { transmute::, Option<&mut Task>>(last_task) }; + let last_task_context = match last_task { + Some(ref t) => Some(&mut t.saved_context), None => None + }; + let next_task_context = match self.current_task { + Some(ref mut t) => Some(&mut t.saved_context), None => None + }; + return (&mut self.saved_context, + last_task_context, + next_task_context); } } @@ -313,6 +325,9 @@ pub impl Task { priv fn build_start_wrapper(start: ~fn()) -> ~fn() { // XXX: The old code didn't have this extra allocation let wrapper: ~fn() = || { + // XXX: Should probably run scheduler cleanup jobs for situations + // where a task context switches directly to a new task + start(); let mut sched = ThreadLocalScheduler::new(); From 1cb78e736f89cf286d454d7af05b470d570be1a7 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sun, 14 Apr 2013 16:12:30 -0700 Subject: [PATCH 02/14] core::rt: Rename block_running_task_and_then to deschedul_... --- src/libcore/rt/sched.rs | 4 ++-- src/libcore/rt/uvio.rs | 10 +++++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index eff25d22882..2160d62fc53 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -195,7 +195,7 @@ pub impl Scheduler { /// The closure here is a *stack* closure that lives in the /// running task. It gets transmuted to the scheduler's lifetime /// and called while the task is blocked. - fn block_running_task_and_then(&mut self, f: &fn(&mut Scheduler, ~Task)) { + fn deschedule_running_task_and_then(&mut self, f: &fn(&mut Scheduler, ~Task)) { assert!(self.in_task_context()); rtdebug!("blocking task"); @@ -566,7 +566,7 @@ fn test_block_task() { let task = ~do Task::new(&mut sched.stack_pool) { do Scheduler::local |sched| { assert!(sched.in_task_context()); - do sched.block_running_task_and_then() |sched, task| { + do sched.deschedule_running_task_and_then() |sched, task| { assert!(!sched.in_task_context()); sched.task_queue.push_back(task); } diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs index 37f29d1a5c0..97c65b39f9c 100644 --- a/src/libcore/rt/uvio.rs +++ b/src/libcore/rt/uvio.rs @@ -108,7 +108,7 @@ impl IoFactory for UvIoFactory { assert!(scheduler.in_task_context()); // Block this task and take ownership, switch to scheduler context - do scheduler.block_running_task_and_then |scheduler, task| { + do scheduler.deschedule_running_task_and_then |scheduler, task| { rtdebug!("connect: entered scheduler context"); assert!(!scheduler.in_task_context()); @@ -181,7 +181,7 @@ impl TcpListener for UvTcpListener { do Scheduler::local |scheduler| { assert!(scheduler.in_task_context()); - do scheduler.block_running_task_and_then |_, task| { + do scheduler.deschedule_running_task_and_then |_, task| { let task_cell = Cell(task); let mut server_tcp_watcher = server_tcp_watcher; do server_tcp_watcher.listen |server_stream_watcher, status| { @@ -247,7 +247,7 @@ impl Stream for UvStream { assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&mut [u8] = &buf; - do scheduler.block_running_task_and_then |scheduler, task| { + do scheduler.deschedule_running_task_and_then |scheduler, task| { rtdebug!("read: entered scheduler context"); assert!(!scheduler.in_task_context()); let mut watcher = watcher; @@ -293,7 +293,7 @@ impl Stream for UvStream { assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&[u8] = &buf; - do scheduler.block_running_task_and_then |_, task| { + do scheduler.deschedule_running_task_and_then |_, task| { let mut watcher = watcher; let task_cell = Cell(task); let buf = unsafe { &*buf_ptr }; @@ -420,7 +420,7 @@ fn test_read_and_block() { // Yield to the other task in hopes that it // will trigger a read callback while we are // not ready for it - do scheduler.block_running_task_and_then |scheduler, task| { + do scheduler.deschedule_running_task_and_then |scheduler, task| { scheduler.task_queue.push_back(task); } } From 6b084bad250e58c4003a1df40584f6385f44aac1 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sun, 14 Apr 2013 16:25:33 -0700 Subject: [PATCH 03/14] core::rt: Insert calls to run_cleanup_tasks after all context switches --- src/libcore/rt/sched.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index 2160d62fc53..62262a5a520 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -212,7 +212,7 @@ pub impl Scheduler { Context::swap(last_task_context, sched_context); } - // XXX: Should probably run cleanup jobs + self.run_cleanup_jobs(); } /// Switch directly to another task, without going through the scheduler. @@ -233,7 +233,7 @@ pub impl Scheduler { Context::swap(last_task_context, next_task_context); } - // XXX: Should probably run cleanup jobs + self.run_cleanup_jobs(); } // * Other stuff @@ -245,7 +245,6 @@ pub impl Scheduler { } fn run_cleanup_jobs(&mut self) { - assert!(!self.in_task_context()); rtdebug!("running cleanup jobs"); while !self.cleanup_jobs.is_empty() { @@ -325,8 +324,12 @@ pub impl Task { priv fn build_start_wrapper(start: ~fn()) -> ~fn() { // XXX: The old code didn't have this extra allocation let wrapper: ~fn() = || { - // XXX: Should probably run scheduler cleanup jobs for situations - // where a task context switches directly to a new task + // This is the first code to execute after the initial + // context switch to the task. The previous context may + // have asked us to do some cleanup. + let mut sched = ThreadLocalScheduler::new(); + let sched = sched.get_scheduler(); + sched.run_cleanup_jobs(); start(); From 5f52aecb1e7a53bf46280d057654746c34e89859 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sun, 14 Apr 2013 19:24:43 -0700 Subject: [PATCH 04/14] core::rt: Schedulers only need a single cleanup_job at a time Each context switch has up to one cleanup job and it is always executed immediately after the context switch. --- src/libcore/rt/sched.rs | 62 ++++++++++++++++++++--------------------- 1 file changed, 31 insertions(+), 31 deletions(-) diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index 62262a5a520..11934d676e6 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -38,12 +38,9 @@ pub struct Scheduler { priv saved_context: Context, /// The currently executing task priv current_task: Option<~Task>, - /// A queue of jobs to perform immediately upon return from task - /// context to scheduler context. - /// XXX: This probably should be a single cleanup action and it - /// should run after a context switch, not on return from the - /// scheduler - priv cleanup_jobs: ~[CleanupJob] + /// An action performed after a context switch on behalf of the + /// code running before the context switch + priv cleanup_job: Option } // XXX: Some hacks to put a &fn in Scheduler without borrowck @@ -84,7 +81,7 @@ pub impl Scheduler { stack_pool: StackPool::new(), saved_context: Context::empty(), current_task: None, - cleanup_jobs: ~[] + cleanup_job: None } } @@ -165,7 +162,7 @@ pub impl Scheduler { assert!(self.current_task.is_none()); // Running tasks may have asked us to do some cleanup - self.run_cleanup_jobs(); + self.run_cleanup_job(); } @@ -212,7 +209,7 @@ pub impl Scheduler { Context::swap(last_task_context, sched_context); } - self.run_cleanup_jobs(); + self.run_cleanup_job(); } /// Switch directly to another task, without going through the scheduler. @@ -233,7 +230,7 @@ pub impl Scheduler { Context::swap(last_task_context, next_task_context); } - self.run_cleanup_jobs(); + self.run_cleanup_job(); } // * Other stuff @@ -241,21 +238,25 @@ pub impl Scheduler { fn in_task_context(&self) -> bool { self.current_task.is_some() } fn enqueue_cleanup_job(&mut self, job: CleanupJob) { - self.cleanup_jobs.unshift(job); + assert!(self.cleanup_job.is_none()); + self.cleanup_job = Some(job); } - fn run_cleanup_jobs(&mut self) { + fn run_cleanup_job(&mut self) { rtdebug!("running cleanup jobs"); - while !self.cleanup_jobs.is_empty() { - match self.cleanup_jobs.pop() { - RescheduleTask(task) => { - // NB: Pushing to the *front* of the queue - self.task_queue.push_front(task); - } - RecycleTask(task) => task.recycle(&mut self.stack_pool), - GiveTask(task, f) => (f.to_fn())(self, task) + if self.cleanup_job.is_none() { + return; + } + + let cleanup_job = self.cleanup_job.swap_unwrap(); + match cleanup_job { + RescheduleTask(task) => { + // NB: Pushing to the *front* of the queue + self.task_queue.push_front(task); } + RecycleTask(task) => task.recycle(&mut self.stack_pool), + GiveTask(task, f) => (f.to_fn())(self, task) } } @@ -271,16 +272,15 @@ pub impl Scheduler { fn get_contexts(&mut self) -> (&'self mut Context, Option<&'self mut Context>, Option<&'self mut Context>) { - let last_task = if !self.cleanup_jobs.is_empty() { - let last_job: &'self mut CleanupJob = &mut self.cleanup_jobs[0]; - let last_task: &'self Task = match last_job { - &RescheduleTask(~ref task) => task, - &RecycleTask(~ref task) => task, - &GiveTask(~ref task, _) => task, - }; - Some(last_task) - } else { - None + let last_task = match self.cleanup_job { + Some(RescheduleTask(~ref task)) | + Some(RecycleTask(~ref task)) | + Some(GiveTask(~ref task, _)) => { + Some(task) + } + None => { + None + } }; // XXX: Pattern matching mutable pointers above doesn't work // because borrowck thinks the three patterns are conflicting @@ -329,7 +329,7 @@ pub impl Task { // have asked us to do some cleanup. let mut sched = ThreadLocalScheduler::new(); let sched = sched.get_scheduler(); - sched.run_cleanup_jobs(); + sched.run_cleanup_job(); start(); From a2e5827866876e9bafcf1f4e94f4e354e500420a Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sun, 14 Apr 2013 20:24:15 -0700 Subject: [PATCH 05/14] core::rt: All context switches are followed by a cleanup action --- src/libcore/rt/sched.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index 11934d676e6..7cfb5b7903f 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -60,6 +60,7 @@ impl HackAroundBorrowCk for UnsafeTaskReceiver { } enum CleanupJob { + DoNothing, RescheduleTask(~Task), RecycleTask(~Task), GiveTask(~Task, UnsafeTaskReceiver) @@ -148,6 +149,7 @@ pub impl Scheduler { // Store the task in the scheduler so it can be grabbed later self.current_task = Some(task); + self.enqueue_cleanup_job(DoNothing); // Take pointers to both the task and scheduler's saved registers. { @@ -243,14 +245,13 @@ pub impl Scheduler { } fn run_cleanup_job(&mut self) { - rtdebug!("running cleanup jobs"); + rtdebug!("running cleanup job"); - if self.cleanup_job.is_none() { - return; - } + assert!(self.cleanup_job.is_some()); let cleanup_job = self.cleanup_job.swap_unwrap(); match cleanup_job { + DoNothing => { } RescheduleTask(task) => { // NB: Pushing to the *front* of the queue self.task_queue.push_front(task); @@ -278,9 +279,10 @@ pub impl Scheduler { Some(GiveTask(~ref task, _)) => { Some(task) } - None => { + Some(DoNothing) => { None } + None => fail!(fmt!("all context switches should have a cleanup job")) }; // XXX: Pattern matching mutable pointers above doesn't work // because borrowck thinks the three patterns are conflicting From 93757d81853302408dd30a3ad1a4102eae26b7c5 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Sun, 14 Apr 2013 21:34:38 -0700 Subject: [PATCH 06/14] core::rt: Clean up the thread-local scheduler code Remove the ThreadLocalScheduler type in favor of functions. Move everything into the sched::local module. --- src/libcore/rt/sched.rs | 179 +++++++++++++++++++++------------------- 1 file changed, 92 insertions(+), 87 deletions(-) diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index 7cfb5b7903f..e4d9c9681d7 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -11,14 +11,11 @@ use option::*; use sys; use cast::transmute; -use libc::c_void; -use ptr::mut_null; use super::work_queue::WorkQueue; use super::stack::{StackPool, StackSegment}; use super::rtio::{EventLoop, EventLoopObject}; use super::context::Context; -use tls = super::thread_local_storage; #[cfg(test)] use super::uvio::UvEventLoop; #[cfg(test)] use unstable::run_in_bare_thread; @@ -110,18 +107,13 @@ pub impl Scheduler { } fn install(~self, f: &fn(&mut Scheduler)) -> ~Scheduler { - let mut tlsched = ThreadLocalScheduler::new(); - tlsched.put_scheduler(self); - { - let sched = tlsched.get_scheduler(); - f(sched); + do local::install(self) { + local::borrow(f) } - return tlsched.take_scheduler(); } fn local(f: &fn(&mut Scheduler)) { - let mut tlsched = ThreadLocalScheduler::new(); - f(tlsched.get_scheduler()); + local::borrow(f) } // * Scheduler-context operations @@ -329,15 +321,15 @@ pub impl Task { // This is the first code to execute after the initial // context switch to the task. The previous context may // have asked us to do some cleanup. - let mut sched = ThreadLocalScheduler::new(); - let sched = sched.get_scheduler(); - sched.run_cleanup_job(); + do Scheduler::local |sched| { + sched.run_cleanup_job(); + } start(); - let mut sched = ThreadLocalScheduler::new(); - let sched = sched.get_scheduler(); - sched.terminate_current_task(); + do Scheduler::local |sched| { + sched.terminate_current_task(); + } }; return wrapper; } @@ -352,90 +344,103 @@ pub impl Task { } } -// NB: This is a type so we can use make use of the &self region. -struct ThreadLocalScheduler(tls::Key); +mod local { -impl ThreadLocalScheduler { - fn new() -> ThreadLocalScheduler { + //! Access to the thread-local Scheduler + + use ptr::mut_null; + use libc::c_void; + use cast::transmute; + + use super::Scheduler; + use tls = super::super::thread_local_storage; + #[cfg(test)] use super::super::uvio::UvEventLoop; + + /// Give the Scheduler to thread-local storage + pub fn put(sched: ~Scheduler) { unsafe { - // NB: This assumes that the TLS key has been created prior. - // Currently done in rust_start. - let key: *mut c_void = rust_get_sched_tls_key(); - let key: &mut tls::Key = transmute(key); - ThreadLocalScheduler(*key) + let key = tls_key(); + let void_sched: *mut c_void = transmute::<~Scheduler, *mut c_void>(sched); + tls::set(key, void_sched); } } - fn put_scheduler(&mut self, scheduler: ~Scheduler) { + /// Take ownership of the Scheduler from thread-local storage + pub fn take() -> ~Scheduler { unsafe { - let key = match self { &ThreadLocalScheduler(key) => key }; - let value: *mut c_void = transmute::<~Scheduler, *mut c_void>(scheduler); - tls::set(key, value); - } - } - - fn get_scheduler(&mut self) -> &'self mut Scheduler { - unsafe { - let key = match self { &ThreadLocalScheduler(key) => key }; - let mut value: *mut c_void = tls::get(key); - assert!(value.is_not_null()); - { - let value_ptr = &mut value; - let sched: &mut ~Scheduler = { - transmute::<&mut *mut c_void, &mut ~Scheduler>(value_ptr) - }; - let sched: &mut Scheduler = &mut **sched; - return sched; - } - } - } - - fn take_scheduler(&mut self) -> ~Scheduler { - unsafe { - let key = match self { &ThreadLocalScheduler(key) => key }; - let value: *mut c_void = tls::get(key); - assert!(value.is_not_null()); - let sched = transmute(value); + let key = tls_key(); + let void_sched: *mut c_void = tls::get(key); + assert!(void_sched.is_not_null()); + let sched = transmute::<*mut c_void, ~Scheduler>(void_sched); tls::set(key, mut_null()); return sched; } } -} -extern { - fn rust_get_sched_tls_key() -> *mut c_void; -} - -#[test] -fn thread_local_scheduler_smoke_test() { - let scheduler = ~UvEventLoop::new_scheduler(); - let mut tls_scheduler = ThreadLocalScheduler::new(); - tls_scheduler.put_scheduler(scheduler); - { - let _scheduler = tls_scheduler.get_scheduler(); - } - let _scheduler = tls_scheduler.take_scheduler(); -} - -#[test] -fn thread_local_scheduler_two_instances() { - let scheduler = ~UvEventLoop::new_scheduler(); - let mut tls_scheduler = ThreadLocalScheduler::new(); - tls_scheduler.put_scheduler(scheduler); - { - - let _scheduler = tls_scheduler.get_scheduler(); - } - { - let scheduler = tls_scheduler.take_scheduler(); - tls_scheduler.put_scheduler(scheduler); + /// Give the Scheduler to thread-local storage for the duration of the block + pub fn install(sched: ~Scheduler, f: &fn()) -> ~Scheduler { + put(sched); + f(); + return take(); } - let mut tls_scheduler = ThreadLocalScheduler::new(); - { - let _scheduler = tls_scheduler.get_scheduler(); + /// Borrow a mutable reference to the thread-local Scheduler + /// # Safety Note + /// Because this leaves the Scheduler in thread-local storage it is possible + /// For the Scheduler pointer to be aliased + pub fn borrow(f: &fn(&mut Scheduler)) { + unsafe { + let key = tls_key(); + let mut void_sched: *mut c_void = tls::get(key); + assert!(void_sched.is_not_null()); + { + let void_sched_ptr = &mut void_sched; + let sched: &mut ~Scheduler = { + transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr) + }; + let sched: &mut Scheduler = &mut **sched; + f(sched); + } + } + } + + fn tls_key() -> tls::Key { + unsafe { + let key: *mut c_void = rust_get_sched_tls_key(); + let key: &mut tls::Key = transmute(key); + return *key; + } + } + + extern { + fn rust_get_sched_tls_key() -> *mut c_void; + } + + #[test] + fn thread_local_scheduler_smoke_test() { + let scheduler = ~UvEventLoop::new_scheduler(); + put(scheduler); + let _scheduler = take(); + } + + #[test] + fn thread_local_scheduler_two_instances() { + let scheduler = ~UvEventLoop::new_scheduler(); + put(scheduler); + let _scheduler = take(); + let scheduler = ~UvEventLoop::new_scheduler(); + put(scheduler); + let _scheduler = take(); + } + + #[test] + fn install_borrow_smoke_test() { + let scheduler = ~UvEventLoop::new_scheduler(); + let _scheduler = do install(scheduler) { + do borrow |_sched| { + } + }; } - let _scheduler = tls_scheduler.take_scheduler(); } #[test] From 4f1dd1fd212f51f0a2f4fe20f9ebc7e0dc9081d8 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 15 Apr 2013 15:13:34 -0700 Subject: [PATCH 07/14] core::rt: Fix two incorrect uses of the thread-local scheduler These would not be threadsafe --- src/libcore/rt/sched.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched.rs index e4d9c9681d7..6ed21200876 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched.rs @@ -203,7 +203,10 @@ pub impl Scheduler { Context::swap(last_task_context, sched_context); } - self.run_cleanup_job(); + // We could be executing in a different thread now + do local::borrow |sched| { + sched.run_cleanup_job(); + } } /// Switch directly to another task, without going through the scheduler. @@ -224,7 +227,10 @@ pub impl Scheduler { Context::swap(last_task_context, next_task_context); } - self.run_cleanup_job(); + // We could be executing in a different thread now + do local::borrow |sched| { + sched.run_cleanup_job(); + } } // * Other stuff From 1f8ebb6a8ebc5ee490158d3f4264f8aefd509045 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 15 Apr 2013 13:40:36 -0700 Subject: [PATCH 08/14] core::rt: Move thread-local scheduler to its own module --- src/libcore/rt/mod.rs | 1 + src/libcore/rt/sched/local.rs | 106 ++++++++++++++++++++++ src/libcore/rt/{sched.rs => sched/mod.rs} | 101 +-------------------- 3 files changed, 109 insertions(+), 99 deletions(-) create mode 100644 src/libcore/rt/sched/local.rs rename src/libcore/rt/{sched.rs => sched/mod.rs} (85%) diff --git a/src/libcore/rt/mod.rs b/src/libcore/rt/mod.rs index 0f2a6cd7ef9..b1227af5f4c 100644 --- a/src/libcore/rt/mod.rs +++ b/src/libcore/rt/mod.rs @@ -32,6 +32,7 @@ macro_rules! rtdebug ( ($( $arg:expr),+) => ( $(let _ = $arg)*; ) ) +#[path = "sched/mod.rs"] mod sched; mod rtio; pub mod uvll; diff --git a/src/libcore/rt/sched/local.rs b/src/libcore/rt/sched/local.rs new file mode 100644 index 00000000000..1b1a0033017 --- /dev/null +++ b/src/libcore/rt/sched/local.rs @@ -0,0 +1,106 @@ +// Copyright 2013 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +//! Access to the thread-local Scheduler + +use ptr::mut_null; +use libc::c_void; +use cast::transmute; + +use super::Scheduler; +use tls = super::super::thread_local_storage; +#[cfg(test)] use super::super::uvio::UvEventLoop; + +/// Give the Scheduler to thread-local storage +pub fn put(sched: ~Scheduler) { + unsafe { + let key = tls_key(); + let void_sched: *mut c_void = transmute::<~Scheduler, *mut c_void>(sched); + tls::set(key, void_sched); + } +} + +/// Take ownership of the Scheduler from thread-local storage +pub fn take() -> ~Scheduler { + unsafe { + let key = tls_key(); + let void_sched: *mut c_void = tls::get(key); + assert!(void_sched.is_not_null()); + let sched = transmute::<*mut c_void, ~Scheduler>(void_sched); + tls::set(key, mut_null()); + return sched; + } +} + +/// Give the Scheduler to thread-local storage for the duration of the block +pub fn install(sched: ~Scheduler, f: &fn()) -> ~Scheduler { + put(sched); + f(); + return take(); +} + +/// Borrow a mutable reference to the thread-local Scheduler +/// # Safety Note +/// Because this leaves the Scheduler in thread-local storage it is possible +/// For the Scheduler pointer to be aliased +pub fn borrow(f: &fn(&mut Scheduler)) { + unsafe { + let key = tls_key(); + let mut void_sched: *mut c_void = tls::get(key); + assert!(void_sched.is_not_null()); + { + let void_sched_ptr = &mut void_sched; + let sched: &mut ~Scheduler = { + transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr) + }; + let sched: &mut Scheduler = &mut **sched; + f(sched); + } + } +} + +fn tls_key() -> tls::Key { + unsafe { + let key: *mut c_void = rust_get_sched_tls_key(); + let key: &mut tls::Key = transmute(key); + return *key; + } +} + +extern { + fn rust_get_sched_tls_key() -> *mut c_void; +} + +#[test] +fn thread_local_scheduler_smoke_test() { + let scheduler = ~UvEventLoop::new_scheduler(); + put(scheduler); + let _scheduler = take(); +} + +#[test] +fn thread_local_scheduler_two_instances() { + let scheduler = ~UvEventLoop::new_scheduler(); + put(scheduler); + let _scheduler = take(); + let scheduler = ~UvEventLoop::new_scheduler(); + put(scheduler); + let _scheduler = take(); +} + +#[test] +fn install_borrow_smoke_test() { + let scheduler = ~UvEventLoop::new_scheduler(); + let _scheduler = do install(scheduler) { + do borrow |_sched| { + } + }; +} + diff --git a/src/libcore/rt/sched.rs b/src/libcore/rt/sched/mod.rs similarity index 85% rename from src/libcore/rt/sched.rs rename to src/libcore/rt/sched/mod.rs index 6ed21200876..bc25023d05f 100644 --- a/src/libcore/rt/sched.rs +++ b/src/libcore/rt/sched/mod.rs @@ -21,6 +21,8 @@ use super::context::Context; #[cfg(test)] use unstable::run_in_bare_thread; #[cfg(test)] use int; +mod local; + /// The Scheduler is responsible for coordinating execution of Tasks /// on a single thread. When the scheduler is running it is owned by /// thread local storage and the running task is owned by the @@ -350,105 +352,6 @@ pub impl Task { } } -mod local { - - //! Access to the thread-local Scheduler - - use ptr::mut_null; - use libc::c_void; - use cast::transmute; - - use super::Scheduler; - use tls = super::super::thread_local_storage; - #[cfg(test)] use super::super::uvio::UvEventLoop; - - /// Give the Scheduler to thread-local storage - pub fn put(sched: ~Scheduler) { - unsafe { - let key = tls_key(); - let void_sched: *mut c_void = transmute::<~Scheduler, *mut c_void>(sched); - tls::set(key, void_sched); - } - } - - /// Take ownership of the Scheduler from thread-local storage - pub fn take() -> ~Scheduler { - unsafe { - let key = tls_key(); - let void_sched: *mut c_void = tls::get(key); - assert!(void_sched.is_not_null()); - let sched = transmute::<*mut c_void, ~Scheduler>(void_sched); - tls::set(key, mut_null()); - return sched; - } - } - - /// Give the Scheduler to thread-local storage for the duration of the block - pub fn install(sched: ~Scheduler, f: &fn()) -> ~Scheduler { - put(sched); - f(); - return take(); - } - - /// Borrow a mutable reference to the thread-local Scheduler - /// # Safety Note - /// Because this leaves the Scheduler in thread-local storage it is possible - /// For the Scheduler pointer to be aliased - pub fn borrow(f: &fn(&mut Scheduler)) { - unsafe { - let key = tls_key(); - let mut void_sched: *mut c_void = tls::get(key); - assert!(void_sched.is_not_null()); - { - let void_sched_ptr = &mut void_sched; - let sched: &mut ~Scheduler = { - transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr) - }; - let sched: &mut Scheduler = &mut **sched; - f(sched); - } - } - } - - fn tls_key() -> tls::Key { - unsafe { - let key: *mut c_void = rust_get_sched_tls_key(); - let key: &mut tls::Key = transmute(key); - return *key; - } - } - - extern { - fn rust_get_sched_tls_key() -> *mut c_void; - } - - #[test] - fn thread_local_scheduler_smoke_test() { - let scheduler = ~UvEventLoop::new_scheduler(); - put(scheduler); - let _scheduler = take(); - } - - #[test] - fn thread_local_scheduler_two_instances() { - let scheduler = ~UvEventLoop::new_scheduler(); - put(scheduler); - let _scheduler = take(); - let scheduler = ~UvEventLoop::new_scheduler(); - put(scheduler); - let _scheduler = take(); - } - - #[test] - fn install_borrow_smoke_test() { - let scheduler = ~UvEventLoop::new_scheduler(); - let _scheduler = do install(scheduler) { - do borrow |_sched| { - } - }; - } -} - #[test] fn test_simple_scheduling() { do run_in_bare_thread { From 3b8a354c21cec6204a268cfd1e2946744c28ed0e Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 15 Apr 2013 15:25:29 -0700 Subject: [PATCH 09/14] core::rt: A little bit of cleanup to thread-local scheduler --- src/libcore/rt/sched/local.rs | 17 ++++++----------- src/libcore/rt/sched/mod.rs | 16 +++++++--------- 2 files changed, 13 insertions(+), 20 deletions(-) diff --git a/src/libcore/rt/sched/local.rs b/src/libcore/rt/sched/local.rs index 1b1a0033017..020d581546a 100644 --- a/src/libcore/rt/sched/local.rs +++ b/src/libcore/rt/sched/local.rs @@ -39,18 +39,11 @@ pub fn take() -> ~Scheduler { } } -/// Give the Scheduler to thread-local storage for the duration of the block -pub fn install(sched: ~Scheduler, f: &fn()) -> ~Scheduler { - put(sched); - f(); - return take(); -} - /// Borrow a mutable reference to the thread-local Scheduler /// # Safety Note /// Because this leaves the Scheduler in thread-local storage it is possible /// For the Scheduler pointer to be aliased -pub fn borrow(f: &fn(&mut Scheduler)) { +pub unsafe fn borrow(f: &fn(&mut Scheduler)) { unsafe { let key = tls_key(); let mut void_sched: *mut c_void = tls::get(key); @@ -96,11 +89,13 @@ fn thread_local_scheduler_two_instances() { } #[test] -fn install_borrow_smoke_test() { +fn borrow_smoke_test() { let scheduler = ~UvEventLoop::new_scheduler(); - let _scheduler = do install(scheduler) { + put(scheduler); + unsafe { do borrow |_sched| { } - }; + } + let _scheduler = take(); } diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched/mod.rs index bc25023d05f..2cdaf9573fb 100644 --- a/src/libcore/rt/sched/mod.rs +++ b/src/libcore/rt/sched/mod.rs @@ -93,7 +93,9 @@ pub impl Scheduler { assert!(!self.in_task_context()); // Give ownership of the scheduler (self) to the thread - do self.install |scheduler| { + local::put(self); + + do Scheduler::local |scheduler| { fn run_scheduler_once() { do Scheduler::local |scheduler| { if scheduler.resume_task_from_queue() { @@ -106,16 +108,12 @@ pub impl Scheduler { scheduler.event_loop.callback(run_scheduler_once); scheduler.event_loop.run(); } - } - fn install(~self, f: &fn(&mut Scheduler)) -> ~Scheduler { - do local::install(self) { - local::borrow(f) - } + return local::take(); } fn local(f: &fn(&mut Scheduler)) { - local::borrow(f) + unsafe { local::borrow(f) } } // * Scheduler-context operations @@ -206,7 +204,7 @@ pub impl Scheduler { } // We could be executing in a different thread now - do local::borrow |sched| { + do Scheduler::local |sched| { sched.run_cleanup_job(); } } @@ -230,7 +228,7 @@ pub impl Scheduler { } // We could be executing in a different thread now - do local::borrow |sched| { + do Scheduler::local |sched| { sched.run_cleanup_job(); } } From 473b4d19ad51529afb8f1391cf471a20053c781c Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 15 Apr 2013 15:32:43 -0700 Subject: [PATCH 10/14] core::rt: Rename Scheduler::local to Scheduler::unsafe_local --- src/libcore/rt/sched/mod.rs | 26 +++++++++++++++----------- src/libcore/rt/uvio.rs | 30 +++++++++++++++--------------- 2 files changed, 30 insertions(+), 26 deletions(-) diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched/mod.rs index 2cdaf9573fb..82f0c3592c4 100644 --- a/src/libcore/rt/sched/mod.rs +++ b/src/libcore/rt/sched/mod.rs @@ -95,9 +95,9 @@ pub impl Scheduler { // Give ownership of the scheduler (self) to the thread local::put(self); - do Scheduler::local |scheduler| { + do Scheduler::unsafe_local |scheduler| { fn run_scheduler_once() { - do Scheduler::local |scheduler| { + do Scheduler::unsafe_local |scheduler| { if scheduler.resume_task_from_queue() { // Ok, a task ran. Nice! We'll do it again later scheduler.event_loop.callback(run_scheduler_once); @@ -112,7 +112,11 @@ pub impl Scheduler { return local::take(); } - fn local(f: &fn(&mut Scheduler)) { + /// Get a mutable pointer to the thread-local scheduler. + /// # Safety Note + /// This allows other mutable aliases to the scheduler, both in the current + /// execution context and other execution contexts. + fn unsafe_local(f: &fn(&mut Scheduler)) { unsafe { local::borrow(f) } } @@ -204,7 +208,7 @@ pub impl Scheduler { } // We could be executing in a different thread now - do Scheduler::local |sched| { + do Scheduler::unsafe_local |sched| { sched.run_cleanup_job(); } } @@ -228,7 +232,7 @@ pub impl Scheduler { } // We could be executing in a different thread now - do Scheduler::local |sched| { + do Scheduler::unsafe_local |sched| { sched.run_cleanup_job(); } } @@ -327,13 +331,13 @@ pub impl Task { // This is the first code to execute after the initial // context switch to the task. The previous context may // have asked us to do some cleanup. - do Scheduler::local |sched| { + do Scheduler::unsafe_local |sched| { sched.run_cleanup_job(); } start(); - do Scheduler::local |sched| { + do Scheduler::unsafe_local |sched| { sched.terminate_current_task(); } }; @@ -394,7 +398,7 @@ fn test_swap_tasks() { let mut sched = ~UvEventLoop::new_scheduler(); let task1 = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } - do Scheduler::local |sched| { + do Scheduler::unsafe_local |sched| { let task2 = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } }; @@ -427,7 +431,7 @@ fn test_run_a_lot_of_tasks_queued() { assert!(count == MAX); fn run_task(count_ptr: *mut int) { - do Scheduler::local |sched| { + do Scheduler::unsafe_local |sched| { let task = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; @@ -460,7 +464,7 @@ fn test_run_a_lot_of_tasks_direct() { assert!(count == MAX); fn run_task(count_ptr: *mut int) { - do Scheduler::local |sched| { + do Scheduler::unsafe_local |sched| { let task = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; @@ -481,7 +485,7 @@ fn test_block_task() { do run_in_bare_thread { let mut sched = ~UvEventLoop::new_scheduler(); let task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::local |sched| { + do Scheduler::unsafe_local |sched| { assert!(sched.in_task_context()); do sched.deschedule_running_task_and_then() |sched, task| { assert!(!sched.in_task_context()); diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs index 97c65b39f9c..7dfc6fff4cd 100644 --- a/src/libcore/rt/uvio.rs +++ b/src/libcore/rt/uvio.rs @@ -104,7 +104,7 @@ impl IoFactory for UvIoFactory { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - do Scheduler::local |scheduler| { + do Scheduler::unsafe_local |scheduler| { assert!(scheduler.in_task_context()); // Block this task and take ownership, switch to scheduler context @@ -131,7 +131,7 @@ impl IoFactory for UvIoFactory { unsafe { (*result_cell_ptr).put_back(maybe_stream); } // Context switch - do Scheduler::local |scheduler| { + do Scheduler::unsafe_local |scheduler| { scheduler.resume_task_immediately(task_cell.take()); } } @@ -178,7 +178,7 @@ impl TcpListener for UvTcpListener { let server_tcp_watcher = self.watcher(); - do Scheduler::local |scheduler| { + do Scheduler::unsafe_local |scheduler| { assert!(scheduler.in_task_context()); do scheduler.deschedule_running_task_and_then |_, task| { @@ -201,7 +201,7 @@ impl TcpListener for UvTcpListener { rtdebug!("resuming task from listen"); // Context switch - do Scheduler::local |scheduler| { + do Scheduler::unsafe_local |scheduler| { scheduler.resume_task_immediately(task_cell.take()); } } @@ -243,7 +243,7 @@ impl Stream for UvStream { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - do Scheduler::local |scheduler| { + do Scheduler::unsafe_local |scheduler| { assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&mut [u8] = &buf; @@ -275,7 +275,7 @@ impl Stream for UvStream { unsafe { (*result_cell_ptr).put_back(result); } - do Scheduler::local |scheduler| { + do Scheduler::unsafe_local |scheduler| { scheduler.resume_task_immediately(task_cell.take()); } } @@ -289,7 +289,7 @@ impl Stream for UvStream { fn write(&mut self, buf: &[u8]) -> Result<(), ()> { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - do Scheduler::local |scheduler| { + do Scheduler::unsafe_local |scheduler| { assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&[u8] = &buf; @@ -308,7 +308,7 @@ impl Stream for UvStream { unsafe { (*result_cell_ptr).put_back(result); } - do Scheduler::local |scheduler| { + do Scheduler::unsafe_local |scheduler| { scheduler.resume_task_immediately(task_cell.take()); } } @@ -326,7 +326,7 @@ fn test_simple_io_no_connect() { do run_in_bare_thread { let mut sched = ~UvEventLoop::new_scheduler(); let task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::local |sched| { + do Scheduler::unsafe_local |sched| { let io = sched.event_loop.io().unwrap(); let addr = Ipv4(127, 0, 0, 1, 2926); let maybe_chan = io.connect(addr); @@ -346,7 +346,7 @@ fn test_simple_tcp_server_and_client() { let addr = Ipv4(127, 0, 0, 1, 2929); let client_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::local |sched| { + do Scheduler::unsafe_local |sched| { let io = sched.event_loop.io().unwrap(); let mut stream = io.connect(addr).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); @@ -355,7 +355,7 @@ fn test_simple_tcp_server_and_client() { }; let server_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::local |sched| { + do Scheduler::unsafe_local |sched| { let io = sched.event_loop.io().unwrap(); let mut listener = io.bind(addr).unwrap(); let mut stream = listener.listen().unwrap(); @@ -385,7 +385,7 @@ fn test_read_and_block() { let addr = Ipv4(127, 0, 0, 1, 2930); let client_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::local |sched| { + do Scheduler::unsafe_local |sched| { let io = sched.event_loop.io().unwrap(); let mut stream = io.connect(addr).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); @@ -397,7 +397,7 @@ fn test_read_and_block() { }; let server_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::local |sched| { + do Scheduler::unsafe_local |sched| { let io = sched.event_loop.io().unwrap(); let mut listener = io.bind(addr).unwrap(); let mut stream = listener.listen().unwrap(); @@ -416,7 +416,7 @@ fn test_read_and_block() { } reads += 1; - do Scheduler::local |scheduler| { + do Scheduler::unsafe_local |scheduler| { // Yield to the other task in hopes that it // will trigger a read callback while we are // not ready for it @@ -448,7 +448,7 @@ fn test_read_read_read() { let addr = Ipv4(127, 0, 0, 1, 2931); let client_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::local |sched| { + do Scheduler::unsafe_local |sched| { let io = sched.event_loop.io().unwrap(); let mut stream = io.connect(addr).unwrap(); let mut buf = [0, .. 2048]; From ebefe07792caf17c03c6f90fb1979d4e6c935001 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 15 Apr 2013 16:00:15 -0700 Subject: [PATCH 11/14] core::rt: Make Scheduler::unsafe_local return a fabricated region pointer Instead of taking a closure. It's unsafe either way. Rename it to unsafe_local_borrow. --- src/libcore/rt/sched/local.rs | 7 +- src/libcore/rt/sched/mod.rs | 97 +++++----- src/libcore/rt/uvio.rs | 343 ++++++++++++++++------------------ 3 files changed, 214 insertions(+), 233 deletions(-) diff --git a/src/libcore/rt/sched/local.rs b/src/libcore/rt/sched/local.rs index 020d581546a..d8001011114 100644 --- a/src/libcore/rt/sched/local.rs +++ b/src/libcore/rt/sched/local.rs @@ -43,7 +43,7 @@ pub fn take() -> ~Scheduler { /// # Safety Note /// Because this leaves the Scheduler in thread-local storage it is possible /// For the Scheduler pointer to be aliased -pub unsafe fn borrow(f: &fn(&mut Scheduler)) { +pub unsafe fn borrow() -> &mut Scheduler { unsafe { let key = tls_key(); let mut void_sched: *mut c_void = tls::get(key); @@ -54,7 +54,7 @@ pub unsafe fn borrow(f: &fn(&mut Scheduler)) { transmute::<&mut *mut c_void, &mut ~Scheduler>(void_sched_ptr) }; let sched: &mut Scheduler = &mut **sched; - f(sched); + return sched; } } } @@ -93,8 +93,7 @@ fn borrow_smoke_test() { let scheduler = ~UvEventLoop::new_scheduler(); put(scheduler); unsafe { - do borrow |_sched| { - } + let _scheduler = borrow(); } let _scheduler = take(); } diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched/mod.rs index 82f0c3592c4..a2a440ba76e 100644 --- a/src/libcore/rt/sched/mod.rs +++ b/src/libcore/rt/sched/mod.rs @@ -95,20 +95,18 @@ pub impl Scheduler { // Give ownership of the scheduler (self) to the thread local::put(self); - do Scheduler::unsafe_local |scheduler| { - fn run_scheduler_once() { - do Scheduler::unsafe_local |scheduler| { - if scheduler.resume_task_from_queue() { - // Ok, a task ran. Nice! We'll do it again later - scheduler.event_loop.callback(run_scheduler_once); - } - } + let scheduler = Scheduler::unsafe_local_borrow(); + fn run_scheduler_once() { + let scheduler = Scheduler::unsafe_local_borrow(); + if scheduler.resume_task_from_queue() { + // Ok, a task ran. Nice! We'll do it again later + scheduler.event_loop.callback(run_scheduler_once); } - - scheduler.event_loop.callback(run_scheduler_once); - scheduler.event_loop.run(); } + scheduler.event_loop.callback(run_scheduler_once); + scheduler.event_loop.run(); + return local::take(); } @@ -116,8 +114,14 @@ pub impl Scheduler { /// # Safety Note /// This allows other mutable aliases to the scheduler, both in the current /// execution context and other execution contexts. - fn unsafe_local(f: &fn(&mut Scheduler)) { - unsafe { local::borrow(f) } + fn unsafe_local_borrow() -> &mut Scheduler { + unsafe { local::borrow() } + } + + fn local_borrow(f: &fn(&mut Scheduler)) { + let mut sched = local::take(); + f(sched); + local::put(sched); } // * Scheduler-context operations @@ -208,9 +212,8 @@ pub impl Scheduler { } // We could be executing in a different thread now - do Scheduler::unsafe_local |sched| { - sched.run_cleanup_job(); - } + let sched = Scheduler::unsafe_local_borrow(); + sched.run_cleanup_job(); } /// Switch directly to another task, without going through the scheduler. @@ -232,9 +235,8 @@ pub impl Scheduler { } // We could be executing in a different thread now - do Scheduler::unsafe_local |sched| { - sched.run_cleanup_job(); - } + let sched = Scheduler::unsafe_local_borrow(); + sched.run_cleanup_job(); } // * Other stuff @@ -331,15 +333,13 @@ pub impl Task { // This is the first code to execute after the initial // context switch to the task. The previous context may // have asked us to do some cleanup. - do Scheduler::unsafe_local |sched| { - sched.run_cleanup_job(); - } + let sched = Scheduler::unsafe_local_borrow(); + sched.run_cleanup_job(); start(); - do Scheduler::unsafe_local |sched| { - sched.terminate_current_task(); - } + let sched = Scheduler::unsafe_local_borrow(); + sched.terminate_current_task(); }; return wrapper; } @@ -398,13 +398,12 @@ fn test_swap_tasks() { let mut sched = ~UvEventLoop::new_scheduler(); let task1 = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } - do Scheduler::unsafe_local |sched| { - let task2 = ~do Task::new(&mut sched.stack_pool) { - unsafe { *count_ptr = *count_ptr + 1; } - }; - // Context switch directly to the new task - sched.resume_task_from_running_task_direct(task2); - } + let sched = Scheduler::unsafe_local_borrow(); + let task2 = ~do Task::new(&mut sched.stack_pool) { + unsafe { *count_ptr = *count_ptr + 1; } + }; + // Context switch directly to the new task + sched.resume_task_from_running_task_direct(task2); unsafe { *count_ptr = *count_ptr + 1; } }; sched.task_queue.push_back(task1); @@ -431,7 +430,7 @@ fn test_run_a_lot_of_tasks_queued() { assert!(count == MAX); fn run_task(count_ptr: *mut int) { - do Scheduler::unsafe_local |sched| { + do Scheduler::local_borrow |sched| { let task = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; @@ -464,18 +463,17 @@ fn test_run_a_lot_of_tasks_direct() { assert!(count == MAX); fn run_task(count_ptr: *mut int) { - do Scheduler::unsafe_local |sched| { - let task = ~do Task::new(&mut sched.stack_pool) { - unsafe { - *count_ptr = *count_ptr + 1; - if *count_ptr != MAX { - run_task(count_ptr); - } + let sched = Scheduler::unsafe_local_borrow(); + let task = ~do Task::new(&mut sched.stack_pool) { + unsafe { + *count_ptr = *count_ptr + 1; + if *count_ptr != MAX { + run_task(count_ptr); } - }; - // Context switch directly to the new task - sched.resume_task_from_running_task_direct(task); - } + } + }; + // Context switch directly to the new task + sched.resume_task_from_running_task_direct(task); }; } } @@ -485,12 +483,11 @@ fn test_block_task() { do run_in_bare_thread { let mut sched = ~UvEventLoop::new_scheduler(); let task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::unsafe_local |sched| { - assert!(sched.in_task_context()); - do sched.deschedule_running_task_and_then() |sched, task| { - assert!(!sched.in_task_context()); - sched.task_queue.push_back(task); - } + let sched = Scheduler::unsafe_local_borrow(); + assert!(sched.in_task_context()); + do sched.deschedule_running_task_and_then() |sched, task| { + assert!(!sched.in_task_context()); + sched.task_queue.push_back(task); } }; sched.task_queue.push_back(task); diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs index 7dfc6fff4cd..a43ec07c2de 100644 --- a/src/libcore/rt/uvio.rs +++ b/src/libcore/rt/uvio.rs @@ -104,37 +104,35 @@ impl IoFactory for UvIoFactory { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - do Scheduler::unsafe_local |scheduler| { - assert!(scheduler.in_task_context()); + let scheduler = Scheduler::unsafe_local_borrow(); + assert!(scheduler.in_task_context()); - // Block this task and take ownership, switch to scheduler context - do scheduler.deschedule_running_task_and_then |scheduler, task| { + // Block this task and take ownership, switch to scheduler context + do scheduler.deschedule_running_task_and_then |scheduler, task| { - rtdebug!("connect: entered scheduler context"); - assert!(!scheduler.in_task_context()); - let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); - let task_cell = Cell(task); + rtdebug!("connect: entered scheduler context"); + assert!(!scheduler.in_task_context()); + let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); + let task_cell = Cell(task); - // Wait for a connection - do tcp_watcher.connect(addr) |stream_watcher, status| { - rtdebug!("connect: in connect callback"); - let maybe_stream = if status.is_none() { - rtdebug!("status is none"); - Some(~UvStream(stream_watcher)) - } else { - rtdebug!("status is some"); - stream_watcher.close(||()); - None - }; + // Wait for a connection + do tcp_watcher.connect(addr) |stream_watcher, status| { + rtdebug!("connect: in connect callback"); + let maybe_stream = if status.is_none() { + rtdebug!("status is none"); + Some(~UvStream(stream_watcher)) + } else { + rtdebug!("status is some"); + stream_watcher.close(||()); + None + }; - // Store the stream in the task's stack - unsafe { (*result_cell_ptr).put_back(maybe_stream); } + // Store the stream in the task's stack + unsafe { (*result_cell_ptr).put_back(maybe_stream); } - // Context switch - do Scheduler::unsafe_local |scheduler| { - scheduler.resume_task_immediately(task_cell.take()); - } - } + // Context switch + let scheduler = Scheduler::unsafe_local_borrow(); + scheduler.resume_task_immediately(task_cell.take()); } } @@ -178,33 +176,31 @@ impl TcpListener for UvTcpListener { let server_tcp_watcher = self.watcher(); - do Scheduler::unsafe_local |scheduler| { - assert!(scheduler.in_task_context()); + let scheduler = Scheduler::unsafe_local_borrow(); + assert!(scheduler.in_task_context()); - do scheduler.deschedule_running_task_and_then |_, task| { - let task_cell = Cell(task); - let mut server_tcp_watcher = server_tcp_watcher; - do server_tcp_watcher.listen |server_stream_watcher, status| { - let maybe_stream = if status.is_none() { - let mut server_stream_watcher = server_stream_watcher; - let mut loop_ = loop_from_watcher(&server_stream_watcher); - let mut client_tcp_watcher = TcpWatcher::new(&mut loop_); - let mut client_tcp_watcher = client_tcp_watcher.as_stream(); - // XXX: Need's to be surfaced in interface - server_stream_watcher.accept(client_tcp_watcher); - Some(~UvStream::new(client_tcp_watcher)) - } else { - None - }; + do scheduler.deschedule_running_task_and_then |_, task| { + let task_cell = Cell(task); + let mut server_tcp_watcher = server_tcp_watcher; + do server_tcp_watcher.listen |server_stream_watcher, status| { + let maybe_stream = if status.is_none() { + let mut server_stream_watcher = server_stream_watcher; + let mut loop_ = loop_from_watcher(&server_stream_watcher); + let mut client_tcp_watcher = TcpWatcher::new(&mut loop_); + let mut client_tcp_watcher = client_tcp_watcher.as_stream(); + // XXX: Need's to be surfaced in interface + server_stream_watcher.accept(client_tcp_watcher); + Some(~UvStream::new(client_tcp_watcher)) + } else { + None + }; - unsafe { (*result_cell_ptr).put_back(maybe_stream); } + unsafe { (*result_cell_ptr).put_back(maybe_stream); } - rtdebug!("resuming task from listen"); - // Context switch - do Scheduler::unsafe_local |scheduler| { - scheduler.resume_task_immediately(task_cell.take()); - } - } + rtdebug!("resuming task from listen"); + // Context switch + let scheduler = Scheduler::unsafe_local_borrow(); + scheduler.resume_task_immediately(task_cell.take()); } } @@ -243,42 +239,40 @@ impl Stream for UvStream { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - do Scheduler::unsafe_local |scheduler| { - assert!(scheduler.in_task_context()); - let watcher = self.watcher(); - let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |scheduler, task| { - rtdebug!("read: entered scheduler context"); - assert!(!scheduler.in_task_context()); + let scheduler = Scheduler::unsafe_local_borrow(); + assert!(scheduler.in_task_context()); + let watcher = self.watcher(); + let buf_ptr: *&mut [u8] = &buf; + do scheduler.deschedule_running_task_and_then |scheduler, task| { + rtdebug!("read: entered scheduler context"); + assert!(!scheduler.in_task_context()); + let mut watcher = watcher; + let task_cell = Cell(task); + // XXX: We shouldn't reallocate these callbacks every + // call to read + let alloc: AllocCallback = |_| unsafe { + slice_to_uv_buf(*buf_ptr) + }; + do watcher.read_start(alloc) |watcher, nread, _buf, status| { + + // Stop reading so that no read callbacks are + // triggered before the user calls `read` again. + // XXX: Is there a performance impact to calling + // stop here? let mut watcher = watcher; - let task_cell = Cell(task); - // XXX: We shouldn't reallocate these callbacks every - // call to read - let alloc: AllocCallback = |_| unsafe { - slice_to_uv_buf(*buf_ptr) + watcher.read_stop(); + + let result = if status.is_none() { + assert!(nread >= 0); + Ok(nread as uint) + } else { + Err(()) }; - do watcher.read_start(alloc) |watcher, nread, _buf, status| { - // Stop reading so that no read callbacks are - // triggered before the user calls `read` again. - // XXX: Is there a performance impact to calling - // stop here? - let mut watcher = watcher; - watcher.read_stop(); + unsafe { (*result_cell_ptr).put_back(result); } - let result = if status.is_none() { - assert!(nread >= 0); - Ok(nread as uint) - } else { - Err(()) - }; - - unsafe { (*result_cell_ptr).put_back(result); } - - do Scheduler::unsafe_local |scheduler| { - scheduler.resume_task_immediately(task_cell.take()); - } - } + let scheduler = Scheduler::unsafe_local_borrow(); + scheduler.resume_task_immediately(task_cell.take()); } } @@ -289,29 +283,27 @@ impl Stream for UvStream { fn write(&mut self, buf: &[u8]) -> Result<(), ()> { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - do Scheduler::unsafe_local |scheduler| { - assert!(scheduler.in_task_context()); - let watcher = self.watcher(); - let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { - let mut watcher = watcher; - let task_cell = Cell(task); - let buf = unsafe { &*buf_ptr }; - // XXX: OMGCOPIES - let buf = buf.to_vec(); - do watcher.write(buf) |_watcher, status| { - let result = if status.is_none() { - Ok(()) - } else { - Err(()) - }; + let scheduler = Scheduler::unsafe_local_borrow(); + assert!(scheduler.in_task_context()); + let watcher = self.watcher(); + let buf_ptr: *&[u8] = &buf; + do scheduler.deschedule_running_task_and_then |_, task| { + let mut watcher = watcher; + let task_cell = Cell(task); + let buf = unsafe { &*buf_ptr }; + // XXX: OMGCOPIES + let buf = buf.to_vec(); + do watcher.write(buf) |_watcher, status| { + let result = if status.is_none() { + Ok(()) + } else { + Err(()) + }; - unsafe { (*result_cell_ptr).put_back(result); } + unsafe { (*result_cell_ptr).put_back(result); } - do Scheduler::unsafe_local |scheduler| { - scheduler.resume_task_immediately(task_cell.take()); - } - } + let scheduler = Scheduler::unsafe_local_borrow(); + scheduler.resume_task_immediately(task_cell.take()); } } @@ -326,12 +318,11 @@ fn test_simple_io_no_connect() { do run_in_bare_thread { let mut sched = ~UvEventLoop::new_scheduler(); let task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::unsafe_local |sched| { - let io = sched.event_loop.io().unwrap(); - let addr = Ipv4(127, 0, 0, 1, 2926); - let maybe_chan = io.connect(addr); - assert!(maybe_chan.is_none()); - } + let sched = Scheduler::unsafe_local_borrow(); + let io = sched.event_loop.io().unwrap(); + let addr = Ipv4(127, 0, 0, 1, 2926); + let maybe_chan = io.connect(addr); + assert!(maybe_chan.is_none()); }; sched.task_queue.push_back(task); sched.run(); @@ -346,29 +337,27 @@ fn test_simple_tcp_server_and_client() { let addr = Ipv4(127, 0, 0, 1, 2929); let client_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::unsafe_local |sched| { - let io = sched.event_loop.io().unwrap(); - let mut stream = io.connect(addr).unwrap(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.close(); - } + let sched = Scheduler::unsafe_local_borrow(); + let io = sched.event_loop.io().unwrap(); + let mut stream = io.connect(addr).unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.close(); }; let server_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::unsafe_local |sched| { - let io = sched.event_loop.io().unwrap(); - let mut listener = io.bind(addr).unwrap(); - let mut stream = listener.listen().unwrap(); - let mut buf = [0, .. 2048]; - let nread = stream.read(buf).unwrap(); - assert!(nread == 8); - for uint::range(0, nread) |i| { - rtdebug!("%u", buf[i] as uint); - assert!(buf[i] == i as u8); - } - stream.close(); - listener.close(); + let sched = Scheduler::unsafe_local_borrow(); + let io = sched.event_loop.io().unwrap(); + let mut listener = io.bind(addr).unwrap(); + let mut stream = listener.listen().unwrap(); + let mut buf = [0, .. 2048]; + let nread = stream.read(buf).unwrap(); + assert!(nread == 8); + for uint::range(0, nread) |i| { + rtdebug!("%u", buf[i] as uint); + assert!(buf[i] == i as u8); } + stream.close(); + listener.close(); }; // Start the server first so it listens before the client connects @@ -385,53 +374,50 @@ fn test_read_and_block() { let addr = Ipv4(127, 0, 0, 1, 2930); let client_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::unsafe_local |sched| { - let io = sched.event_loop.io().unwrap(); - let mut stream = io.connect(addr).unwrap(); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.write([0, 1, 2, 3, 4, 5, 6, 7]); - stream.close(); - } + let sched = Scheduler::unsafe_local_borrow(); + let io = sched.event_loop.io().unwrap(); + let mut stream = io.connect(addr).unwrap(); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.write([0, 1, 2, 3, 4, 5, 6, 7]); + stream.close(); }; let server_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::unsafe_local |sched| { - let io = sched.event_loop.io().unwrap(); - let mut listener = io.bind(addr).unwrap(); - let mut stream = listener.listen().unwrap(); - let mut buf = [0, .. 2048]; + let sched = Scheduler::unsafe_local_borrow(); + let io = sched.event_loop.io().unwrap(); + let mut listener = io.bind(addr).unwrap(); + let mut stream = listener.listen().unwrap(); + let mut buf = [0, .. 2048]; - let expected = 32; - let mut current = 0; - let mut reads = 0; + let expected = 32; + let mut current = 0; + let mut reads = 0; - while current < expected { - let nread = stream.read(buf).unwrap(); - for uint::range(0, nread) |i| { - let val = buf[i] as uint; - assert!(val == current % 8); - current += 1; - } - reads += 1; - - do Scheduler::unsafe_local |scheduler| { - // Yield to the other task in hopes that it - // will trigger a read callback while we are - // not ready for it - do scheduler.deschedule_running_task_and_then |scheduler, task| { - scheduler.task_queue.push_back(task); - } - } + while current < expected { + let nread = stream.read(buf).unwrap(); + for uint::range(0, nread) |i| { + let val = buf[i] as uint; + assert!(val == current % 8); + current += 1; } + reads += 1; - // Make sure we had multiple reads - assert!(reads > 1); - - stream.close(); - listener.close(); + let scheduler = Scheduler::unsafe_local_borrow(); + // Yield to the other task in hopes that it + // will trigger a read callback while we are + // not ready for it + do scheduler.deschedule_running_task_and_then |scheduler, task| { + scheduler.task_queue.push_back(task); + } } + + // Make sure we had multiple reads + assert!(reads > 1); + + stream.close(); + listener.close(); }; // Start the server first so it listens before the client connects @@ -448,19 +434,18 @@ fn test_read_read_read() { let addr = Ipv4(127, 0, 0, 1, 2931); let client_task = ~do Task::new(&mut sched.stack_pool) { - do Scheduler::unsafe_local |sched| { - let io = sched.event_loop.io().unwrap(); - let mut stream = io.connect(addr).unwrap(); - let mut buf = [0, .. 2048]; - let mut total_bytes_read = 0; - while total_bytes_read < 500000000 { - let nread = stream.read(buf).unwrap(); - rtdebug!("read %u bytes", nread as uint); - total_bytes_read += nread; - } - rtdebug_!("read %u bytes total", total_bytes_read as uint); - stream.close(); + let sched = Scheduler::unsafe_local_borrow(); + let io = sched.event_loop.io().unwrap(); + let mut stream = io.connect(addr).unwrap(); + let mut buf = [0, .. 2048]; + let mut total_bytes_read = 0; + while total_bytes_read < 500000000 { + let nread = stream.read(buf).unwrap(); + rtdebug!("read %u bytes", nread as uint); + total_bytes_read += nread; } + rtdebug_!("read %u bytes total", total_bytes_read as uint); + stream.close(); }; sched.task_queue.push_back(client_task); From 68583a25a0b31bc113cf1f4ec479339cbf876e4d Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 15 Apr 2013 16:19:01 -0700 Subject: [PATCH 12/14] core::rt: Restructure context switches to take ownership of the Scheduler In order to do a context switch you have to give up ownership of the scheduler, effectively passing it to the next execution context. This could help avoid some situations here tasks retain unsafe pointers to schedulers between context switches, across which they may have changed threads. There are still a number of uses of unsafe scheduler pointers. --- src/libcore/rt/sched/mod.rs | 117 ++++++++++++++++++++++-------------- src/libcore/rt/uvio.rs | 41 +++++++------ 2 files changed, 95 insertions(+), 63 deletions(-) diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched/mod.rs index a2a440ba76e..2aaf0a44455 100644 --- a/src/libcore/rt/sched/mod.rs +++ b/src/libcore/rt/sched/mod.rs @@ -20,6 +20,7 @@ use super::context::Context; #[cfg(test)] use super::uvio::UvEventLoop; #[cfg(test)] use unstable::run_in_bare_thread; #[cfg(test)] use int; +#[cfg(test)] use cell::Cell; mod local; @@ -46,14 +47,14 @@ pub struct Scheduler { // complaining type UnsafeTaskReceiver = sys::Closure; trait HackAroundBorrowCk { - fn from_fn(&fn(&mut Scheduler, ~Task)) -> Self; - fn to_fn(self) -> &fn(&mut Scheduler, ~Task); + fn from_fn(&fn(~Task)) -> Self; + fn to_fn(self) -> &fn(~Task); } impl HackAroundBorrowCk for UnsafeTaskReceiver { - fn from_fn(f: &fn(&mut Scheduler, ~Task)) -> UnsafeTaskReceiver { + fn from_fn(f: &fn(~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } - fn to_fn(self) -> &fn(&mut Scheduler, ~Task) { + fn to_fn(self) -> &fn(~Task) { unsafe { transmute(self) } } } @@ -97,10 +98,12 @@ pub impl Scheduler { let scheduler = Scheduler::unsafe_local_borrow(); fn run_scheduler_once() { - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); if scheduler.resume_task_from_queue() { // Ok, a task ran. Nice! We'll do it again later - scheduler.event_loop.callback(run_scheduler_once); + do Scheduler::local_borrow |scheduler| { + scheduler.event_loop.callback(run_scheduler_once); + } } } @@ -124,9 +127,13 @@ pub impl Scheduler { local::put(sched); } + fn local_take() -> ~Scheduler { + local::take() + } + // * Scheduler-context operations - fn resume_task_from_queue(&mut self) -> bool { + fn resume_task_from_queue(~self) -> bool { assert!(!self.in_task_context()); let mut self = self; @@ -137,12 +144,14 @@ pub impl Scheduler { } None => { rtdebug!("no tasks in queue"); + local::put(self); return false; } } } - fn resume_task_immediately(&mut self, task: ~Task) { + fn resume_task_immediately(~self, task: ~Task) { + let mut self = self; assert!(!self.in_task_context()); rtdebug!("scheduling a task"); @@ -151,20 +160,22 @@ pub impl Scheduler { self.current_task = Some(task); self.enqueue_cleanup_job(DoNothing); - // Take pointers to both the task and scheduler's saved registers. - { - let (sched_context, _, next_task_context) = self.get_contexts(); - let next_task_context = next_task_context.unwrap(); - // Context switch to the task, restoring it's registers - // and saving the scheduler's - Context::swap(sched_context, next_task_context); - } + local::put(self); + // Take pointers to both the task and scheduler's saved registers. + let sched = Scheduler::unsafe_local_borrow(); + let (sched_context, _, next_task_context) = sched.get_contexts(); + let next_task_context = next_task_context.unwrap(); + // Context switch to the task, restoring it's registers + // and saving the scheduler's + Context::swap(sched_context, next_task_context); + + let sched = Scheduler::unsafe_local_borrow(); // The running task should have passed ownership elsewhere - assert!(self.current_task.is_none()); + assert!(sched.current_task.is_none()); // Running tasks may have asked us to do some cleanup - self.run_cleanup_job(); + sched.run_cleanup_job(); } @@ -172,18 +183,23 @@ pub impl Scheduler { /// Called by a running task to end execution, after which it will /// be recycled by the scheduler for reuse in a new task. - fn terminate_current_task(&mut self) { + fn terminate_current_task(~self) { + let mut self = self; assert!(self.in_task_context()); rtdebug!("ending running task"); let dead_task = self.current_task.swap_unwrap(); self.enqueue_cleanup_job(RecycleTask(dead_task)); - { - let (sched_context, last_task_context, _) = self.get_contexts(); - let last_task_context = last_task_context.unwrap(); - Context::swap(last_task_context, sched_context); - } + + local::put(self); + + let sched = Scheduler::unsafe_local_borrow(); + let (sched_context, last_task_context, _) = sched.get_contexts(); + let last_task_context = last_task_context.unwrap(); + Context::swap(last_task_context, sched_context); + + // Control never reaches here } /// Block a running task, context switch to the scheduler, then pass the @@ -194,22 +210,25 @@ pub impl Scheduler { /// The closure here is a *stack* closure that lives in the /// running task. It gets transmuted to the scheduler's lifetime /// and called while the task is blocked. - fn deschedule_running_task_and_then(&mut self, f: &fn(&mut Scheduler, ~Task)) { + fn deschedule_running_task_and_then(~self, f: &fn(~Task)) { + let mut self = self; assert!(self.in_task_context()); rtdebug!("blocking task"); let blocked_task = self.current_task.swap_unwrap(); let f_fake_region = unsafe { - transmute::<&fn(&mut Scheduler, ~Task), &fn(&mut Scheduler, ~Task)>(f) + transmute::<&fn(~Task), &fn(~Task)>(f) }; let f_opaque = HackAroundBorrowCk::from_fn(f_fake_region); self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); - { - let (sched_context, last_task_context, _) = self.get_contexts(); - let last_task_context = last_task_context.unwrap(); - Context::swap(last_task_context, sched_context); - } + + local::put(self); + + let sched = Scheduler::unsafe_local_borrow(); + let (sched_context, last_task_context, _) = sched.get_contexts(); + let last_task_context = last_task_context.unwrap(); + Context::swap(last_task_context, sched_context); // We could be executing in a different thread now let sched = Scheduler::unsafe_local_borrow(); @@ -219,7 +238,8 @@ pub impl Scheduler { /// Switch directly to another task, without going through the scheduler. /// You would want to think hard about doing this, e.g. if there are /// pending I/O events it would be a bad idea. - fn resume_task_from_running_task_direct(&mut self, next_task: ~Task) { + fn resume_task_from_running_task_direct(~self, next_task: ~Task) { + let mut self = self; assert!(self.in_task_context()); rtdebug!("switching tasks"); @@ -227,12 +247,14 @@ pub impl Scheduler { let old_running_task = self.current_task.swap_unwrap(); self.enqueue_cleanup_job(RescheduleTask(old_running_task)); self.current_task = Some(next_task); - { - let (_, last_task_context, next_task_context) = self.get_contexts(); - let last_task_context = last_task_context.unwrap(); - let next_task_context = next_task_context.unwrap(); - Context::swap(last_task_context, next_task_context); - } + + local::put(self); + + let sched = Scheduler::unsafe_local_borrow(); + let (_, last_task_context, next_task_context) = sched.get_contexts(); + let last_task_context = last_task_context.unwrap(); + let next_task_context = next_task_context.unwrap(); + Context::swap(last_task_context, next_task_context); // We could be executing in a different thread now let sched = Scheduler::unsafe_local_borrow(); @@ -261,7 +283,7 @@ pub impl Scheduler { self.task_queue.push_front(task); } RecycleTask(task) => task.recycle(&mut self.stack_pool), - GiveTask(task, f) => (f.to_fn())(self, task) + GiveTask(task, f) => (f.to_fn())(task) } } @@ -338,7 +360,7 @@ pub impl Task { start(); - let sched = Scheduler::unsafe_local_borrow(); + let sched = Scheduler::local_take(); sched.terminate_current_task(); }; return wrapper; @@ -398,7 +420,7 @@ fn test_swap_tasks() { let mut sched = ~UvEventLoop::new_scheduler(); let task1 = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } - let sched = Scheduler::unsafe_local_borrow(); + let mut sched = Scheduler::local_take(); let task2 = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } }; @@ -463,7 +485,7 @@ fn test_run_a_lot_of_tasks_direct() { assert!(count == MAX); fn run_task(count_ptr: *mut int) { - let sched = Scheduler::unsafe_local_borrow(); + let mut sched = Scheduler::local_take(); let task = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; @@ -483,11 +505,14 @@ fn test_block_task() { do run_in_bare_thread { let mut sched = ~UvEventLoop::new_scheduler(); let task = ~do Task::new(&mut sched.stack_pool) { - let sched = Scheduler::unsafe_local_borrow(); + let sched = Scheduler::local_take(); assert!(sched.in_task_context()); - do sched.deschedule_running_task_and_then() |sched, task| { - assert!(!sched.in_task_context()); - sched.task_queue.push_back(task); + do sched.deschedule_running_task_and_then() |task| { + let task = Cell(task); + do Scheduler::local_borrow |sched| { + assert!(!sched.in_task_context()); + sched.task_queue.push_back(task.take()); + } } }; sched.task_queue.push_back(task); diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs index a43ec07c2de..e3fed29ddd2 100644 --- a/src/libcore/rt/uvio.rs +++ b/src/libcore/rt/uvio.rs @@ -104,14 +104,16 @@ impl IoFactory for UvIoFactory { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); assert!(scheduler.in_task_context()); // Block this task and take ownership, switch to scheduler context - do scheduler.deschedule_running_task_and_then |scheduler, task| { + do scheduler.deschedule_running_task_and_then |task| { rtdebug!("connect: entered scheduler context"); - assert!(!scheduler.in_task_context()); + do Scheduler::local_borrow |scheduler| { + assert!(!scheduler.in_task_context()); + } let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); let task_cell = Cell(task); @@ -131,7 +133,7 @@ impl IoFactory for UvIoFactory { unsafe { (*result_cell_ptr).put_back(maybe_stream); } // Context switch - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -176,10 +178,10 @@ impl TcpListener for UvTcpListener { let server_tcp_watcher = self.watcher(); - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); assert!(scheduler.in_task_context()); - do scheduler.deschedule_running_task_and_then |_, task| { + do scheduler.deschedule_running_task_and_then |task| { let task_cell = Cell(task); let mut server_tcp_watcher = server_tcp_watcher; do server_tcp_watcher.listen |server_stream_watcher, status| { @@ -199,7 +201,7 @@ impl TcpListener for UvTcpListener { rtdebug!("resuming task from listen"); // Context switch - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -239,13 +241,15 @@ impl Stream for UvStream { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&mut [u8] = &buf; - do scheduler.deschedule_running_task_and_then |scheduler, task| { + do scheduler.deschedule_running_task_and_then |task| { rtdebug!("read: entered scheduler context"); - assert!(!scheduler.in_task_context()); + do Scheduler::local_borrow |scheduler| { + assert!(!scheduler.in_task_context()); + } let mut watcher = watcher; let task_cell = Cell(task); // XXX: We shouldn't reallocate these callbacks every @@ -271,7 +275,7 @@ impl Stream for UvStream { unsafe { (*result_cell_ptr).put_back(result); } - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -283,11 +287,11 @@ impl Stream for UvStream { fn write(&mut self, buf: &[u8]) -> Result<(), ()> { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&[u8] = &buf; - do scheduler.deschedule_running_task_and_then |_, task| { + do scheduler.deschedule_running_task_and_then |task| { let mut watcher = watcher; let task_cell = Cell(task); let buf = unsafe { &*buf_ptr }; @@ -302,7 +306,7 @@ impl Stream for UvStream { unsafe { (*result_cell_ptr).put_back(result); } - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -404,12 +408,15 @@ fn test_read_and_block() { } reads += 1; - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = Scheduler::local_take(); // Yield to the other task in hopes that it // will trigger a read callback while we are // not ready for it - do scheduler.deschedule_running_task_and_then |scheduler, task| { - scheduler.task_queue.push_back(task); + do scheduler.deschedule_running_task_and_then |task| { + let task = Cell(task); + do Scheduler::local_borrow |scheduler| { + scheduler.task_queue.push_back(task.take()); + } } } From ed74ac169e3e79ff4ce59329a06223e23f2b99b6 Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 15 Apr 2013 18:26:05 -0700 Subject: [PATCH 13/14] core::rt: Narrow down the unsafety of the thread-local scheduler Only when borrowing the I/O implementation do we need unsafety --- src/libcore/rt/sched/mod.rs | 52 +++++++++++++++++++++---------------- src/libcore/rt/uvio.rs | 42 +++++++++++++----------------- 2 files changed, 47 insertions(+), 47 deletions(-) diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched/mod.rs index 2aaf0a44455..8fb8e2a1d10 100644 --- a/src/libcore/rt/sched/mod.rs +++ b/src/libcore/rt/sched/mod.rs @@ -14,7 +14,7 @@ use cast::transmute; use super::work_queue::WorkQueue; use super::stack::{StackPool, StackSegment}; -use super::rtio::{EventLoop, EventLoopObject}; +use super::rtio::{EventLoop, EventLoopObject, IoFactoryObject}; use super::context::Context; #[cfg(test)] use super::uvio::UvEventLoop; @@ -96,12 +96,12 @@ pub impl Scheduler { // Give ownership of the scheduler (self) to the thread local::put(self); - let scheduler = Scheduler::unsafe_local_borrow(); + let scheduler = unsafe { local::borrow() }; fn run_scheduler_once() { - let scheduler = Scheduler::local_take(); + let scheduler = Scheduler::take_local(); if scheduler.resume_task_from_queue() { // Ok, a task ran. Nice! We'll do it again later - do Scheduler::local_borrow |scheduler| { + do Scheduler::borrow_local |scheduler| { scheduler.event_loop.callback(run_scheduler_once); } } @@ -113,21 +113,27 @@ pub impl Scheduler { return local::take(); } - /// Get a mutable pointer to the thread-local scheduler. + /// Get a mutable pointer to the thread-local I/O /// # Safety Note /// This allows other mutable aliases to the scheduler, both in the current /// execution context and other execution contexts. - fn unsafe_local_borrow() -> &mut Scheduler { - unsafe { local::borrow() } + unsafe fn borrow_local_io() -> &mut IoFactoryObject { + unsafe { + let io = local::borrow().event_loop.io().unwrap(); + transmute::<&mut IoFactoryObject, &mut IoFactoryObject>(io) + } } - fn local_borrow(f: &fn(&mut Scheduler)) { + /// Borrow the thread-local scheduler from thread-local storage. + /// While the scheduler is borrowed it is not available in TLS. + fn borrow_local(f: &fn(&mut Scheduler)) { let mut sched = local::take(); f(sched); local::put(sched); } - fn local_take() -> ~Scheduler { + /// Take ownership of the scheduler from thread local storage + fn take_local() -> ~Scheduler { local::take() } @@ -163,14 +169,14 @@ pub impl Scheduler { local::put(self); // Take pointers to both the task and scheduler's saved registers. - let sched = Scheduler::unsafe_local_borrow(); + let sched = unsafe { local::borrow() }; let (sched_context, _, next_task_context) = sched.get_contexts(); let next_task_context = next_task_context.unwrap(); // Context switch to the task, restoring it's registers // and saving the scheduler's Context::swap(sched_context, next_task_context); - let sched = Scheduler::unsafe_local_borrow(); + let sched = unsafe { local::borrow() }; // The running task should have passed ownership elsewhere assert!(sched.current_task.is_none()); @@ -194,7 +200,7 @@ pub impl Scheduler { local::put(self); - let sched = Scheduler::unsafe_local_borrow(); + let sched = unsafe { local::borrow() }; let (sched_context, last_task_context, _) = sched.get_contexts(); let last_task_context = last_task_context.unwrap(); Context::swap(last_task_context, sched_context); @@ -225,13 +231,13 @@ pub impl Scheduler { local::put(self); - let sched = Scheduler::unsafe_local_borrow(); + let sched = unsafe { local::borrow() }; let (sched_context, last_task_context, _) = sched.get_contexts(); let last_task_context = last_task_context.unwrap(); Context::swap(last_task_context, sched_context); // We could be executing in a different thread now - let sched = Scheduler::unsafe_local_borrow(); + let sched = unsafe { local::borrow() }; sched.run_cleanup_job(); } @@ -250,14 +256,14 @@ pub impl Scheduler { local::put(self); - let sched = Scheduler::unsafe_local_borrow(); + let sched = unsafe { local::borrow() }; let (_, last_task_context, next_task_context) = sched.get_contexts(); let last_task_context = last_task_context.unwrap(); let next_task_context = next_task_context.unwrap(); Context::swap(last_task_context, next_task_context); // We could be executing in a different thread now - let sched = Scheduler::unsafe_local_borrow(); + let sched = unsafe { local::borrow() }; sched.run_cleanup_job(); } @@ -355,12 +361,12 @@ pub impl Task { // This is the first code to execute after the initial // context switch to the task. The previous context may // have asked us to do some cleanup. - let sched = Scheduler::unsafe_local_borrow(); + let sched = unsafe { local::borrow() }; sched.run_cleanup_job(); start(); - let sched = Scheduler::local_take(); + let sched = Scheduler::take_local(); sched.terminate_current_task(); }; return wrapper; @@ -420,7 +426,7 @@ fn test_swap_tasks() { let mut sched = ~UvEventLoop::new_scheduler(); let task1 = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } - let mut sched = Scheduler::local_take(); + let mut sched = Scheduler::take_local(); let task2 = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; } }; @@ -452,7 +458,7 @@ fn test_run_a_lot_of_tasks_queued() { assert!(count == MAX); fn run_task(count_ptr: *mut int) { - do Scheduler::local_borrow |sched| { + do Scheduler::borrow_local |sched| { let task = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; @@ -485,7 +491,7 @@ fn test_run_a_lot_of_tasks_direct() { assert!(count == MAX); fn run_task(count_ptr: *mut int) { - let mut sched = Scheduler::local_take(); + let mut sched = Scheduler::take_local(); let task = ~do Task::new(&mut sched.stack_pool) { unsafe { *count_ptr = *count_ptr + 1; @@ -505,11 +511,11 @@ fn test_block_task() { do run_in_bare_thread { let mut sched = ~UvEventLoop::new_scheduler(); let task = ~do Task::new(&mut sched.stack_pool) { - let sched = Scheduler::local_take(); + let sched = Scheduler::take_local(); assert!(sched.in_task_context()); do sched.deschedule_running_task_and_then() |task| { let task = Cell(task); - do Scheduler::local_borrow |sched| { + do Scheduler::borrow_local |sched| { assert!(!sched.in_task_context()); sched.task_queue.push_back(task.take()); } diff --git a/src/libcore/rt/uvio.rs b/src/libcore/rt/uvio.rs index e3fed29ddd2..051d4ee7df9 100644 --- a/src/libcore/rt/uvio.rs +++ b/src/libcore/rt/uvio.rs @@ -104,14 +104,14 @@ impl IoFactory for UvIoFactory { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Scheduler::local_take(); + let scheduler = Scheduler::take_local(); assert!(scheduler.in_task_context()); // Block this task and take ownership, switch to scheduler context do scheduler.deschedule_running_task_and_then |task| { rtdebug!("connect: entered scheduler context"); - do Scheduler::local_borrow |scheduler| { + do Scheduler::borrow_local |scheduler| { assert!(!scheduler.in_task_context()); } let mut tcp_watcher = TcpWatcher::new(self.uv_loop()); @@ -133,7 +133,7 @@ impl IoFactory for UvIoFactory { unsafe { (*result_cell_ptr).put_back(maybe_stream); } // Context switch - let scheduler = Scheduler::local_take(); + let scheduler = Scheduler::take_local(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -178,7 +178,7 @@ impl TcpListener for UvTcpListener { let server_tcp_watcher = self.watcher(); - let scheduler = Scheduler::local_take(); + let scheduler = Scheduler::take_local(); assert!(scheduler.in_task_context()); do scheduler.deschedule_running_task_and_then |task| { @@ -201,7 +201,7 @@ impl TcpListener for UvTcpListener { rtdebug!("resuming task from listen"); // Context switch - let scheduler = Scheduler::local_take(); + let scheduler = Scheduler::take_local(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -241,13 +241,13 @@ impl Stream for UvStream { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Scheduler::local_take(); + let scheduler = Scheduler::take_local(); assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&mut [u8] = &buf; do scheduler.deschedule_running_task_and_then |task| { rtdebug!("read: entered scheduler context"); - do Scheduler::local_borrow |scheduler| { + do Scheduler::borrow_local |scheduler| { assert!(!scheduler.in_task_context()); } let mut watcher = watcher; @@ -275,7 +275,7 @@ impl Stream for UvStream { unsafe { (*result_cell_ptr).put_back(result); } - let scheduler = Scheduler::local_take(); + let scheduler = Scheduler::take_local(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -287,7 +287,7 @@ impl Stream for UvStream { fn write(&mut self, buf: &[u8]) -> Result<(), ()> { let result_cell = empty_cell(); let result_cell_ptr: *Cell> = &result_cell; - let scheduler = Scheduler::local_take(); + let scheduler = Scheduler::take_local(); assert!(scheduler.in_task_context()); let watcher = self.watcher(); let buf_ptr: *&[u8] = &buf; @@ -306,7 +306,7 @@ impl Stream for UvStream { unsafe { (*result_cell_ptr).put_back(result); } - let scheduler = Scheduler::local_take(); + let scheduler = Scheduler::take_local(); scheduler.resume_task_immediately(task_cell.take()); } } @@ -322,8 +322,7 @@ fn test_simple_io_no_connect() { do run_in_bare_thread { let mut sched = ~UvEventLoop::new_scheduler(); let task = ~do Task::new(&mut sched.stack_pool) { - let sched = Scheduler::unsafe_local_borrow(); - let io = sched.event_loop.io().unwrap(); + let io = unsafe { Scheduler::borrow_local_io() }; let addr = Ipv4(127, 0, 0, 1, 2926); let maybe_chan = io.connect(addr); assert!(maybe_chan.is_none()); @@ -341,16 +340,14 @@ fn test_simple_tcp_server_and_client() { let addr = Ipv4(127, 0, 0, 1, 2929); let client_task = ~do Task::new(&mut sched.stack_pool) { - let sched = Scheduler::unsafe_local_borrow(); - let io = sched.event_loop.io().unwrap(); + let io = unsafe { Scheduler::borrow_local_io() }; let mut stream = io.connect(addr).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); stream.close(); }; let server_task = ~do Task::new(&mut sched.stack_pool) { - let sched = Scheduler::unsafe_local_borrow(); - let io = sched.event_loop.io().unwrap(); + let io = unsafe { Scheduler::borrow_local_io() }; let mut listener = io.bind(addr).unwrap(); let mut stream = listener.listen().unwrap(); let mut buf = [0, .. 2048]; @@ -378,8 +375,7 @@ fn test_read_and_block() { let addr = Ipv4(127, 0, 0, 1, 2930); let client_task = ~do Task::new(&mut sched.stack_pool) { - let sched = Scheduler::unsafe_local_borrow(); - let io = sched.event_loop.io().unwrap(); + let io = unsafe { Scheduler::borrow_local_io() }; let mut stream = io.connect(addr).unwrap(); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); stream.write([0, 1, 2, 3, 4, 5, 6, 7]); @@ -389,8 +385,7 @@ fn test_read_and_block() { }; let server_task = ~do Task::new(&mut sched.stack_pool) { - let sched = Scheduler::unsafe_local_borrow(); - let io = sched.event_loop.io().unwrap(); + let io = unsafe { Scheduler::borrow_local_io() }; let mut listener = io.bind(addr).unwrap(); let mut stream = listener.listen().unwrap(); let mut buf = [0, .. 2048]; @@ -408,13 +403,13 @@ fn test_read_and_block() { } reads += 1; - let scheduler = Scheduler::local_take(); + let scheduler = Scheduler::take_local(); // Yield to the other task in hopes that it // will trigger a read callback while we are // not ready for it do scheduler.deschedule_running_task_and_then |task| { let task = Cell(task); - do Scheduler::local_borrow |scheduler| { + do Scheduler::borrow_local |scheduler| { scheduler.task_queue.push_back(task.take()); } } @@ -441,8 +436,7 @@ fn test_read_read_read() { let addr = Ipv4(127, 0, 0, 1, 2931); let client_task = ~do Task::new(&mut sched.stack_pool) { - let sched = Scheduler::unsafe_local_borrow(); - let io = sched.event_loop.io().unwrap(); + let io = unsafe { Scheduler::borrow_local_io() }; let mut stream = io.connect(addr).unwrap(); let mut buf = [0, .. 2048]; let mut total_bytes_read = 0; From 1f94364aeff935eebadd3ae47dbae3eb099023da Mon Sep 17 00:00:00 2001 From: Brian Anderson Date: Mon, 15 Apr 2013 18:56:39 -0700 Subject: [PATCH 14/14] core::rt: Clean up some more hacks --- src/libcore/rt/sched/mod.rs | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/src/libcore/rt/sched/mod.rs b/src/libcore/rt/sched/mod.rs index 8fb8e2a1d10..08c52d8e8d4 100644 --- a/src/libcore/rt/sched/mod.rs +++ b/src/libcore/rt/sched/mod.rs @@ -46,17 +46,13 @@ pub struct Scheduler { // XXX: Some hacks to put a &fn in Scheduler without borrowck // complaining type UnsafeTaskReceiver = sys::Closure; -trait HackAroundBorrowCk { +trait ClosureConverter { fn from_fn(&fn(~Task)) -> Self; fn to_fn(self) -> &fn(~Task); } -impl HackAroundBorrowCk for UnsafeTaskReceiver { - fn from_fn(f: &fn(~Task)) -> UnsafeTaskReceiver { - unsafe { transmute(f) } - } - fn to_fn(self) -> &fn(~Task) { - unsafe { transmute(self) } - } +impl ClosureConverter for UnsafeTaskReceiver { + fn from_fn(f: &fn(~Task)) -> UnsafeTaskReceiver { unsafe { transmute(f) } } + fn to_fn(self) -> &fn(~Task) { unsafe { transmute(self) } } } enum CleanupJob { @@ -223,10 +219,8 @@ pub impl Scheduler { rtdebug!("blocking task"); let blocked_task = self.current_task.swap_unwrap(); - let f_fake_region = unsafe { - transmute::<&fn(~Task), &fn(~Task)>(f) - }; - let f_opaque = HackAroundBorrowCk::from_fn(f_fake_region); + let f_fake_region = unsafe { transmute::<&fn(~Task), &fn(~Task)>(f) }; + let f_opaque = ClosureConverter::from_fn(f_fake_region); self.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque)); local::put(self);