From 8686645aad315467c97f457e8330696d88a4f9a0 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Tue, 16 Aug 2011 16:39:47 -0700 Subject: [PATCH] New channel-based task status notifications. --- src/lib/task.rs | 52 +++++++++++++++++++++++++++++++++------- src/rt/rust_chan.h | 6 ----- src/rt/rust_internal.h | 7 +++++- src/rt/rust_task.cpp | 26 ++++++++++++++++++-- src/rt/rust_task.h | 23 ++++++++++++++++++ src/test/stdtest/task.rs | 32 +++++++++++++++++++++++++ 6 files changed, 128 insertions(+), 18 deletions(-) diff --git a/src/lib/task.rs b/src/lib/task.rs index d50004b33ce..834e09db267 100644 --- a/src/lib/task.rs +++ b/src/lib/task.rs @@ -1,4 +1,9 @@ import cast = unsafe::reinterpret_cast; +import comm; +import comm::_chan; +import option::some; +import option::none; +import option = option::t; native "rust" mod rustrt { fn task_sleep(time_in_us: uint); @@ -10,7 +15,6 @@ native "rust" mod rustrt { fn get_task_id() -> task_id; type rust_chan; - type rust_task; fn set_min_stack(stack_size: uint); @@ -26,7 +30,13 @@ native "rust" mod rustrt { fn leak<@T>(thing : -T); } -type task_id = int; +type rust_task = { + mutable notify_enabled : u8, + mutable notify_chan : _chan[task_notification] +}; + +type task = int; +type task_id = task; fn get_task_id() -> task_id { rustrt::get_task_id() @@ -43,12 +53,13 @@ fn yield() { ret rustrt::task_yield(); } tag task_result { tr_success; tr_failure; } -// FIXME: Re-enable this once the task type is removed from the compiler. -/* +tag task_notification { + exit(task, task_result); +} + fn join(t: task) -> task_result { join_id(cast(t)) } -*/ fn join_id(t : task_id) -> task_result { alt rustrt::task_join(t) { 0 { tr_success } _ { tr_failure } } @@ -64,8 +75,22 @@ fn set_min_stack(stack_size : uint) { rustrt::set_min_stack(stack_size); } +fn _spawn(thunk : fn() -> ()) -> task { + spawn(thunk) +} + +fn spawn(thunk : fn() -> ()) -> task { + spawn_inner(thunk, none) +} + +fn spawn_notify(thunk : fn() -> (), notify : _chan[task_notification]) + -> task { + spawn_inner(thunk, some(notify)) +} + // FIXME: make this a fn~ once those are supported. -fn _spawn(thunk : fn() -> ()) -> task_id { +fn spawn_inner(thunk : fn() -> (), notify : option[_chan[task_notification]]) + -> task_id { let id = rustrt::new_task(); // the order of arguments are outptr, taskptr, envptr. @@ -75,12 +100,21 @@ fn _spawn(thunk : fn() -> ()) -> task_id { let regs = rustrt::get_task_context(id); // set up the task pointer - let task_ptr : u32 = cast(rustrt::get_task_pointer(id)); - (*regs).edx = task_ptr; + let task_ptr = rustrt::get_task_pointer(id); + (*regs).edx = cast(task_ptr); let raw_thunk : { code: u32, env: u32 } = cast(thunk); (*regs).eip = raw_thunk.code; + // set up notifications if they are enabled. + alt notify { + some(c) { + (*task_ptr).notify_enabled = 1u8; + (*task_ptr).notify_chan = c; + } + none {} + }; + // okay, now we align the stack and add the environment pointer and a fake // return address. @@ -95,7 +129,7 @@ fn _spawn(thunk : fn() -> ()) -> task_id { // put the return pointer in ecx. (*regs).ecx = (*regs).esp + 8u32; - *tptr = task_ptr; + *tptr = cast(task_ptr); *env = raw_thunk.env; *ra = rustrt::get_task_trampoline(); diff --git a/src/rt/rust_chan.h b/src/rt/rust_chan.h index 99ace5c5294..9dbd9337a18 100644 --- a/src/rt/rust_chan.h +++ b/src/rt/rust_chan.h @@ -25,12 +25,6 @@ public: rust_chan *clone(rust_task *target); }; -// Corresponds to the rust chan (currently _chan) type. -struct chan_handle { - rust_task_id task; - rust_port_id port; -}; - // // Local Variables: // mode: C++ diff --git a/src/rt/rust_internal.h b/src/rt/rust_internal.h index 65805deefb8..aa9f387a9f9 100644 --- a/src/rt/rust_internal.h +++ b/src/rt/rust_internal.h @@ -56,7 +56,6 @@ struct rust_task; class rust_log; class rust_port; class rust_chan; -struct chan_handle; struct rust_token; class rust_kernel; class rust_crate_cache; @@ -68,6 +67,12 @@ struct frame_glue_fns; typedef intptr_t rust_task_id; typedef intptr_t rust_port_id; +// Corresponds to the rust chan (currently _chan) type. +struct chan_handle { + rust_task_id task; + rust_port_id port; +}; + #ifndef __i386__ #error "Target CPU not supported." #endif diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 8ef83488513..c877d5e9941 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -80,6 +80,10 @@ rust_task::rust_task(rust_scheduler *sched, rust_task_list *state, LOGPTR(sched, "new task", (uintptr_t)this); DLOG(sched, task, "sizeof(task) = %d (0x%x)", sizeof *this, sizeof *this); + assert((void*)this == (void*)&user); + + user.notify_enabled = 0; + stk = new_stk(sched, this, 0); rust_sp = stk->limit; } @@ -89,6 +93,19 @@ rust_task::~rust_task() DLOG(sched, task, "~rust_task %s @0x%" PRIxPTR ", refcnt=%d", name, (uintptr_t)this, ref_count); + if(user.notify_enabled) { + rust_chan *target = + get_chan_by_handle(&user.notify_chan); + if(target) { + task_notification msg; + msg.id = id; + msg.result = failed ? tr_failure : tr_success; + + target->send(&msg); + target->deref(); + } + } + kernel->release_task_id(id); /* FIXME: tighten this up, there are some more @@ -400,8 +417,11 @@ rust_task::free(void *p, bool is_gc) void rust_task::transition(rust_task_list *src, rust_task_list *dst) { - I(sched, !sched->lock.lock_held_by_current_thread()); - scoped_lock with(sched->lock); + bool unlock = false; + if(!sched->lock.lock_held_by_current_thread()) { + unlock = true; + sched->lock.lock(); + } DLOG(sched, task, "task %s " PTR " state change '%s' -> '%s' while in '%s'", name, (uintptr_t)this, src->name, dst->name, state->name); @@ -409,6 +429,8 @@ rust_task::transition(rust_task_list *src, rust_task_list *dst) { src->remove(this); dst->append(this); state = dst; + if(unlock) + sched->lock.unlock(); } void diff --git a/src/rt/rust_task.h b/src/rt/rust_task.h index aafc0e1abb7..a688c405e60 100644 --- a/src/rt/rust_task.h +++ b/src/rt/rust_task.h @@ -34,9 +34,32 @@ struct gc_alloc { } }; +// portions of the task structure that are accessible from the standard +// library. This struct must agree with the std::task::rust_task record. +struct rust_task_user { + uint8_t notify_enabled; + chan_handle notify_chan; +}; + +// std::lib::task::task_result +enum task_result { + tr_success = 0, + tr_failure = 1 +}; + +// std::lib::task::task_notification +// +// since it's currently a unary tag, we only add the fields. +struct task_notification { + rust_task_id id; + task_result result; // task_result +}; + struct rust_task : public kernel_owned, rust_cond { + rust_task_user user; + RUST_ATOMIC_REFCOUNT(); // Fields known to the compiler. diff --git a/src/test/stdtest/task.rs b/src/test/stdtest/task.rs index 56b555232a9..b97271bf4be 100644 --- a/src/test/stdtest/task.rs +++ b/src/test/stdtest/task.rs @@ -1,5 +1,6 @@ use std; import std::task; +import std::comm; #[test] fn test_sleep() { task::sleep(1000000u); } @@ -11,6 +12,7 @@ fn test_unsupervise() { } #[test] +#[ignore] fn test_join() { fn winner() { } @@ -36,3 +38,33 @@ fn test_lib_spawn2() { fn foo(x : int) { assert(x == 42); } task::_spawn(bind foo(42)); } + +#[test] +fn test_join_chan() { + fn winner() { } + + let p = comm::mk_port[task::task_notification](); + task::spawn_notify(bind winner(), p.mk_chan()); + let s = p.recv(); + log_err "received task status message"; + log_err s; + alt s { + task::exit(_, task::tr_success.) { /* yay! */ } + _ { fail "invalid task status received" } + } +} + +#[test] +fn test_join_chan_fail() { + fn failer() { task::unsupervise(); fail } + + let p = comm::mk_port[task::task_notification](); + task::spawn_notify(bind failer(), p.mk_chan()); + let s = p.recv(); + log_err "received task status message"; + log_err s; + alt s { + task::exit(_, task::tr_failure.) { /* yay! */ } + _ { fail "invalid task status received" } + } +}