core::rt: Narrow down the unsafety of the thread-local scheduler

Only when borrowing the I/O implementation do we need unsafety
This commit is contained in:
Brian Anderson 2013-04-15 18:26:05 -07:00
parent 68583a25a0
commit ed74ac169e
2 changed files with 47 additions and 47 deletions

View File

@ -14,7 +14,7 @@ use cast::transmute;
use super::work_queue::WorkQueue;
use super::stack::{StackPool, StackSegment};
use super::rtio::{EventLoop, EventLoopObject};
use super::rtio::{EventLoop, EventLoopObject, IoFactoryObject};
use super::context::Context;
#[cfg(test)] use super::uvio::UvEventLoop;
@ -96,12 +96,12 @@ pub impl Scheduler {
// Give ownership of the scheduler (self) to the thread
local::put(self);
let scheduler = Scheduler::unsafe_local_borrow();
let scheduler = unsafe { local::borrow() };
fn run_scheduler_once() {
let scheduler = Scheduler::local_take();
let scheduler = Scheduler::take_local();
if scheduler.resume_task_from_queue() {
// Ok, a task ran. Nice! We'll do it again later
do Scheduler::local_borrow |scheduler| {
do Scheduler::borrow_local |scheduler| {
scheduler.event_loop.callback(run_scheduler_once);
}
}
@ -113,21 +113,27 @@ pub impl Scheduler {
return local::take();
}
/// Get a mutable pointer to the thread-local scheduler.
/// Get a mutable pointer to the thread-local I/O
/// # Safety Note
/// This allows other mutable aliases to the scheduler, both in the current
/// execution context and other execution contexts.
fn unsafe_local_borrow() -> &mut Scheduler {
unsafe { local::borrow() }
unsafe fn borrow_local_io() -> &mut IoFactoryObject {
unsafe {
let io = local::borrow().event_loop.io().unwrap();
transmute::<&mut IoFactoryObject, &mut IoFactoryObject>(io)
}
}
fn local_borrow(f: &fn(&mut Scheduler)) {
/// Borrow the thread-local scheduler from thread-local storage.
/// While the scheduler is borrowed it is not available in TLS.
fn borrow_local(f: &fn(&mut Scheduler)) {
let mut sched = local::take();
f(sched);
local::put(sched);
}
fn local_take() -> ~Scheduler {
/// Take ownership of the scheduler from thread local storage
fn take_local() -> ~Scheduler {
local::take()
}
@ -163,14 +169,14 @@ pub impl Scheduler {
local::put(self);
// Take pointers to both the task and scheduler's saved registers.
let sched = Scheduler::unsafe_local_borrow();
let sched = unsafe { local::borrow() };
let (sched_context, _, next_task_context) = sched.get_contexts();
let next_task_context = next_task_context.unwrap();
// Context switch to the task, restoring it's registers
// and saving the scheduler's
Context::swap(sched_context, next_task_context);
let sched = Scheduler::unsafe_local_borrow();
let sched = unsafe { local::borrow() };
// The running task should have passed ownership elsewhere
assert!(sched.current_task.is_none());
@ -194,7 +200,7 @@ pub impl Scheduler {
local::put(self);
let sched = Scheduler::unsafe_local_borrow();
let sched = unsafe { local::borrow() };
let (sched_context, last_task_context, _) = sched.get_contexts();
let last_task_context = last_task_context.unwrap();
Context::swap(last_task_context, sched_context);
@ -225,13 +231,13 @@ pub impl Scheduler {
local::put(self);
let sched = Scheduler::unsafe_local_borrow();
let sched = unsafe { local::borrow() };
let (sched_context, last_task_context, _) = sched.get_contexts();
let last_task_context = last_task_context.unwrap();
Context::swap(last_task_context, sched_context);
// We could be executing in a different thread now
let sched = Scheduler::unsafe_local_borrow();
let sched = unsafe { local::borrow() };
sched.run_cleanup_job();
}
@ -250,14 +256,14 @@ pub impl Scheduler {
local::put(self);
let sched = Scheduler::unsafe_local_borrow();
let sched = unsafe { local::borrow() };
let (_, last_task_context, next_task_context) = sched.get_contexts();
let last_task_context = last_task_context.unwrap();
let next_task_context = next_task_context.unwrap();
Context::swap(last_task_context, next_task_context);
// We could be executing in a different thread now
let sched = Scheduler::unsafe_local_borrow();
let sched = unsafe { local::borrow() };
sched.run_cleanup_job();
}
@ -355,12 +361,12 @@ pub impl Task {
// This is the first code to execute after the initial
// context switch to the task. The previous context may
// have asked us to do some cleanup.
let sched = Scheduler::unsafe_local_borrow();
let sched = unsafe { local::borrow() };
sched.run_cleanup_job();
start();
let sched = Scheduler::local_take();
let sched = Scheduler::take_local();
sched.terminate_current_task();
};
return wrapper;
@ -420,7 +426,7 @@ fn test_swap_tasks() {
let mut sched = ~UvEventLoop::new_scheduler();
let task1 = ~do Task::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
let mut sched = Scheduler::local_take();
let mut sched = Scheduler::take_local();
let task2 = ~do Task::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
};
@ -452,7 +458,7 @@ fn test_run_a_lot_of_tasks_queued() {
assert!(count == MAX);
fn run_task(count_ptr: *mut int) {
do Scheduler::local_borrow |sched| {
do Scheduler::borrow_local |sched| {
let task = ~do Task::new(&mut sched.stack_pool) {
unsafe {
*count_ptr = *count_ptr + 1;
@ -485,7 +491,7 @@ fn test_run_a_lot_of_tasks_direct() {
assert!(count == MAX);
fn run_task(count_ptr: *mut int) {
let mut sched = Scheduler::local_take();
let mut sched = Scheduler::take_local();
let task = ~do Task::new(&mut sched.stack_pool) {
unsafe {
*count_ptr = *count_ptr + 1;
@ -505,11 +511,11 @@ fn test_block_task() {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Task::new(&mut sched.stack_pool) {
let sched = Scheduler::local_take();
let sched = Scheduler::take_local();
assert!(sched.in_task_context());
do sched.deschedule_running_task_and_then() |task| {
let task = Cell(task);
do Scheduler::local_borrow |sched| {
do Scheduler::borrow_local |sched| {
assert!(!sched.in_task_context());
sched.task_queue.push_back(task.take());
}

View File

@ -104,14 +104,14 @@ impl IoFactory for UvIoFactory {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Option<~StreamObject>> = &result_cell;
let scheduler = Scheduler::local_take();
let scheduler = Scheduler::take_local();
assert!(scheduler.in_task_context());
// Block this task and take ownership, switch to scheduler context
do scheduler.deschedule_running_task_and_then |task| {
rtdebug!("connect: entered scheduler context");
do Scheduler::local_borrow |scheduler| {
do Scheduler::borrow_local |scheduler| {
assert!(!scheduler.in_task_context());
}
let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
@ -133,7 +133,7 @@ impl IoFactory for UvIoFactory {
unsafe { (*result_cell_ptr).put_back(maybe_stream); }
// Context switch
let scheduler = Scheduler::local_take();
let scheduler = Scheduler::take_local();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -178,7 +178,7 @@ impl TcpListener for UvTcpListener {
let server_tcp_watcher = self.watcher();
let scheduler = Scheduler::local_take();
let scheduler = Scheduler::take_local();
assert!(scheduler.in_task_context());
do scheduler.deschedule_running_task_and_then |task| {
@ -201,7 +201,7 @@ impl TcpListener for UvTcpListener {
rtdebug!("resuming task from listen");
// Context switch
let scheduler = Scheduler::local_take();
let scheduler = Scheduler::take_local();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -241,13 +241,13 @@ impl Stream for UvStream {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<uint, ()>> = &result_cell;
let scheduler = Scheduler::local_take();
let scheduler = Scheduler::take_local();
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |task| {
rtdebug!("read: entered scheduler context");
do Scheduler::local_borrow |scheduler| {
do Scheduler::borrow_local |scheduler| {
assert!(!scheduler.in_task_context());
}
let mut watcher = watcher;
@ -275,7 +275,7 @@ impl Stream for UvStream {
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = Scheduler::local_take();
let scheduler = Scheduler::take_local();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -287,7 +287,7 @@ impl Stream for UvStream {
fn write(&mut self, buf: &[u8]) -> Result<(), ()> {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<(), ()>> = &result_cell;
let scheduler = Scheduler::local_take();
let scheduler = Scheduler::take_local();
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&[u8] = &buf;
@ -306,7 +306,7 @@ impl Stream for UvStream {
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = Scheduler::local_take();
let scheduler = Scheduler::take_local();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -322,8 +322,7 @@ fn test_simple_io_no_connect() {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Task::new(&mut sched.stack_pool) {
let sched = Scheduler::unsafe_local_borrow();
let io = sched.event_loop.io().unwrap();
let io = unsafe { Scheduler::borrow_local_io() };
let addr = Ipv4(127, 0, 0, 1, 2926);
let maybe_chan = io.connect(addr);
assert!(maybe_chan.is_none());
@ -341,16 +340,14 @@ fn test_simple_tcp_server_and_client() {
let addr = Ipv4(127, 0, 0, 1, 2929);
let client_task = ~do Task::new(&mut sched.stack_pool) {
let sched = Scheduler::unsafe_local_borrow();
let io = sched.event_loop.io().unwrap();
let io = unsafe { Scheduler::borrow_local_io() };
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.close();
};
let server_task = ~do Task::new(&mut sched.stack_pool) {
let sched = Scheduler::unsafe_local_borrow();
let io = sched.event_loop.io().unwrap();
let io = unsafe { Scheduler::borrow_local_io() };
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut buf = [0, .. 2048];
@ -378,8 +375,7 @@ fn test_read_and_block() {
let addr = Ipv4(127, 0, 0, 1, 2930);
let client_task = ~do Task::new(&mut sched.stack_pool) {
let sched = Scheduler::unsafe_local_borrow();
let io = sched.event_loop.io().unwrap();
let io = unsafe { Scheduler::borrow_local_io() };
let mut stream = io.connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
@ -389,8 +385,7 @@ fn test_read_and_block() {
};
let server_task = ~do Task::new(&mut sched.stack_pool) {
let sched = Scheduler::unsafe_local_borrow();
let io = sched.event_loop.io().unwrap();
let io = unsafe { Scheduler::borrow_local_io() };
let mut listener = io.bind(addr).unwrap();
let mut stream = listener.listen().unwrap();
let mut buf = [0, .. 2048];
@ -408,13 +403,13 @@ fn test_read_and_block() {
}
reads += 1;
let scheduler = Scheduler::local_take();
let scheduler = Scheduler::take_local();
// Yield to the other task in hopes that it
// will trigger a read callback while we are
// not ready for it
do scheduler.deschedule_running_task_and_then |task| {
let task = Cell(task);
do Scheduler::local_borrow |scheduler| {
do Scheduler::borrow_local |scheduler| {
scheduler.task_queue.push_back(task.take());
}
}
@ -441,8 +436,7 @@ fn test_read_read_read() {
let addr = Ipv4(127, 0, 0, 1, 2931);
let client_task = ~do Task::new(&mut sched.stack_pool) {
let sched = Scheduler::unsafe_local_borrow();
let io = sched.event_loop.io().unwrap();
let io = unsafe { Scheduler::borrow_local_io() };
let mut stream = io.connect(addr).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;