auto merge of #6626 : brson/rust/io-upstream, r=graydon

r?

Mostly refactoring, and adding some of the remaining types described in #4419.

The [`Local`](3b4ff41511/src/libcore/rt/local.rs (L17)) trait collects some common, often unsafe patterns around task-local and thread-local values. Making all these types safe is largely the aim of #6210.
This commit is contained in:
bors 2013-05-20 21:40:27 -07:00
commit 7abcc142e5
20 changed files with 466 additions and 321 deletions

View File

@ -66,8 +66,11 @@ pub fn log_type<T>(level: u32, object: &T) {
}
fn newsched_log_str(msg: ~str) {
use rt::task::Task;
use rt::local::Local;
unsafe {
match rt::local_services::unsafe_try_borrow_local_services() {
match Local::try_unsafe_borrow::<Task>() {
Some(local) => {
// Use the available logger
(*local).logger.log(Left(msg));

View File

@ -20,8 +20,8 @@ use cast;
use util;
use ops::Drop;
use kinds::Owned;
use rt::sched::Coroutine;
use rt::local_sched;
use rt::sched::{Scheduler, Coroutine};
use rt::local::Local;
use unstable::intrinsics::{atomic_xchg, atomic_load};
use util::Void;
use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
@ -127,7 +127,7 @@ impl<T> ChanOne<T> {
task_as_state => {
// Port is blocked. Wake it up.
let recvr: ~Coroutine = cast::transmute(task_as_state);
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
sched.schedule_task(recvr);
}
}
@ -157,7 +157,7 @@ impl<T> PortOne<T> {
// XXX: Optimize this to not require the two context switches when data is available
// Switch to the scheduler to put the ~Task into the Packet state.
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
unsafe {
// Atomically swap the task pointer into the Packet state, issuing
@ -173,7 +173,7 @@ impl<T> PortOne<T> {
STATE_ONE => {
// Channel is closed. Switch back and check the data.
let task: ~Coroutine = cast::transmute(task_as_state);
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
}
_ => util::unreachable()
@ -239,7 +239,7 @@ impl<T> Drop for ChanOneHack<T> {
// The port is blocked waiting for a message we will never send. Wake it.
assert!((*this.packet()).payload.is_none());
let recvr: ~Coroutine = cast::transmute(task_as_state);
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
sched.schedule_task(recvr);
}
}

View File

@ -10,13 +10,13 @@
use option::{Option, Some, None};
use result::{Ok, Err};
use rt::sched::local_sched::unsafe_borrow_io;
use rt::io::net::ip::IpAddr;
use rt::io::{Reader, Writer, Listener};
use rt::io::{io_error, read_error, EndOfFile};
use rt::rtio::{IoFactory,
use rt::rtio::{IoFactory, IoFactoryObject,
RtioTcpListener, RtioTcpListenerObject,
RtioTcpStream, RtioTcpStreamObject};
use rt::local::Local;
pub struct TcpStream {
rtstream: ~RtioTcpStreamObject
@ -32,7 +32,7 @@ impl TcpStream {
pub fn connect(addr: IpAddr) -> Option<TcpStream> {
let stream = unsafe {
rtdebug!("borrowing io to connect");
let io = unsafe_borrow_io();
let io = Local::unsafe_borrow::<IoFactoryObject>();
rtdebug!("about to connect");
(*io).tcp_connect(addr)
};
@ -88,7 +88,10 @@ pub struct TcpListener {
impl TcpListener {
pub fn bind(addr: IpAddr) -> Option<TcpListener> {
let listener = unsafe { (*unsafe_borrow_io()).tcp_bind(addr) };
let listener = unsafe {
let io = Local::unsafe_borrow::<IoFactoryObject>();
(*io).tcp_bind(addr)
};
match listener {
Ok(l) => {
Some(TcpListener {

118
src/libcore/rt/local.rs Normal file
View File

@ -0,0 +1,118 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use option::{Option, Some, None};
use rt::sched::Scheduler;
use rt::task::Task;
use rt::local_ptr;
use rt::rtio::{EventLoop, IoFactoryObject};
pub trait Local {
fn put(value: ~Self);
fn take() -> ~Self;
fn exists() -> bool;
fn borrow(f: &fn(&mut Self));
unsafe fn unsafe_borrow() -> *mut Self;
unsafe fn try_unsafe_borrow() -> Option<*mut Self>;
}
impl Local for Scheduler {
fn put(value: ~Scheduler) { unsafe { local_ptr::put(value) }}
fn take() -> ~Scheduler { unsafe { local_ptr::take() } }
fn exists() -> bool { local_ptr::exists() }
fn borrow(f: &fn(&mut Scheduler)) { unsafe { local_ptr::borrow(f) } }
unsafe fn unsafe_borrow() -> *mut Scheduler { local_ptr::unsafe_borrow() }
unsafe fn try_unsafe_borrow() -> Option<*mut Scheduler> { abort!("unimpl") }
}
impl Local for Task {
fn put(value: ~Task) { abort!("unimpl") }
fn take() -> ~Task { abort!("unimpl") }
fn exists() -> bool { abort!("unimpl") }
fn borrow(f: &fn(&mut Task)) {
do Local::borrow::<Scheduler> |sched| {
match sched.current_task {
Some(~ref mut task) => {
f(&mut *task.task)
}
None => {
abort!("no scheduler")
}
}
}
}
unsafe fn unsafe_borrow() -> *mut Task {
match (*Local::unsafe_borrow::<Scheduler>()).current_task {
Some(~ref mut task) => {
let s: *mut Task = &mut *task.task;
return s;
}
None => {
// Don't fail. Infinite recursion
abort!("no scheduler")
}
}
}
unsafe fn try_unsafe_borrow() -> Option<*mut Task> {
if Local::exists::<Scheduler>() {
Some(Local::unsafe_borrow())
} else {
None
}
}
}
// XXX: This formulation won't work once ~IoFactoryObject is a real trait pointer
impl Local for IoFactoryObject {
fn put(value: ~IoFactoryObject) { abort!("unimpl") }
fn take() -> ~IoFactoryObject { abort!("unimpl") }
fn exists() -> bool { abort!("unimpl") }
fn borrow(f: &fn(&mut IoFactoryObject)) { abort!("unimpl") }
unsafe fn unsafe_borrow() -> *mut IoFactoryObject {
let sched = Local::unsafe_borrow::<Scheduler>();
let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap();
return io;
}
unsafe fn try_unsafe_borrow() -> Option<*mut IoFactoryObject> { abort!("unimpl") }
}
#[cfg(test)]
mod test {
use rt::sched::Scheduler;
use rt::uv::uvio::UvEventLoop;
use super::*;
#[test]
fn thread_local_scheduler_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}
#[test]
fn thread_local_scheduler_two_instances() {
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
let _scheduler: ~Scheduler = Local::take();
}
#[test]
fn borrow_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
Local::put(scheduler);
unsafe {
let _scheduler: *mut Scheduler = Local::unsafe_borrow();
}
let _scheduler: ~Scheduler = Local::take();
}
}

View File

@ -8,43 +8,57 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Access to the thread-local Scheduler
//! Access to a single thread-local pointer.
//!
//! The runtime will use this for storing ~Task.
//!
//! XXX: Add runtime checks for usage of inconsistent pointer types.
//! and for overwriting an existing pointer.
use prelude::*;
use ptr::mut_null;
use libc::c_void;
use cast;
use ptr;
use cell::Cell;
use rt::sched::Scheduler;
use rt::rtio::{EventLoop, IoFactoryObject};
use tls = rt::thread_local_storage;
use option::{Option, Some, None};
use unstable::finally::Finally;
use tls = rt::thread_local_storage;
#[cfg(test)] use rt::uv::uvio::UvEventLoop;
/// Give the Scheduler to thread-local storage
pub fn put(sched: ~Scheduler) {
/// Initialize the TLS key. Other ops will fail if this isn't executed first.
pub fn init_tls_key() {
unsafe {
let key = tls_key();
let void_sched: *mut c_void = cast::transmute(sched);
tls::set(key, void_sched);
rust_initialize_rt_tls_key();
extern {
fn rust_initialize_rt_tls_key();
}
}
}
/// Take ownership of the Scheduler from thread-local storage
pub fn take() -> ~Scheduler {
unsafe {
let key = tls_key();
let void_sched: *mut c_void = tls::get(key);
rtassert!(void_sched.is_not_null());
let sched: ~Scheduler = cast::transmute(void_sched);
tls::set(key, mut_null());
return sched;
}
/// Give a pointer to thread-local storage.
///
/// # Safety note
///
/// Does not validate the pointer type.
pub unsafe fn put<T>(sched: ~T) {
let key = tls_key();
let void_ptr: *mut c_void = cast::transmute(sched);
tls::set(key, void_ptr);
}
/// Check whether there is a thread-local Scheduler attached to the running thread
/// Take ownership of a pointer from thread-local storage.
///
/// # Safety note
///
/// Does not validate the pointer type.
pub unsafe fn take<T>() -> ~T {
let key = tls_key();
let void_ptr: *mut c_void = tls::get(key);
rtassert!(void_ptr.is_not_null());
let ptr: ~T = cast::transmute(void_ptr);
tls::set(key, ptr::mut_null());
return ptr;
}
/// Check whether there is a thread-local pointer installed.
pub fn exists() -> bool {
unsafe {
match maybe_tls_key() {
@ -56,19 +70,21 @@ pub fn exists() -> bool {
/// Borrow the thread-local scheduler from thread-local storage.
/// While the scheduler is borrowed it is not available in TLS.
pub fn borrow(f: &fn(&mut Scheduler)) {
let mut sched = take();
///
/// # Safety note
///
/// Does not validate the pointer type.
pub unsafe fn borrow<T>(f: &fn(&mut T)) {
let mut value = take();
// XXX: Need a different abstraction from 'finally' here to avoid unsafety
unsafe {
let unsafe_sched = cast::transmute_mut_region(&mut *sched);
let sched = Cell(sched);
let unsafe_ptr = cast::transmute_mut_region(&mut *value);
let value_cell = Cell(value);
do (|| {
f(unsafe_sched);
}).finally {
put(sched.take());
}
do (|| {
f(unsafe_ptr);
}).finally {
put(value_cell.take());
}
}
@ -78,24 +94,18 @@ pub fn borrow(f: &fn(&mut Scheduler)) {
///
/// Because this leaves the Scheduler in thread-local storage it is possible
/// For the Scheduler pointer to be aliased
pub unsafe fn unsafe_borrow() -> *mut Scheduler {
pub unsafe fn unsafe_borrow<T>() -> *mut T {
let key = tls_key();
let mut void_sched: *mut c_void = tls::get(key);
rtassert!(void_sched.is_not_null());
{
let sched: *mut *mut c_void = &mut void_sched;
let sched: *mut ~Scheduler = sched as *mut ~Scheduler;
let sched: *mut Scheduler = &mut **sched;
let sched: *mut ~T = sched as *mut ~T;
let sched: *mut T = &mut **sched;
return sched;
}
}
pub unsafe fn unsafe_borrow_io() -> *mut IoFactoryObject {
let sched = unsafe_borrow();
let io: *mut IoFactoryObject = (*sched).event_loop.io().unwrap();
return io;
}
fn tls_key() -> tls::Key {
match maybe_tls_key() {
Some(key) => key,
@ -126,36 +136,10 @@ fn maybe_tls_key() -> Option<tls::Key> {
return None;
}
}
}
extern {
#[fast_ffi]
fn rust_get_rt_tls_key() -> *mut c_void;
}
#[test]
fn thread_local_scheduler_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
put(scheduler);
let _scheduler = take();
}
#[test]
fn thread_local_scheduler_two_instances() {
let scheduler = ~UvEventLoop::new_scheduler();
put(scheduler);
let _scheduler = take();
let scheduler = ~UvEventLoop::new_scheduler();
put(scheduler);
let _scheduler = take();
}
#[test]
fn borrow_smoke_test() {
let scheduler = ~UvEventLoop::new_scheduler();
put(scheduler);
unsafe {
let _scheduler = unsafe_borrow();
extern {
#[fast_ffi]
fn rust_get_rt_tls_key() -> *mut c_void;
}
let _scheduler = take();
}

View File

@ -0,0 +1,53 @@
// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use container::Container;
use kinds::Owned;
use vec::OwnedVector;
use cell::Cell;
use option::*;
use unstable::sync::{Exclusive, exclusive};
use clone::Clone;
pub struct MessageQueue<T> {
// XXX: Another mystery bug fixed by boxing this lock
priv queue: ~Exclusive<~[T]>
}
impl<T: Owned> MessageQueue<T> {
pub fn new() -> MessageQueue<T> {
MessageQueue {
queue: ~exclusive(~[])
}
}
pub fn push(&mut self, value: T) {
let value = Cell(value);
self.queue.with(|q| q.push(value.take()) );
}
pub fn pop(&mut self) -> Option<T> {
do self.queue.with |q| {
if !q.is_empty() {
Some(q.shift())
} else {
None
}
}
}
}
impl<T> Clone for MessageQueue<T> {
fn clone(&self) -> MessageQueue<T> {
MessageQueue {
queue: self.queue.clone()
}
}
}

View File

@ -31,14 +31,8 @@ access to the global heap. Unlike most of `rt` the global heap is
truly a global resource and generally operates independently of the
rest of the runtime.
All other runtime features are 'local', either thread-local or
task-local. Those critical to the functioning of the language are
defined in the module `local_services`. Local services are those which
are expected to be available to Rust code generally but rely on
thread- or task-local state. These currently include the local heap,
All other runtime features are task-local, including the local heap,
the garbage collector, local storage, logging and the stack unwinder.
Local services are primarily implemented for tasks, but may also
be implemented for use outside of tasks.
The relationship between `rt` and the rest of the core library is
not entirely clear yet and some modules will be moving into or
@ -67,19 +61,16 @@ use ptr::Ptr;
/// The global (exchange) heap.
pub mod global_heap;
/// The Scheduler and Coroutine types.
mod sched;
/// Implementations of language-critical runtime features like @.
pub mod task;
/// Thread-local access to the current Scheduler.
pub mod local_sched;
/// The coroutine task scheduler, built on the `io` event loop.
mod sched;
/// Synchronous I/O.
#[path = "io/mod.rs"]
pub mod io;
/// Thread-local implementations of language-critical runtime features like @.
pub mod local_services;
/// The EventLoop and internal synchronous I/O interface.
mod rtio;
@ -87,13 +78,16 @@ mod rtio;
#[path = "uv/mod.rs"]
pub mod uv;
// FIXME #5248: The import in `sched` doesn't resolve unless this is pub!
/// Bindings to pthread/windows thread-local storage.
pub mod thread_local_storage;
/// The Local trait for types that are accessible via thread-local
/// or task-local storage.
pub mod local;
/// A parallel work-stealing dequeue.
/// A parallel work-stealing deque.
mod work_queue;
/// A parallel queue.
mod message_queue;
/// Stack segments and caching.
mod stack;
@ -125,6 +119,15 @@ pub mod tube;
/// Simple reimplementation of core::comm
pub mod comm;
// FIXME #5248 shouldn't be pub
/// The runtime needs to be able to put a pointer into thread-local storage.
pub mod local_ptr;
// FIXME #5248: The import in `sched` doesn't resolve unless this is pub!
/// Bindings to pthread/windows thread-local storage.
pub mod thread_local_storage;
/// Set up a default runtime configuration, given compiler-supplied arguments.
///
/// This is invoked by the `start` _language item_ (unstable::lang) to
@ -182,16 +185,17 @@ pub enum RuntimeContext {
pub fn context() -> RuntimeContext {
use task::rt::rust_task;
use self::sched::local_sched;
use self::local::Local;
use self::sched::Scheduler;
// XXX: Hitting TLS twice to check if the scheduler exists
// then to check for the task is not good for perf
if unsafe { rust_try_get_task().is_not_null() } {
return OldTaskContext;
} else {
if local_sched::exists() {
if Local::exists::<Scheduler>() {
let context = ::cell::empty_cell();
do local_sched::borrow |sched| {
do Local::borrow::<Scheduler> |sched| {
if sched.in_task_context() {
context.put_back(TaskContext);
} else {
@ -213,9 +217,10 @@ pub fn context() -> RuntimeContext {
#[test]
fn test_context() {
use unstable::run_in_bare_thread;
use self::sched::{local_sched, Coroutine};
use self::sched::{Scheduler, Coroutine};
use rt::uv::uvio::UvEventLoop;
use cell::Cell;
use rt::local::Local;
assert_eq!(context(), OldTaskContext);
do run_in_bare_thread {
@ -223,11 +228,11 @@ fn test_context() {
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Coroutine::new(&mut sched.stack_pool) {
assert_eq!(context(), TaskContext);
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then() |task| {
assert_eq!(context(), SchedulerContext);
let task = Cell(task);
do local_sched::borrow |sched| {
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task.take());
}
}

View File

@ -11,16 +11,16 @@
use option::*;
use sys;
use cast::transmute;
use cell::Cell;
use super::work_queue::WorkQueue;
use super::stack::{StackPool, StackSegment};
use super::rtio::{EventLoop, EventLoopObject};
use super::context::Context;
use super::local_services::LocalServices;
use cell::Cell;
// A more convenient name for external callers, e.g. `local_sched::take()`
pub mod local_sched;
use super::task::Task;
use rt::local_ptr;
use rt::local::Local;
use rt::rtio::IoFactoryObject;
/// The Scheduler is responsible for coordinating execution of Coroutines
/// on a single thread. When the scheduler is running it is owned by
@ -64,11 +64,8 @@ pub impl Scheduler {
fn new(event_loop: ~EventLoopObject) -> Scheduler {
// Lazily initialize the global state, currently the scheduler TLS key
unsafe { rust_initialize_global_state(); }
extern {
fn rust_initialize_global_state();
}
// Lazily initialize the runtime TLS key
local_ptr::init_tls_key();
Scheduler {
event_loop: event_loop,
@ -96,12 +93,12 @@ pub impl Scheduler {
};
// Give ownership of the scheduler (self) to the thread
local_sched::put(self_sched);
Local::put(self_sched);
(*event_loop).run();
}
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
assert!(sched.work_queue.is_empty());
return sched;
}
@ -112,11 +109,11 @@ pub impl Scheduler {
/// to run it later. Always use this instead of pushing to the work queue
/// directly.
fn enqueue_task(&mut self, task: ~Coroutine) {
self.work_queue.push_front(task);
self.work_queue.push(task);
self.event_loop.callback(resume_task_from_queue);
fn resume_task_from_queue() {
let scheduler = local_sched::take();
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_from_queue();
}
}
@ -129,14 +126,14 @@ pub impl Scheduler {
rtdebug!("looking in work queue for task to schedule");
let mut this = self;
match this.work_queue.pop_front() {
match this.work_queue.pop() {
Some(task) => {
rtdebug!("resuming task from work queue");
this.resume_task_immediately(task);
}
None => {
rtdebug!("no tasks in queue");
local_sched::put(this);
Local::put(this);
}
}
}
@ -152,7 +149,7 @@ pub impl Scheduler {
do self.deschedule_running_task_and_then |dead_task| {
let dead_task = Cell(dead_task);
do local_sched::borrow |sched| {
do Local::borrow::<Scheduler> |sched| {
dead_task.take().recycle(&mut sched.stack_pool);
}
}
@ -165,7 +162,7 @@ pub impl Scheduler {
do self.switch_running_tasks_and_then(task) |last_task| {
let last_task = Cell(last_task);
do local_sched::borrow |sched| {
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(last_task.take());
}
}
@ -176,7 +173,7 @@ pub impl Scheduler {
do self.switch_running_tasks_and_then(task) |last_task| {
let last_task = Cell(last_task);
do local_sched::borrow |sched| {
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(last_task.take());
}
}
@ -194,18 +191,18 @@ pub impl Scheduler {
this.current_task = Some(task);
this.enqueue_cleanup_job(DoNothing);
local_sched::put(this);
Local::put(this);
// Take pointers to both the task and scheduler's saved registers.
unsafe {
let sched = local_sched::unsafe_borrow();
let sched = Local::unsafe_borrow::<Scheduler>();
let (sched_context, _, next_task_context) = (*sched).get_contexts();
let next_task_context = next_task_context.unwrap();
// Context switch to the task, restoring it's registers
// and saving the scheduler's
Context::swap(sched_context, next_task_context);
let sched = local_sched::unsafe_borrow();
let sched = Local::unsafe_borrow::<Scheduler>();
// The running task should have passed ownership elsewhere
assert!((*sched).current_task.is_none());
@ -235,16 +232,16 @@ pub impl Scheduler {
this.enqueue_cleanup_job(GiveTask(blocked_task, f_opaque));
}
local_sched::put(this);
Local::put(this);
unsafe {
let sched = local_sched::unsafe_borrow();
let sched = Local::unsafe_borrow::<Scheduler>();
let (sched_context, last_task_context, _) = (*sched).get_contexts();
let last_task_context = last_task_context.unwrap();
Context::swap(last_task_context, sched_context);
// We could be executing in a different thread now
let sched = local_sched::unsafe_borrow();
let sched = Local::unsafe_borrow::<Scheduler>();
(*sched).run_cleanup_job();
}
}
@ -264,17 +261,17 @@ pub impl Scheduler {
this.enqueue_cleanup_job(GiveTask(old_running_task, f_opaque));
this.current_task = Some(next_task);
local_sched::put(this);
Local::put(this);
unsafe {
let sched = local_sched::unsafe_borrow();
let sched = Local::unsafe_borrow::<Scheduler>();
let (_, last_task_context, next_task_context) = (*sched).get_contexts();
let last_task_context = last_task_context.unwrap();
let next_task_context = next_task_context.unwrap();
Context::swap(last_task_context, next_task_context);
// We could be executing in a different thread now
let sched = local_sched::unsafe_borrow();
let sched = Local::unsafe_borrow::<Scheduler>();
(*sched).run_cleanup_job();
}
}
@ -350,16 +347,16 @@ pub struct Coroutine {
/// the task is dead
priv saved_context: Context,
/// The heap, GC, unwinding, local storage, logging
local_services: LocalServices
task: ~Task
}
pub impl Coroutine {
fn new(stack_pool: &mut StackPool, start: ~fn()) -> Coroutine {
Coroutine::with_local(stack_pool, LocalServices::new(), start)
Coroutine::with_task(stack_pool, ~Task::new(), start)
}
fn with_local(stack_pool: &mut StackPool,
local_services: LocalServices,
fn with_task(stack_pool: &mut StackPool,
task: ~Task,
start: ~fn()) -> Coroutine {
let start = Coroutine::build_start_wrapper(start);
let mut stack = stack_pool.take_segment(MIN_STACK_SIZE);
@ -368,7 +365,7 @@ pub impl Coroutine {
return Coroutine {
current_stack_segment: stack,
saved_context: initial_context,
local_services: local_services
task: task
};
}
@ -379,16 +376,16 @@ pub impl Coroutine {
// context switch to the task. The previous context may
// have asked us to do some cleanup.
unsafe {
let sched = local_sched::unsafe_borrow();
let sched = Local::unsafe_borrow::<Scheduler>();
(*sched).run_cleanup_job();
let sched = local_sched::unsafe_borrow();
let sched = Local::unsafe_borrow::<Scheduler>();
let task = (*sched).current_task.get_mut_ref();
// FIXME #6141: shouldn't neet to put `start()` in another closure
task.local_services.run(||start());
task.task.run(||start());
}
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
sched.terminate_current_task();
};
return wrapper;
@ -411,6 +408,7 @@ mod test {
use rt::uv::uvio::UvEventLoop;
use unstable::run_in_bare_thread;
use task::spawn;
use rt::local::Local;
use rt::test::*;
use super::*;
@ -458,14 +456,14 @@ mod test {
let mut sched = ~UvEventLoop::new_scheduler();
let task1 = ~do Coroutine::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
let mut sched = local_sched::take();
let mut sched = Local::take::<Scheduler>();
let task2 = ~do Coroutine::new(&mut sched.stack_pool) {
unsafe { *count_ptr = *count_ptr + 1; }
};
// Context switch directly to the new task
do sched.switch_running_tasks_and_then(task2) |task1| {
let task1 = Cell(task1);
do local_sched::borrow |sched| {
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task1.take());
}
}
@ -495,7 +493,7 @@ mod test {
assert_eq!(count, MAX);
fn run_task(count_ptr: *mut int) {
do local_sched::borrow |sched| {
do Local::borrow::<Scheduler> |sched| {
let task = ~do Coroutine::new(&mut sched.stack_pool) {
unsafe {
*count_ptr = *count_ptr + 1;
@ -515,11 +513,11 @@ mod test {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~do Coroutine::new(&mut sched.stack_pool) {
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
assert!(sched.in_task_context());
do sched.deschedule_running_task_and_then() |task| {
let task = Cell(task);
do local_sched::borrow |sched| {
do Local::borrow::<Scheduler> |sched| {
assert!(!sched.in_task_context());
sched.enqueue_task(task.take());
}
@ -538,17 +536,17 @@ mod test {
// exit before emptying the work queue
do run_in_newsched_task {
do spawn {
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
let mut sched = local_sched::take();
let mut sched = Local::take::<Scheduler>();
let task = Cell(task);
do sched.event_loop.callback_ms(10) {
rtdebug!("in callback");
let mut sched = local_sched::take();
let mut sched = Local::take::<Scheduler>();
sched.enqueue_task(task.take());
local_sched::put(sched);
Local::put(sched);
}
local_sched::put(sched);
Local::put(sched);
}
}
}

View File

@ -13,19 +13,15 @@
//! local storage, and logging. Even a 'freestanding' Rust would likely want
//! to implement this.
//! Local services may exist in at least three different contexts:
//! when running as a task, when running in the scheduler's context,
//! or when running outside of a scheduler but with local services
//! (freestanding rust with local services?).
use prelude::*;
use libc::{c_void, uintptr_t};
use cast::transmute;
use super::sched::local_sched;
use super::sched::Scheduler;
use rt::local::Local;
use super::local_heap::LocalHeap;
use rt::logging::StdErrLogger;
pub struct LocalServices {
pub struct Task {
heap: LocalHeap,
gc: GarbageCollector,
storage: LocalStorage,
@ -41,9 +37,9 @@ pub struct Unwinder {
unwinding: bool,
}
impl LocalServices {
pub fn new() -> LocalServices {
LocalServices {
impl Task {
pub fn new() -> Task {
Task {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
@ -53,8 +49,8 @@ impl LocalServices {
}
}
pub fn without_unwinding() -> LocalServices {
LocalServices {
pub fn without_unwinding() -> Task {
Task {
heap: LocalHeap::new(),
gc: GarbageCollector,
storage: LocalStorage(ptr::null(), None),
@ -66,9 +62,9 @@ impl LocalServices {
pub fn run(&mut self, f: &fn()) {
// This is just an assertion that `run` was called unsafely
// and this instance of LocalServices is still accessible.
do borrow_local_services |sched| {
assert!(ptr::ref_eq(sched, self));
// and this instance of Task is still accessible.
do Local::borrow::<Task> |task| {
assert!(ptr::ref_eq(task, self));
}
match self.unwinder {
@ -86,14 +82,14 @@ impl LocalServices {
/// Must be called manually before finalization to clean up
/// thread-local resources. Some of the routines here expect
/// LocalServices to be available recursively so this must be
/// called unsafely, without removing LocalServices from
/// Task to be available recursively so this must be
/// called unsafely, without removing Task from
/// thread-local-storage.
fn destroy(&mut self) {
// This is just an assertion that `destroy` was called unsafely
// and this instance of LocalServices is still accessible.
do borrow_local_services |sched| {
assert!(ptr::ref_eq(sched, self));
// and this instance of Task is still accessible.
do Local::borrow::<Task> |task| {
assert!(ptr::ref_eq(task, self));
}
match self.storage {
LocalStorage(ptr, Some(ref dtor)) => {
@ -105,7 +101,7 @@ impl LocalServices {
}
}
impl Drop for LocalServices {
impl Drop for Task {
fn finalize(&self) { assert!(self.destroyed) }
}
@ -154,42 +150,6 @@ impl Unwinder {
}
}
/// Borrow a pointer to the installed local services.
/// Fails (likely aborting the process) if local services are not available.
pub fn borrow_local_services(f: &fn(&mut LocalServices)) {
do local_sched::borrow |sched| {
match sched.current_task {
Some(~ref mut task) => {
f(&mut task.local_services)
}
None => {
fail!("no local services for schedulers yet")
}
}
}
}
pub unsafe fn unsafe_borrow_local_services() -> *mut LocalServices {
match (*local_sched::unsafe_borrow()).current_task {
Some(~ref mut task) => {
let s: *mut LocalServices = &mut task.local_services;
return s;
}
None => {
// Don't fail. Infinite recursion
abort!("no local services for schedulers yet")
}
}
}
pub unsafe fn unsafe_try_borrow_local_services() -> Option<*mut LocalServices> {
if local_sched::exists() {
Some(unsafe_borrow_local_services())
} else {
None
}
}
#[cfg(test)]
mod test {
use rt::test::*;

View File

@ -13,8 +13,9 @@ use option::*;
use cell::Cell;
use result::{Result, Ok, Err};
use super::io::net::ip::{IpAddr, Ipv4};
use rt::local_services::LocalServices;
use rt::task::Task;
use rt::thread::Thread;
use rt::local::Local;
/// Creates a new scheduler in a new thread and runs a task in it,
/// then waits for the scheduler to exit. Failure of the task
@ -28,9 +29,9 @@ pub fn run_in_newsched_task(f: ~fn()) {
do run_in_bare_thread {
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~Coroutine::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f.take());
let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(),
f.take());
sched.enqueue_task(task);
sched.run();
}
@ -40,13 +41,13 @@ pub fn run_in_newsched_task(f: ~fn()) {
pub fn spawntask(f: ~fn()) {
use super::sched::*;
let mut sched = local_sched::take();
let task = ~Coroutine::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f);
let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(),
f);
do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell(task);
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
sched.schedule_new_task(task.take());
}
}
@ -55,13 +56,13 @@ pub fn spawntask(f: ~fn()) {
pub fn spawntask_immediately(f: ~fn()) {
use super::sched::*;
let mut sched = local_sched::take();
let task = ~Coroutine::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f);
let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(),
f);
do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell(task);
do local_sched::borrow |sched| {
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task.take());
}
}
@ -71,13 +72,13 @@ pub fn spawntask_immediately(f: ~fn()) {
pub fn spawntask_later(f: ~fn()) {
use super::sched::*;
let mut sched = local_sched::take();
let task = ~Coroutine::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f);
let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(),
f);
sched.enqueue_task(task);
local_sched::put(sched);
Local::put(sched);
}
/// Spawn a task and either run it immediately or run it later
@ -88,21 +89,21 @@ pub fn spawntask_random(f: ~fn()) {
let mut rng = rng();
let run_now: bool = Rand::rand(&mut rng);
let mut sched = local_sched::take();
let task = ~Coroutine::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f);
let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(),
f);
if run_now {
do sched.switch_running_tasks_and_then(task) |task| {
let task = Cell(task);
do local_sched::borrow |sched| {
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(task.take());
}
}
} else {
sched.enqueue_task(task);
local_sched::put(sched);
Local::put(sched);
}
}
@ -120,21 +121,21 @@ pub fn spawntask_try(f: ~fn()) -> Result<(), ()> {
// Switch to the scheduler
let f = Cell(Cell(f));
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then() |old_task| {
let old_task = Cell(old_task);
let f = f.take();
let mut sched = local_sched::take();
let mut sched = Local::take::<Scheduler>();
let new_task = ~do Coroutine::new(&mut sched.stack_pool) {
do (|| {
(f.take())()
}).finally {
// Check for failure then resume the parent task
unsafe { *failed_ptr = task::failing(); }
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
do sched.switch_running_tasks_and_then(old_task.take()) |new_task| {
let new_task = Cell(new_task);
do local_sched::borrow |sched| {
do Local::borrow::<Scheduler> |sched| {
sched.enqueue_task(new_task.take());
}
}
@ -155,9 +156,9 @@ pub fn spawntask_thread(f: ~fn()) -> Thread {
let f = Cell(f);
let thread = do Thread::start {
let mut sched = ~UvEventLoop::new_scheduler();
let task = ~Coroutine::with_local(&mut sched.stack_pool,
LocalServices::without_unwinding(),
f.take());
let task = ~Coroutine::with_task(&mut sched.stack_pool,
~Task::without_unwinding(),
f.take());
sched.enqueue_task(task);
sched.run();
};

View File

@ -16,9 +16,9 @@
use option::*;
use clone::Clone;
use super::rc::RC;
use rt::sched::Coroutine;
use rt::sched::{Scheduler, Coroutine};
use rt::{context, TaskContext, SchedulerContext};
use rt::local_sched;
use rt::local::Local;
use vec::OwnedVector;
use container::Container;
@ -53,7 +53,7 @@ impl<T> Tube<T> {
// There's a waiting task. Wake it up
rtdebug!("waking blocked tube");
let task = (*state).blocked_task.swap_unwrap();
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
}
}
@ -71,7 +71,7 @@ impl<T> Tube<T> {
rtdebug!("blocking on tube recv");
assert!(self.p.refcount() > 1); // There better be somebody to wake us up
assert!((*state).blocked_task.is_none());
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
(*state).blocked_task = Some(task);
}
@ -94,9 +94,10 @@ impl<T> Clone for Tube<T> {
mod test {
use int;
use cell::Cell;
use rt::local_sched;
use rt::test::*;
use rt::rtio::EventLoop;
use rt::sched::Scheduler;
use rt::local::Local;
use super::*;
#[test]
@ -105,11 +106,11 @@ mod test {
let mut tube: Tube<int> = Tube::new();
let tube_clone = tube.clone();
let tube_clone_cell = Cell(tube_clone);
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
let mut tube_clone = tube_clone_cell.take();
tube_clone.send(1);
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
}
@ -123,10 +124,10 @@ mod test {
let mut tube: Tube<int> = Tube::new();
let tube_clone = tube.clone();
let tube_clone = Cell(Cell(Cell(tube_clone)));
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
let tube_clone = tube_clone.take();
do local_sched::borrow |sched| {
do Local::borrow::<Scheduler> |sched| {
let tube_clone = tube_clone.take();
do sched.event_loop.callback {
let mut tube_clone = tube_clone.take();
@ -135,7 +136,7 @@ mod test {
tube_clone.send(1);
}
}
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
}
@ -151,7 +152,7 @@ mod test {
let mut tube: Tube<int> = Tube::new();
let tube_clone = tube.clone();
let tube_clone = Cell(tube_clone);
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
do sched.deschedule_running_task_and_then |task| {
callback_send(tube_clone.take(), 0);
@ -159,7 +160,7 @@ mod test {
if i == 100 { return; }
let tube = Cell(Cell(tube));
do local_sched::borrow |sched| {
do Local::borrow::<Scheduler> |sched| {
let tube = tube.take();
do sched.event_loop.callback {
let mut tube = tube.take();
@ -171,7 +172,7 @@ mod test {
}
}
let sched = local_sched::take();
let sched = Local::take::<Scheduler>();
sched.resume_task_immediately(task);
}

View File

@ -19,9 +19,10 @@ use rt::io::net::ip::IpAddr;
use rt::uv::*;
use rt::uv::idle::IdleWatcher;
use rt::rtio::*;
use rt::sched::{Scheduler, local_sched};
use rt::sched::Scheduler;
use rt::io::{standard_error, OtherIoError};
use rt::tube::Tube;
use rt::local::Local;
#[cfg(test)] use container::Container;
#[cfg(test)] use uint;
@ -118,14 +119,14 @@ impl IoFactory for UvIoFactory {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<~RtioTcpStreamObject, IoError>> = &result_cell;
let scheduler = local_sched::take();
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
// Block this task and take ownership, switch to scheduler context
do scheduler.deschedule_running_task_and_then |task| {
rtdebug!("connect: entered scheduler context");
do local_sched::borrow |scheduler| {
do Local::borrow::<Scheduler> |scheduler| {
assert!(!scheduler.in_task_context());
}
let mut tcp_watcher = TcpWatcher::new(self.uv_loop());
@ -142,7 +143,7 @@ impl IoFactory for UvIoFactory {
unsafe { (*result_cell_ptr).put_back(res); }
// Context switch
let scheduler = local_sched::take();
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
} else {
rtdebug!("status is some");
@ -150,7 +151,7 @@ impl IoFactory for UvIoFactory {
do stream_watcher.close {
let res = Err(uv_error_to_io_error(status.get()));
unsafe { (*result_cell_ptr).put_back(res); }
let scheduler = local_sched::take();
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
}
};
@ -166,11 +167,11 @@ impl IoFactory for UvIoFactory {
match watcher.bind(addr) {
Ok(_) => Ok(~UvTcpListener::new(watcher)),
Err(uverr) => {
let scheduler = local_sched::take();
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell(task);
do watcher.as_stream().close {
let scheduler = local_sched::take();
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -202,11 +203,11 @@ impl UvTcpListener {
impl Drop for UvTcpListener {
fn finalize(&self) {
let watcher = self.watcher();
let scheduler = local_sched::take();
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell(task);
do watcher.as_stream().close {
let scheduler = local_sched::take();
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -264,11 +265,11 @@ impl Drop for UvTcpStream {
fn finalize(&self) {
rtdebug!("closing tcp stream");
let watcher = self.watcher();
let scheduler = local_sched::take();
let scheduler = Local::take::<Scheduler>();
do scheduler.deschedule_running_task_and_then |task| {
let task_cell = Cell(task);
do watcher.close {
let scheduler = local_sched::take();
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -280,13 +281,13 @@ impl RtioTcpStream for UvTcpStream {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<uint, IoError>> = &result_cell;
let scheduler = local_sched::take();
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&mut [u8] = &buf;
do scheduler.deschedule_running_task_and_then |task| {
rtdebug!("read: entered scheduler context");
do local_sched::borrow |scheduler| {
do Local::borrow::<Scheduler> |scheduler| {
assert!(!scheduler.in_task_context());
}
let mut watcher = watcher;
@ -314,7 +315,7 @@ impl RtioTcpStream for UvTcpStream {
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = local_sched::take();
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -326,7 +327,7 @@ impl RtioTcpStream for UvTcpStream {
fn write(&mut self, buf: &[u8]) -> Result<(), IoError> {
let result_cell = empty_cell();
let result_cell_ptr: *Cell<Result<(), IoError>> = &result_cell;
let scheduler = local_sched::take();
let scheduler = Local::take::<Scheduler>();
assert!(scheduler.in_task_context());
let watcher = self.watcher();
let buf_ptr: *&[u8] = &buf;
@ -343,7 +344,7 @@ impl RtioTcpStream for UvTcpStream {
unsafe { (*result_cell_ptr).put_back(result); }
let scheduler = local_sched::take();
let scheduler = Local::take::<Scheduler>();
scheduler.resume_task_immediately(task_cell.take());
}
}
@ -357,7 +358,7 @@ impl RtioTcpStream for UvTcpStream {
fn test_simple_io_no_connect() {
do run_in_newsched_task {
unsafe {
let io = local_sched::unsafe_borrow_io();
let io = Local::unsafe_borrow::<IoFactoryObject>();
let addr = next_test_ip4();
let maybe_chan = (*io).tcp_connect(addr);
assert!(maybe_chan.is_err());
@ -373,7 +374,7 @@ fn test_simple_tcp_server_and_client() {
// Start the server first so it's listening when we connect
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut listener = (*io).tcp_bind(addr).unwrap();
let mut stream = listener.accept().unwrap();
let mut buf = [0, .. 2048];
@ -388,7 +389,7 @@ fn test_simple_tcp_server_and_client() {
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut stream = (*io).tcp_connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
}
@ -402,7 +403,7 @@ fn test_read_and_block() {
let addr = next_test_ip4();
do spawntask_immediately {
let io = unsafe { local_sched::unsafe_borrow_io() };
let io = unsafe { Local::unsafe_borrow::<IoFactoryObject>() };
let mut listener = unsafe { (*io).tcp_bind(addr).unwrap() };
let mut stream = listener.accept().unwrap();
let mut buf = [0, .. 2048];
@ -420,13 +421,13 @@ fn test_read_and_block() {
}
reads += 1;
let scheduler = local_sched::take();
let scheduler = Local::take::<Scheduler>();
// Yield to the other task in hopes that it
// will trigger a read callback while we are
// not ready for it
do scheduler.deschedule_running_task_and_then |task| {
let task = Cell(task);
do local_sched::borrow |scheduler| {
do Local::borrow::<Scheduler> |scheduler| {
scheduler.enqueue_task(task.take());
}
}
@ -438,7 +439,7 @@ fn test_read_and_block() {
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut stream = (*io).tcp_connect(addr).unwrap();
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
stream.write([0, 1, 2, 3, 4, 5, 6, 7]);
@ -458,7 +459,7 @@ fn test_read_read_read() {
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut listener = (*io).tcp_bind(addr).unwrap();
let mut stream = listener.accept().unwrap();
let buf = [1, .. 2048];
@ -472,7 +473,7 @@ fn test_read_read_read() {
do spawntask_immediately {
unsafe {
let io = local_sched::unsafe_borrow_io();
let io = Local::unsafe_borrow::<IoFactoryObject>();
let mut stream = (*io).tcp_connect(addr).unwrap();
let mut buf = [0, .. 2048];
let mut total_bytes_read = 0;

View File

@ -11,43 +11,57 @@
use container::Container;
use option::*;
use vec::OwnedVector;
use unstable::sync::{Exclusive, exclusive};
use cell::Cell;
use kinds::Owned;
use clone::Clone;
pub struct WorkQueue<T> {
priv queue: ~[T]
// XXX: Another mystery bug fixed by boxing this lock
priv queue: ~Exclusive<~[T]>
}
pub impl<T> WorkQueue<T> {
pub impl<T: Owned> WorkQueue<T> {
fn new() -> WorkQueue<T> {
WorkQueue {
queue: ~[]
queue: ~exclusive(~[])
}
}
fn push_back(&mut self, value: T) {
self.queue.push(value)
fn push(&mut self, value: T) {
let value = Cell(value);
self.queue.with(|q| q.unshift(value.take()) );
}
fn pop_back(&mut self) -> Option<T> {
if !self.queue.is_empty() {
Some(self.queue.pop())
} else {
None
fn pop(&mut self) -> Option<T> {
do self.queue.with |q| {
if !q.is_empty() {
Some(q.shift())
} else {
None
}
}
}
fn push_front(&mut self, value: T) {
self.queue.unshift(value)
}
fn pop_front(&mut self) -> Option<T> {
if !self.queue.is_empty() {
Some(self.queue.shift())
} else {
None
fn steal(&mut self) -> Option<T> {
do self.queue.with |q| {
if !q.is_empty() {
Some(q.pop())
} else {
None
}
}
}
fn is_empty(&self) -> bool {
return self.queue.is_empty();
self.queue.with_imm(|q| q.is_empty() )
}
}
impl<T> Clone for WorkQueue<T> {
fn clone(&self) -> WorkQueue<T> {
WorkQueue {
queue: self.queue.clone()
}
}
}

View File

@ -195,7 +195,8 @@ impl FailWithCause for &'static str {
pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
use option::Option;
use rt::{context, OldTaskContext, TaskContext};
use rt::local_services::{unsafe_borrow_local_services, Unwinder};
use rt::task::{Task, Unwinder};
use rt::local::Local;
let context = context();
match context {
@ -224,8 +225,8 @@ pub fn begin_unwind_(msg: *c_char, file: *c_char, line: size_t) -> ! {
gc::cleanup_stack_for_failure();
let local_services = unsafe_borrow_local_services();
let unwinder: &mut Option<Unwinder> = &mut (*local_services).unwinder;
let task = Local::unsafe_borrow::<Task>();
let unwinder: &mut Option<Unwinder> = &mut (*task).unwinder;
match *unwinder {
Some(ref mut unwinder) => unwinder.begin_unwind(),
None => abort!("failure without unwinder. aborting process")

View File

@ -18,7 +18,7 @@ use task::rt;
use local_data::LocalDataKey;
use super::rt::rust_task;
use rt::local_services::LocalStorage;
use rt::task::{Task, LocalStorage};
pub enum Handle {
OldHandle(*rust_task),
@ -28,15 +28,15 @@ pub enum Handle {
impl Handle {
pub fn new() -> Handle {
use rt::{context, OldTaskContext};
use rt::local_services::unsafe_borrow_local_services;
use rt::local::Local;
unsafe {
match context() {
OldTaskContext => {
OldHandle(rt::rust_get_task())
}
_ => {
let local_services = unsafe_borrow_local_services();
NewHandle(&mut (*local_services).storage)
let task = Local::unsafe_borrow::<Task>();
NewHandle(&mut (*task).storage)
}
}
}

View File

@ -504,7 +504,8 @@ pub fn failing() -> bool {
//! True if the running task has failed
use rt::{context, OldTaskContext};
use rt::local_services::borrow_local_services;
use rt::local::Local;
use rt::task::Task;
match context() {
OldTaskContext => {
@ -514,7 +515,7 @@ pub fn failing() -> bool {
}
_ => {
let mut unwinding = false;
do borrow_local_services |local| {
do Local::borrow::<Task> |local| {
unwinding = match local.unwinder {
Some(unwinder) => {
unwinder.unwinding

View File

@ -90,6 +90,7 @@ use task::unkillable;
use uint;
use util;
use unstable::sync::{Exclusive, exclusive};
use rt::local::Local;
#[cfg(test)] use task::default_task_opts;
@ -575,7 +576,7 @@ pub fn spawn_raw(opts: TaskOpts, f: ~fn()) {
fn spawn_raw_newsched(_opts: TaskOpts, f: ~fn()) {
use rt::sched::*;
let mut sched = local_sched::take();
let mut sched = Local::take::<Scheduler>();
let task = ~Coroutine::new(&mut sched.stack_pool, f);
sched.schedule_new_task(task);
}

View File

@ -17,7 +17,8 @@ use managed::raw::BoxRepr;
use str;
use sys;
use rt::{context, OldTaskContext};
use rt::local_services::borrow_local_services;
use rt::task::Task;
use rt::local::Local;
use option::{Option, Some, None};
use io;
use rt::global_heap;
@ -243,8 +244,8 @@ pub unsafe fn local_malloc(td: *c_char, size: uintptr_t) -> *c_char {
}
_ => {
let mut alloc = ::ptr::null();
do borrow_local_services |srv| {
alloc = srv.heap.alloc(td as *c_void, size as uint) as *c_char;
do Local::borrow::<Task> |task| {
alloc = task.heap.alloc(td as *c_void, size as uint) as *c_char;
}
return alloc;
}
@ -261,8 +262,8 @@ pub unsafe fn local_free(ptr: *c_char) {
rustrt::rust_upcall_free_noswitch(ptr);
}
_ => {
do borrow_local_services |srv| {
srv.heap.free(ptr as *c_void);
do Local::borrow::<Task> |task| {
task.heap.free(ptr as *c_void);
}
}
}

View File

@ -840,9 +840,9 @@ rust_get_rt_tls_key() {
return &rt_key;
}
// Initialize the global state required by the new scheduler
// Initialize the TLS key used by the new scheduler
extern "C" CDECL void
rust_initialize_global_state() {
rust_initialize_rt_tls_key() {
static lock_and_signal init_lock;
static bool initialized = false;

View File

@ -222,7 +222,7 @@ rust_uv_ip4_addrp
rust_uv_ip6_addrp
rust_uv_free_ip4_addr
rust_uv_free_ip6_addr
rust_initialize_global_state
rust_initialize_rt_tls_key
rust_dbg_next_port
rust_new_memory_region
rust_delete_memory_region