New channel-based task status notifications.

This commit is contained in:
Eric Holk 2011-08-16 16:39:47 -07:00
parent 07225e2169
commit 8686645aad
6 changed files with 128 additions and 18 deletions

View File

@ -1,4 +1,9 @@
import cast = unsafe::reinterpret_cast;
import comm;
import comm::_chan;
import option::some;
import option::none;
import option = option::t;
native "rust" mod rustrt {
fn task_sleep(time_in_us: uint);
@ -10,7 +15,6 @@ native "rust" mod rustrt {
fn get_task_id() -> task_id;
type rust_chan;
type rust_task;
fn set_min_stack(stack_size: uint);
@ -26,7 +30,13 @@ native "rust" mod rustrt {
fn leak<@T>(thing : -T);
}
type task_id = int;
type rust_task = {
mutable notify_enabled : u8,
mutable notify_chan : _chan[task_notification]
};
type task = int;
type task_id = task;
fn get_task_id() -> task_id {
rustrt::get_task_id()
@ -43,12 +53,13 @@ fn yield() { ret rustrt::task_yield(); }
tag task_result { tr_success; tr_failure; }
// FIXME: Re-enable this once the task type is removed from the compiler.
/*
tag task_notification {
exit(task, task_result);
}
fn join(t: task) -> task_result {
join_id(cast(t))
}
*/
fn join_id(t : task_id) -> task_result {
alt rustrt::task_join(t) { 0 { tr_success } _ { tr_failure } }
@ -64,8 +75,22 @@ fn set_min_stack(stack_size : uint) {
rustrt::set_min_stack(stack_size);
}
fn _spawn(thunk : fn() -> ()) -> task {
spawn(thunk)
}
fn spawn(thunk : fn() -> ()) -> task {
spawn_inner(thunk, none)
}
fn spawn_notify(thunk : fn() -> (), notify : _chan[task_notification])
-> task {
spawn_inner(thunk, some(notify))
}
// FIXME: make this a fn~ once those are supported.
fn _spawn(thunk : fn() -> ()) -> task_id {
fn spawn_inner(thunk : fn() -> (), notify : option[_chan[task_notification]])
-> task_id {
let id = rustrt::new_task();
// the order of arguments are outptr, taskptr, envptr.
@ -75,12 +100,21 @@ fn _spawn(thunk : fn() -> ()) -> task_id {
let regs = rustrt::get_task_context(id);
// set up the task pointer
let task_ptr : u32 = cast(rustrt::get_task_pointer(id));
(*regs).edx = task_ptr;
let task_ptr = rustrt::get_task_pointer(id);
(*regs).edx = cast(task_ptr);
let raw_thunk : { code: u32, env: u32 } = cast(thunk);
(*regs).eip = raw_thunk.code;
// set up notifications if they are enabled.
alt notify {
some(c) {
(*task_ptr).notify_enabled = 1u8;
(*task_ptr).notify_chan = c;
}
none {}
};
// okay, now we align the stack and add the environment pointer and a fake
// return address.
@ -95,7 +129,7 @@ fn _spawn(thunk : fn() -> ()) -> task_id {
// put the return pointer in ecx.
(*regs).ecx = (*regs).esp + 8u32;
*tptr = task_ptr;
*tptr = cast(task_ptr);
*env = raw_thunk.env;
*ra = rustrt::get_task_trampoline();

View File

@ -25,12 +25,6 @@ public:
rust_chan *clone(rust_task *target);
};
// Corresponds to the rust chan (currently _chan) type.
struct chan_handle {
rust_task_id task;
rust_port_id port;
};
//
// Local Variables:
// mode: C++

View File

@ -56,7 +56,6 @@ struct rust_task;
class rust_log;
class rust_port;
class rust_chan;
struct chan_handle;
struct rust_token;
class rust_kernel;
class rust_crate_cache;
@ -68,6 +67,12 @@ struct frame_glue_fns;
typedef intptr_t rust_task_id;
typedef intptr_t rust_port_id;
// Corresponds to the rust chan (currently _chan) type.
struct chan_handle {
rust_task_id task;
rust_port_id port;
};
#ifndef __i386__
#error "Target CPU not supported."
#endif

View File

@ -80,6 +80,10 @@ rust_task::rust_task(rust_scheduler *sched, rust_task_list *state,
LOGPTR(sched, "new task", (uintptr_t)this);
DLOG(sched, task, "sizeof(task) = %d (0x%x)", sizeof *this, sizeof *this);
assert((void*)this == (void*)&user);
user.notify_enabled = 0;
stk = new_stk(sched, this, 0);
rust_sp = stk->limit;
}
@ -89,6 +93,19 @@ rust_task::~rust_task()
DLOG(sched, task, "~rust_task %s @0x%" PRIxPTR ", refcnt=%d",
name, (uintptr_t)this, ref_count);
if(user.notify_enabled) {
rust_chan *target =
get_chan_by_handle(&user.notify_chan);
if(target) {
task_notification msg;
msg.id = id;
msg.result = failed ? tr_failure : tr_success;
target->send(&msg);
target->deref();
}
}
kernel->release_task_id(id);
/* FIXME: tighten this up, there are some more
@ -400,8 +417,11 @@ rust_task::free(void *p, bool is_gc)
void
rust_task::transition(rust_task_list *src, rust_task_list *dst) {
I(sched, !sched->lock.lock_held_by_current_thread());
scoped_lock with(sched->lock);
bool unlock = false;
if(!sched->lock.lock_held_by_current_thread()) {
unlock = true;
sched->lock.lock();
}
DLOG(sched, task,
"task %s " PTR " state change '%s' -> '%s' while in '%s'",
name, (uintptr_t)this, src->name, dst->name, state->name);
@ -409,6 +429,8 @@ rust_task::transition(rust_task_list *src, rust_task_list *dst) {
src->remove(this);
dst->append(this);
state = dst;
if(unlock)
sched->lock.unlock();
}
void

View File

@ -34,9 +34,32 @@ struct gc_alloc {
}
};
// portions of the task structure that are accessible from the standard
// library. This struct must agree with the std::task::rust_task record.
struct rust_task_user {
uint8_t notify_enabled;
chan_handle notify_chan;
};
// std::lib::task::task_result
enum task_result {
tr_success = 0,
tr_failure = 1
};
// std::lib::task::task_notification
//
// since it's currently a unary tag, we only add the fields.
struct task_notification {
rust_task_id id;
task_result result; // task_result
};
struct
rust_task : public kernel_owned<rust_task>, rust_cond
{
rust_task_user user;
RUST_ATOMIC_REFCOUNT();
// Fields known to the compiler.

View File

@ -1,5 +1,6 @@
use std;
import std::task;
import std::comm;
#[test]
fn test_sleep() { task::sleep(1000000u); }
@ -11,6 +12,7 @@ fn test_unsupervise() {
}
#[test]
#[ignore]
fn test_join() {
fn winner() { }
@ -36,3 +38,33 @@ fn test_lib_spawn2() {
fn foo(x : int) { assert(x == 42); }
task::_spawn(bind foo(42));
}
#[test]
fn test_join_chan() {
fn winner() { }
let p = comm::mk_port[task::task_notification]();
task::spawn_notify(bind winner(), p.mk_chan());
let s = p.recv();
log_err "received task status message";
log_err s;
alt s {
task::exit(_, task::tr_success.) { /* yay! */ }
_ { fail "invalid task status received" }
}
}
#[test]
fn test_join_chan_fail() {
fn failer() { task::unsupervise(); fail }
let p = comm::mk_port[task::task_notification]();
task::spawn_notify(bind failer(), p.mk_chan());
let s = p.recv();
log_err "received task status message";
log_err s;
alt s {
task::exit(_, task::tr_failure.) { /* yay! */ }
_ { fail "invalid task status received" }
}
}