Remove linked failure from the runtime

The reasons for doing this are:

* The model on which linked failure is based is inherently complex
* The implementation is also very complex, and there are few remaining who
  fully understand the implementation
* There are existing race conditions in the core context switching function of
  the scheduler, and possibly others.
* It's unclear whether this model of linked failure maps well to a 1:1 threading
  model

Linked failure is often a desired aspect of tasks, but we would like to take a
much more conservative approach in re-implementing linked failure if at all.

Closes #8674
Closes #8318
Closes #8863
This commit is contained in:
Alex Crichton 2013-11-21 16:55:40 -08:00
parent 85a1eff3a9
commit acca9e3834
39 changed files with 400 additions and 2528 deletions

View File

@ -2350,9 +2350,9 @@ Indices are zero-based, and may be of any integral type. Vector access
is bounds-checked at run-time. When the check fails, it will put the
task in a _failing state_.
~~~~
~~~~ {.xfail-test}
# use std::task;
# do task::spawn_unlinked {
# do task::spawn {
([1, 2, 3, 4])[0];
(["a", "b"])[10]; // fails

View File

@ -402,22 +402,6 @@ freeing memory along the way---and then exits. Unlike exceptions in C++,
exceptions in Rust are unrecoverable within a single task: once a task fails,
there is no way to "catch" the exception.
All tasks are, by default, _linked_ to each other. That means that the fates
of all tasks are intertwined: if one fails, so do all the others.
~~~{.xfail-test .linked-failure}
# use std::task::spawn;
# use std::task;
# fn do_some_work() { loop { task::yield() } }
# do task::try {
// Create a child task that fails
do spawn { fail!() }
// This will also fail because the task we spawned failed
do_some_work();
# };
~~~
While it isn't possible for a task to recover from failure, tasks may notify
each other of failure. The simplest way of handling task failure is with the
`try` function, which is similar to `spawn`, but immediately blocks waiting
@ -464,101 +448,7 @@ it trips, indicates an unrecoverable logic error); in other cases you
might want to contain the failure at a certain boundary (perhaps a
small piece of input from the outside world, which you happen to be
processing in parallel, is malformed and its processing task can't
proceed). Hence, you will need different _linked failure modes_.
## Failure modes
By default, task failure is _bidirectionally linked_, which means that if
either task fails, it kills the other one.
~~~{.xfail-test .linked-failure}
# use std::task;
# use std::comm::oneshot;
# fn sleep_forever() { loop { let (p, c) = oneshot::<()>(); p.recv(); } }
# do task::try {
do spawn {
do spawn {
fail!(); // All three tasks will fail.
}
sleep_forever(); // Will get woken up by force, then fail
}
sleep_forever(); // Will get woken up by force, then fail
# };
~~~
If you want parent tasks to be able to kill their children, but do not want a
parent to fail automatically if one of its child task fails, you can call
`task::spawn_supervised` for _unidirectionally linked_ failure. The
function `task::try`, which we saw previously, uses `spawn_supervised`
internally, with additional logic to wait for the child task to finish
before returning. Hence:
~~~{.xfail-test .linked-failure}
# use std::comm::{stream, Chan, Port};
# use std::comm::oneshot;
# use std::task::{spawn, try};
# use std::task;
# fn sleep_forever() { loop { let (p, c) = oneshot::<()>(); p.recv(); } }
# do task::try {
let (receiver, sender): (Port<int>, Chan<int>) = stream();
do spawn { // Bidirectionally linked
// Wait for the supervised child task to exist.
let message = receiver.recv();
// Kill both it and the parent task.
assert!(message != 42);
}
do try { // Unidirectionally linked
sender.send(42);
sleep_forever(); // Will get woken up by force
}
// Flow never reaches here -- parent task was killed too.
# };
~~~
Supervised failure is useful in any situation where one task manages
multiple fallible child tasks, and the parent task can recover
if any child fails. On the other hand, if the _parent_ (supervisor) fails,
then there is nothing the children can do to recover, so they should
also fail.
Supervised task failure propagates across multiple generations even if
an intermediate generation has already exited:
~~~{.xfail-test .linked-failure}
# use std::task;
# use std::comm::oneshot;
# fn sleep_forever() { loop { let (p, c) = oneshot::<()>(); p.recv(); } }
# fn wait_for_a_while() { for _ in range(0, 1000u) { task::yield() } }
# do task::try::<int> {
do task::spawn_supervised {
do task::spawn_supervised {
sleep_forever(); // Will get woken up by force, then fail
}
// Intermediate task immediately exits
}
wait_for_a_while();
fail!(); // Will kill grandchild even if child has already exited
# };
~~~
Finally, tasks can be configured to not propagate failure to each
other at all, using `task::spawn_unlinked` for _isolated failure_.
~~~{.xfail-test .linked-failure}
# use std::task;
# fn random() -> uint { 100 }
# fn sleep_for(i: uint) { for _ in range(0, i) { task::yield() } }
# do task::try::<()> {
let (time1, time2) = (random(), random());
do task::spawn_unlinked {
sleep_for(time2); // Won't get forced awake
fail!();
}
sleep_for(time1); // Won't get forced awake
fail!();
// It will take MAX(time1,time2) for the program to finish.
# };
~~~
proceed).
## Creating a task with a bi-directional communication path

View File

@ -655,7 +655,7 @@ mod tests {
let arc2 = ~arc.clone();
let (p, c) = comm::stream();
do task::spawn_unlinked || {
do spawn {
let _ = p.recv();
do arc2.access_cond |one, cond| {
cond.signal();

View File

@ -137,7 +137,6 @@ pub fn rendezvous<T: Send>() -> (SyncPort<T>, SyncChan<T>) {
mod test {
use comm::{DuplexStream, rendezvous};
use std::rt::test::run_in_uv_task;
use std::task::spawn_unlinked;
#[test]
@ -177,7 +176,7 @@ mod test {
#[test]
fn send_and_fail_and_try_recv() {
let (port, chan) = rendezvous();
do spawn_unlinked {
do spawn {
chan.duplex_stream.send(()); // Can't access this field outside this module
fail!()
}
@ -187,7 +186,7 @@ mod test {
#[test]
fn try_send_and_recv_then_fail_before_ack() {
let (port, chan) = rendezvous();
do spawn_unlinked {
do spawn {
port.duplex_stream.recv();
fail!()
}
@ -198,7 +197,7 @@ mod test {
#[should_fail]
fn send_and_recv_then_fail_before_ack() {
let (port, chan) = rendezvous();
do spawn_unlinked {
do spawn {
port.duplex_stream.recv();
fail!()
}

View File

@ -27,7 +27,6 @@
use std::cell::Cell;
use std::comm::{PortOne, oneshot};
use std::task;
use std::util::replace;
/// A type encapsulating the result of a computation which may not be complete
@ -130,29 +129,12 @@ impl<A:Send> Future<A> {
let (port, chan) = oneshot();
do task::spawn_with(chan) |chan| {
do spawn {
chan.send(blk());
}
Future::from_port(port)
}
pub fn spawn_with<B: Send>(v: B, blk: proc(B) -> A) -> Future<A> {
/*!
* Create a future from a unique closure taking one argument.
*
* The closure and its argument will be moved into a new task. The
* closure will be run and its result used as the value of the future.
*/
let (port, chan) = oneshot();
do task::spawn_with((v, chan)) |(v, chan)| {
chan.send(blk(v));
}
Future::from_port(port)
}
}
#[cfg(test)]
@ -207,12 +189,6 @@ mod test {
assert_eq!(f.get(), ~"bale");
}
#[test]
fn test_spawn_with() {
let mut f = Future::spawn_with(~"gale", |s| { s });
assert_eq!(f.get(), ~"gale");
}
#[test]
#[should_fail]
fn test_futurefail() {

View File

@ -22,7 +22,6 @@ use std::borrow;
use std::comm;
use std::comm::SendDeferred;
use std::comm::{GenericPort, Peekable};
use std::task;
use std::unstable::sync::{Exclusive, UnsafeArc};
use std::unstable::atomics;
use std::unstable::finally::Finally;
@ -134,13 +133,11 @@ impl<Q:Send> Sem<Q> {
}
pub fn access<U>(&self, blk: || -> U) -> U {
do task::unkillable {
do (|| {
self.acquire();
do task::rekillable { blk() }
}).finally {
self.release();
}
do (|| {
self.acquire();
blk()
}).finally {
self.release();
}
}
}
@ -206,48 +203,41 @@ impl<'self> Condvar<'self> {
pub fn wait_on(&self, condvar_id: uint) {
let mut WaitEnd = None;
let mut out_of_bounds = None;
do task::unkillable {
// Release lock, 'atomically' enqueuing ourselves in so doing.
unsafe {
do (**self.sem).with |state| {
if condvar_id < state.blocked.len() {
// Drop the lock.
state.count += 1;
if state.count <= 0 {
state.waiters.signal();
}
// Create waiter nobe, and enqueue ourself to
// be woken up by a signaller.
WaitEnd = Some(state.blocked[condvar_id].wait_end());
} else {
out_of_bounds = Some(state.blocked.len());
// Release lock, 'atomically' enqueuing ourselves in so doing.
unsafe {
do (**self.sem).with |state| {
if condvar_id < state.blocked.len() {
// Drop the lock.
state.count += 1;
if state.count <= 0 {
state.waiters.signal();
}
// Create waiter nobe, and enqueue ourself to
// be woken up by a signaller.
WaitEnd = Some(state.blocked[condvar_id].wait_end());
} else {
out_of_bounds = Some(state.blocked.len());
}
}
}
// If deschedule checks start getting inserted anywhere, we can be
// killed before or after enqueueing. Deciding whether to
// unkillably reacquire the lock needs to happen atomically
// wrt enqueuing.
do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
// Unconditionally "block". (Might not actually block if a
// signaller already sent -- I mean 'unconditionally' in contrast
// with acquire().)
do (|| {
do task::rekillable {
let _ = WaitEnd.take_unwrap().recv();
}
}).finally {
// Reacquire the condvar. Note this is back in the unkillable
// section; it needs to succeed, instead of itself dying.
match self.order {
Just(lock) => do lock.access {
self.sem.acquire();
},
Nothing => {
self.sem.acquire();
},
}
// If deschedule checks start getting inserted anywhere, we can be
// killed before or after enqueueing.
do check_cvar_bounds(out_of_bounds, condvar_id, "cond.wait_on()") {
// Unconditionally "block". (Might not actually block if a
// signaller already sent -- I mean 'unconditionally' in contrast
// with acquire().)
do (|| {
let _ = WaitEnd.take_unwrap().recv();
}).finally {
// Reacquire the condvar.
match self.order {
Just(lock) => do lock.access {
self.sem.acquire();
},
Nothing => {
self.sem.acquire();
},
}
}
}
@ -484,30 +474,28 @@ impl RWLock {
*/
pub fn read<U>(&self, blk: || -> U) -> U {
unsafe {
do task::unkillable {
do (&self.order_lock).access {
let state = &mut *self.state.get();
let old_count = state.read_count.fetch_add(1, atomics::Acquire);
if old_count == 0 {
(&self.access_lock).acquire();
state.read_mode = true;
}
do (&self.order_lock).access {
let state = &mut *self.state.get();
let old_count = state.read_count.fetch_add(1, atomics::Acquire);
if old_count == 0 {
(&self.access_lock).acquire();
state.read_mode = true;
}
do (|| {
do task::rekillable { blk() }
}).finally {
let state = &mut *self.state.get();
assert!(state.read_mode);
let old_count = state.read_count.fetch_sub(1, atomics::Release);
assert!(old_count > 0);
if old_count == 1 {
state.read_mode = false;
// Note: this release used to be outside of a locked access
// to exclusive-protected state. If this code is ever
// converted back to such (instead of using atomic ops),
// this access MUST NOT go inside the exclusive access.
(&self.access_lock).release();
}
}
do (|| {
blk()
}).finally {
let state = &mut *self.state.get();
assert!(state.read_mode);
let old_count = state.read_count.fetch_sub(1, atomics::Release);
assert!(old_count > 0);
if old_count == 1 {
state.read_mode = false;
// Note: this release used to be outside of a locked access
// to exclusive-protected state. If this code is ever
// converted back to such (instead of using atomic ops),
// this access MUST NOT go inside the exclusive access.
(&self.access_lock).release();
}
}
}
@ -518,14 +506,10 @@ impl RWLock {
* 'write' from other tasks will run concurrently with this one.
*/
pub fn write<U>(&self, blk: || -> U) -> U {
do task::unkillable {
(&self.order_lock).acquire();
do (&self.access_lock).access {
(&self.order_lock).release();
do task::rekillable {
blk()
}
}
(&self.order_lock).acquire();
do (&self.access_lock).access {
(&self.order_lock).release();
blk()
}
}
@ -562,16 +546,12 @@ impl RWLock {
// which can't happen until T2 finishes the downgrade-read entirely.
// The astute reader will also note that making waking writers use the
// order_lock is better for not starving readers.
do task::unkillable {
(&self.order_lock).acquire();
do (&self.access_lock).access_cond |cond| {
(&self.order_lock).release();
do task::rekillable {
let opt_lock = Just(&self.order_lock);
blk(&Condvar { sem: cond.sem, order: opt_lock,
token: NonCopyable })
}
}
(&self.order_lock).acquire();
do (&self.access_lock).access_cond |cond| {
(&self.order_lock).release();
let opt_lock = Just(&self.order_lock);
blk(&Condvar { sem: cond.sem, order: opt_lock,
token: NonCopyable })
}
}
@ -599,39 +579,35 @@ impl RWLock {
pub fn write_downgrade<U>(&self, blk: |v: RWLockWriteMode| -> U) -> U {
// Implementation slightly different from the slicker 'write's above.
// The exit path is conditional on whether the caller downgrades.
do task::unkillable {
(&self.order_lock).acquire();
(&self.access_lock).acquire();
(&self.order_lock).release();
do (|| {
do task::rekillable {
blk(RWLockWriteMode { lock: self, token: NonCopyable })
}
}).finally {
let writer_or_last_reader;
// Check if we're releasing from read mode or from write mode.
let state = unsafe { &mut *self.state.get() };
if state.read_mode {
// Releasing from read mode.
let old_count = state.read_count.fetch_sub(1, atomics::Release);
assert!(old_count > 0);
// Check if other readers remain.
if old_count == 1 {
// Case 1: Writer downgraded & was the last reader
writer_or_last_reader = true;
state.read_mode = false;
} else {
// Case 2: Writer downgraded & was not the last reader
writer_or_last_reader = false;
}
} else {
// Case 3: Writer did not downgrade
(&self.order_lock).acquire();
(&self.access_lock).acquire();
(&self.order_lock).release();
do (|| {
blk(RWLockWriteMode { lock: self, token: NonCopyable })
}).finally {
let writer_or_last_reader;
// Check if we're releasing from read mode or from write mode.
let state = unsafe { &mut *self.state.get() };
if state.read_mode {
// Releasing from read mode.
let old_count = state.read_count.fetch_sub(1, atomics::Release);
assert!(old_count > 0);
// Check if other readers remain.
if old_count == 1 {
// Case 1: Writer downgraded & was the last reader
writer_or_last_reader = true;
state.read_mode = false;
} else {
// Case 2: Writer downgraded & was not the last reader
writer_or_last_reader = false;
}
if writer_or_last_reader {
// Nobody left inside; release the "reader cloud" lock.
(&self.access_lock).release();
}
} else {
// Case 3: Writer did not downgrade
writer_or_last_reader = true;
}
if writer_or_last_reader {
// Nobody left inside; release the "reader cloud" lock.
(&self.access_lock).release();
}
}
}
@ -643,23 +619,21 @@ impl RWLock {
fail!("Can't downgrade() with a different rwlock's write_mode!");
}
unsafe {
do task::unkillable {
let state = &mut *self.state.get();
assert!(!state.read_mode);
state.read_mode = true;
// If a reader attempts to enter at this point, both the
// downgrader and reader will set the mode flag. This is fine.
let old_count = state.read_count.fetch_add(1, atomics::Release);
// If another reader was already blocking, we need to hand-off
// the "reader cloud" access lock to them.
if old_count != 0 {
// Guaranteed not to let another writer in, because
// another reader was holding the order_lock. Hence they
// must be the one to get the access_lock (because all
// access_locks are acquired with order_lock held). See
// the comment in write_cond for more justification.
(&self.access_lock).release();
}
let state = &mut *self.state.get();
assert!(!state.read_mode);
state.read_mode = true;
// If a reader attempts to enter at this point, both the
// downgrader and reader will set the mode flag. This is fine.
let old_count = state.read_count.fetch_add(1, atomics::Release);
// If another reader was already blocking, we need to hand-off
// the "reader cloud" access lock to them.
if old_count != 0 {
// Guaranteed not to let another writer in, because
// another reader was holding the order_lock. Hence they
// must be the one to get the access_lock (because all
// access_locks are acquired with order_lock held). See
// the comment in write_cond for more justification.
(&self.access_lock).release();
}
}
RWLockReadMode { lock: token.lock, token: NonCopyable }

View File

@ -867,7 +867,6 @@ pub fn run_test(force_ignore: bool,
let testfn_cell = ::std::cell::Cell::new(testfn);
do task::spawn {
let mut task = task::task();
task.unlinked();
task.name(match desc.name {
DynTestName(ref name) => SendStrOwned(name.clone()),
StaticTestName(name) => SendStrStatic(name),

View File

@ -954,10 +954,10 @@ mod tests {
use std::f64;
use std::result::{Err, Ok};
use std::libc;
#[cfg(windows)]
fn set_time_zone() {
use std::libc;
// Windows crt doesn't see any environment variable set by
// `SetEnvironmentVariable`, which `os::setenv` internally uses.
// It is why we use `putenv` here.

View File

@ -338,7 +338,6 @@ pub fn monitor(f: proc(@diagnostic::Emitter)) {
let ch_capture = ch.clone();
let mut task_builder = task::task();
task_builder.name("rustc");
task_builder.supervised();
// XXX: Hacks on hacks. If the env is trying to override the stack size
// then *don't* set it explicitly.

View File

@ -708,10 +708,11 @@ impl Context {
let prog_chan = prog_chan.clone();
let mut task = task::task();
task.unlinked(); // we kill things manually
task.name(format!("worker{}", i));
task.spawn_with(cache.clone(),
|cache| worker(cache, &port, &chan, &prog_chan));
let cache = cache.clone();
do task.spawn {
worker(cache, &port, &chan, &prog_chan);
}
fn worker(cache: RWArc<Cache>,
port: &SharedPort<Work>,

View File

@ -20,7 +20,6 @@ use std::io;
use std::rt::local::Local;
use std::rt::rtio;
use std::rt::sched::{Scheduler, SchedHandle};
use std::task;
use std::vec;
use super::{Loop, UvError, uv_error_to_io_error, wait_until_woken_after};
@ -298,26 +297,24 @@ impl Drop for FsRequest {
fn execute(f: &fn(*uvll::uv_fs_t, uvll::uv_fs_cb) -> c_int)
-> Result<FsRequest, UvError>
{
return do task::unkillable {
let mut req = FsRequest {
fired: false,
req: unsafe { uvll::malloc_req(uvll::UV_FS) }
};
match f(req.req, fs_cb) {
0 => {
req.fired = true;
let mut slot = None;
do wait_until_woken_after(&mut slot) {
unsafe { uvll::set_data_for_req(req.req, &slot) }
}
match req.get_result() {
n if n < 0 => Err(UvError(n)),
_ => Ok(req),
}
let mut req = FsRequest {
fired: false,
req: unsafe { uvll::malloc_req(uvll::UV_FS) }
};
return match f(req.req, fs_cb) {
0 => {
req.fired = true;
let mut slot = None;
do wait_until_woken_after(&mut slot) {
unsafe { uvll::set_data_for_req(req.req, &slot) }
}
match req.get_result() {
n if n < 0 => Err(UvError(n)),
_ => Ok(req),
}
n => Err(UvError(n))
}
n => Err(UvError(n))
};
extern fn fs_cb(req: *uvll::uv_fs_t) {

View File

@ -20,7 +20,6 @@ use std::rt::rtio;
use std::rt::sched::{Scheduler, SchedHandle};
use std::rt::tube::Tube;
use std::str;
use std::task;
use std::vec;
use stream::StreamWatcher;
@ -176,36 +175,34 @@ impl TcpWatcher {
{
struct Ctx { status: c_int, task: Option<BlockedTask> }
return do task::unkillable {
let tcp = TcpWatcher::new(loop_);
let ret = do socket_addr_as_sockaddr(address) |addr| {
let mut req = Request::new(uvll::UV_CONNECT);
let result = unsafe {
uvll::uv_tcp_connect(req.handle, tcp.handle, addr,
connect_cb)
};
match result {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { status: 0, task: None };
do wait_until_woken_after(&mut cx.task) {
req.set_data(&cx);
}
match cx.status {
0 => Ok(()),
n => Err(UvError(n)),
}
}
n => Err(UvError(n))
}
let tcp = TcpWatcher::new(loop_);
let ret = do socket_addr_as_sockaddr(address) |addr| {
let mut req = Request::new(uvll::UV_CONNECT);
let result = unsafe {
uvll::uv_tcp_connect(req.handle, tcp.handle, addr,
connect_cb)
};
match ret {
Ok(()) => Ok(tcp),
Err(e) => Err(e),
match result {
0 => {
req.defuse(); // uv callback now owns this request
let mut cx = Ctx { status: 0, task: None };
do wait_until_woken_after(&mut cx.task) {
req.set_data(&cx);
}
match cx.status {
0 => Ok(()),
n => Err(UvError(n)),
}
}
n => Err(UvError(n))
}
};
return match ret {
Ok(()) => Ok(tcp),
Err(e) => Err(e),
};
extern fn connect_cb(req: *uvll::uv_connect_t, status: c_int) {
let req = Request::wrap(req);
assert!(status != uvll::ECANCELED);
@ -291,25 +288,23 @@ impl TcpListener {
pub fn bind(loop_: &mut Loop, address: SocketAddr)
-> Result<~TcpListener, UvError>
{
do task::unkillable {
let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
assert_eq!(unsafe {
uvll::uv_tcp_init(loop_.handle, handle)
}, 0);
let l = ~TcpListener {
home: get_handle_to_current_scheduler!(),
handle: handle,
closing_task: None,
outgoing: Tube::new(),
};
let res = socket_addr_as_sockaddr(address, |addr| unsafe {
uvll::uv_tcp_bind(l.handle, addr)
});
match res {
0 => Ok(l.install()),
n => Err(UvError(n))
}
}
let handle = unsafe { uvll::malloc_handle(uvll::UV_TCP) };
assert_eq!(unsafe {
uvll::uv_tcp_init(loop_.handle, handle)
}, 0);
let l = ~TcpListener {
home: get_handle_to_current_scheduler!(),
handle: handle,
closing_task: None,
outgoing: Tube::new(),
};
let res = socket_addr_as_sockaddr(address, |addr| unsafe {
uvll::uv_tcp_bind(l.handle, addr)
});
return match res {
0 => Ok(l.install()),
n => Err(UvError(n))
};
}
}
@ -426,22 +421,20 @@ impl UdpWatcher {
pub fn bind(loop_: &Loop, address: SocketAddr)
-> Result<UdpWatcher, UvError>
{
do task::unkillable {
let udp = UdpWatcher {
handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
home: get_handle_to_current_scheduler!(),
};
assert_eq!(unsafe {
uvll::uv_udp_init(loop_.handle, udp.handle)
}, 0);
let result = socket_addr_as_sockaddr(address, |addr| unsafe {
uvll::uv_udp_bind(udp.handle, addr, 0u32)
});
match result {
0 => Ok(udp),
n => Err(UvError(n)),
}
}
let udp = UdpWatcher {
handle: unsafe { uvll::malloc_handle(uvll::UV_UDP) },
home: get_handle_to_current_scheduler!(),
};
assert_eq!(unsafe {
uvll::uv_udp_init(loop_.handle, udp.handle)
}, 0);
let result = socket_addr_as_sockaddr(address, |addr| unsafe {
uvll::uv_udp_bind(udp.handle, addr, 0u32)
});
return match result {
0 => Ok(udp),
n => Err(UvError(n)),
};
}
}
@ -1123,16 +1116,14 @@ mod test {
assert!(maybe_socket.is_ok());
// block self on sched1
do task::unkillable { // FIXME(#8674)
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
// unblock task
do task.wake().map |task| {
// send self to sched2
tasksFriendHandle.take().send(TaskFromFriend(task));
};
// sched1 should now sleep since it has nothing else to do
}
let scheduler: ~Scheduler = Local::take();
do scheduler.deschedule_running_task_and_then |_, task| {
// unblock task
do task.wake().map |task| {
// send self to sched2
tasksFriendHandle.take().send(TaskFromFriend(task));
};
// sched1 should now sleep since it has nothing else to do
}
// sched2 will wake up and get the task as we do nothing else,
// the function ends and the socket goes out of scope sched2
@ -1180,7 +1171,7 @@ mod test {
let chan = Cell::new(chan);
let addr = next_test_ip4();
do task::spawn_unlinked { // please no linked failure
do spawn {
let w = TcpListener::bind(local_loop(), addr).unwrap();
let mut w = w.listen().unwrap();
chan.take().send(());

View File

@ -16,7 +16,6 @@ use std::rt::local::Local;
use std::rt::rtio::{RtioPipe, RtioUnixListener, RtioUnixAcceptor};
use std::rt::sched::{Scheduler, SchedHandle};
use std::rt::tube::Tube;
use std::task;
use stream::StreamWatcher;
use super::{Loop, UvError, UvHandle, Request, uv_error_to_io_error,
@ -74,26 +73,23 @@ impl PipeWatcher {
pub fn connect(loop_: &Loop, name: &CString) -> Result<PipeWatcher, UvError>
{
struct Ctx { task: Option<BlockedTask>, result: libc::c_int, }
return do task::unkillable {
let mut cx = Ctx { task: None, result: 0 };
let mut req = Request::new(uvll::UV_CONNECT);
let pipe = PipeWatcher::new(loop_, false);
let mut cx = Ctx { task: None, result: 0 };
let mut req = Request::new(uvll::UV_CONNECT);
let pipe = PipeWatcher::new(loop_, false);
do wait_until_woken_after(&mut cx.task) {
unsafe {
uvll::uv_pipe_connect(req.handle,
pipe.handle(),
name.with_ref(|p| p),
connect_cb)
}
req.set_data(&cx);
req.defuse(); // uv callback now owns this request
do wait_until_woken_after(&mut cx.task) {
unsafe {
uvll::uv_pipe_connect(req.handle,
pipe.handle(),
name.with_ref(|p| p),
connect_cb)
}
match cx.result {
0 => Ok(pipe),
n => Err(UvError(n))
}
req.set_data(&cx);
req.defuse(); // uv callback now owns this request
}
return match cx.result {
0 => Ok(pipe),
n => Err(UvError(n))
};
extern fn connect_cb(req: *uvll::uv_connect_t, status: libc::c_int) {;
@ -153,24 +149,22 @@ extern fn pipe_close_cb(handle: *uvll::uv_handle_t) {
impl PipeListener {
pub fn bind(loop_: &Loop, name: &CString) -> Result<~PipeListener, UvError> {
do task::unkillable {
let pipe = PipeWatcher::new(loop_, false);
match unsafe {
uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
} {
0 => {
// If successful, unwrap the PipeWatcher because we control how
// we close the pipe differently. We can't rely on
// StreamWatcher's default close method.
let p = ~PipeListener {
home: get_handle_to_current_scheduler!(),
pipe: pipe.unwrap(),
outgoing: Tube::new(),
};
Ok(p.install())
}
n => Err(UvError(n))
let pipe = PipeWatcher::new(loop_, false);
match unsafe {
uvll::uv_pipe_bind(pipe.handle(), name.with_ref(|p| p))
} {
0 => {
// If successful, unwrap the PipeWatcher because we control how
// we close the pipe differently. We can't rely on
// StreamWatcher's default close method.
let p = ~PipeListener {
home: get_handle_to_current_scheduler!(),
pipe: pipe.unwrap(),
outgoing: Tube::new(),
};
Ok(p.install())
}
n => Err(UvError(n))
}
}
}
@ -245,7 +239,6 @@ mod tests {
use std::comm::oneshot;
use std::rt::rtio::{RtioUnixListener, RtioUnixAcceptor, RtioPipe};
use std::rt::test::next_test_unix;
use std::task;
use super::*;
use super::super::local_loop;
@ -314,7 +307,7 @@ mod tests {
let (port, chan) = oneshot();
let chan = Cell::new(chan);
do task::spawn_unlinked { // plz no linked failure
do spawn {
let p = PipeListener::bind(local_loop(), &path2.to_c_str()).unwrap();
let mut p = p.listen().unwrap();
chan.take().send(());

View File

@ -44,11 +44,6 @@ pub trait HomingIO {
fn go_to_IO_home(&mut self) -> uint {
use std::rt::sched::RunOnce;
unsafe {
let task: *mut Task = Local::unsafe_borrow();
(*task).death.inhibit_kill((*task).unwinder.unwinding);
}
let _f = ForbidUnwind::new("going home");
let current_sched_id = do Local::borrow |sched: &mut Scheduler| {
@ -127,11 +122,6 @@ impl Drop for HomingMissile {
}
util::ignore(f);
unsafe {
let task: *mut Task = Local::unsafe_borrow();
(*task).death.allow_kill((*task).unwinder.unwinding);
}
}
}

View File

@ -565,7 +565,6 @@ mod tests {
($name:expr => $code:block) => (
{
let mut t = task::task();
t.supervised();
t.name($name);
let res = do t.try $code;
assert!(res.is_err());

View File

@ -1290,7 +1290,6 @@ mod tests {
($name:expr => $code:block) => (
{
let mut t = task::task();
t.supervised();
t.name($name);
let res = do t.try $code;
assert!(res.is_err());

View File

@ -162,7 +162,7 @@ mod test {
for _ in range(0, 20) {
let (p, c) = comm::stream();
chans.push(c);
do task::spawn_with(p) |p| {
do task::spawn {
// wait until all the tasks are ready to go.
p.recv();

View File

@ -155,176 +155,59 @@ use cell::Cell;
use option::{Option, Some, None};
use prelude::*;
use rt::task::Task;
use rt::task::{UnwindResult, Failure};
use task::spawn::Taskgroup;
use task::LinkedFailure;
use to_bytes::IterBytes;
use unstable::atomics::{AtomicUint, Relaxed};
use unstable::sync::{UnsafeArc, UnsafeArcSelf, UnsafeArcT, LittleLock};
use util;
static KILLED_MSG: &'static str = "killed by linked failure";
// State values for the 'killed' and 'unkillable' atomic flags below.
static KILL_RUNNING: uint = 0;
static KILL_KILLED: uint = 1;
static KILL_UNKILLABLE: uint = 2;
struct KillFlag(AtomicUint);
type KillFlagHandle = UnsafeArc<KillFlag>;
use rt::task::UnwindResult;
use unstable::atomics::{AtomicUint, SeqCst};
use unstable::sync::UnsafeArc;
/// A handle to a blocked task. Usually this means having the ~Task pointer by
/// ownership, but if the task is killable, a killer can steal it at any time.
pub enum BlockedTask {
Unkillable(~Task),
Killable(KillFlagHandle),
Owned(~Task),
Shared(UnsafeArc<AtomicUint>),
}
// FIXME(#7544)(bblum): think about the cache efficiency of this
struct KillHandleInner {
// Is the task running, blocked, or killed? Possible values:
// * KILL_RUNNING - Not unkillable, no kill pending.
// * KILL_KILLED - Kill pending.
// * <ptr> - A transmuted blocked ~Task pointer.
// This flag is refcounted because it may also be referenced by a blocking
// concurrency primitive, used to wake the task normally, whose reference
// may outlive the handle's if the task is killed.
killed: KillFlagHandle,
// Has the task deferred kill signals? This flag guards the above one.
// Possible values:
// * KILL_RUNNING - Not unkillable, no kill pending.
// * KILL_KILLED - Kill pending.
// * KILL_UNKILLABLE - Kill signals deferred.
unkillable: AtomicUint,
// Shared state between task and children for exit code propagation. These
// are here so we can re-use the kill handle to implement watched children
// tasks. Using a separate Arc-like would introduce extra atomic adds/subs
// into common spawn paths, so this is just for speed.
// Locklessly accessed; protected by the enclosing refcount's barriers.
any_child_failed: bool,
// A lazy list, consuming which may unwrap() many child tombstones.
child_tombstones: Option<proc() -> bool>,
// Protects multiple children simultaneously creating tombstones.
graveyard_lock: LittleLock,
}
/// State shared between tasks used for task killing during linked failure.
#[deriving(Clone)]
pub struct KillHandle(UnsafeArc<KillHandleInner>);
/// Per-task state related to task death, killing, failure, etc.
pub struct Death {
// Shared among this task, its watched children, and any linked tasks who
// might kill it. This is optional so we can take it by-value at exit time.
kill_handle: Option<KillHandle>,
// Handle to a watching parent, if we have one, for exit code propagation.
priv watching_parent: Option<KillHandle>,
// Action to be done with the exit code. If set, also makes the task wait
// until all its watched children exit before collecting the status.
on_exit: Option<proc(UnwindResult)>,
// nesting level counter for task::unkillable calls (0 == killable).
priv unkillable: int,
// nesting level counter for unstable::atomically calls (0 == can deschedule).
priv wont_sleep: int,
// A "spare" handle to the kill flag inside the kill handle. Used during
// blocking/waking as an optimization to avoid two xadds on the refcount.
priv spare_kill_flag: Option<KillFlagHandle>,
}
impl Drop for KillFlag {
// Letting a KillFlag with a task inside get dropped would leak the task.
// We could free it here, but the task should get awoken by hand somehow.
fn drop(&mut self) {
match self.load(Relaxed) {
KILL_RUNNING | KILL_KILLED => { },
_ => rtabort!("can't drop kill flag with a blocked task inside!"),
}
}
}
// Whenever a task blocks, it swaps out its spare kill flag to use as the
// blocked task handle. So unblocking a task must restore that spare.
unsafe fn revive_task_ptr(task_ptr: uint, spare_flag: Option<KillFlagHandle>) -> ~Task {
let mut task: ~Task = cast::transmute(task_ptr);
if task.death.spare_kill_flag.is_none() {
task.death.spare_kill_flag = spare_flag;
} else {
// A task's spare kill flag is not used for blocking in one case:
// when an unkillable task blocks on select. In this case, a separate
// one was created, which we now discard.
rtassert!(task.death.unkillable > 0);
}
task
}
impl BlockedTask {
/// Returns Some if the task was successfully woken; None if already killed.
pub fn wake(self) -> Option<~Task> {
match self {
Unkillable(task) => Some(task),
Killable(flag_arc) => {
let flag = unsafe { &mut **flag_arc.get() };
match flag.swap(KILL_RUNNING, Relaxed) {
KILL_RUNNING => None, // woken from select(), perhaps
KILL_KILLED => None, // a killer stole it already
task_ptr =>
Some(unsafe { revive_task_ptr(task_ptr, Some(flag_arc)) })
Owned(task) => Some(task),
Shared(arc) => unsafe {
match (*arc.get()).swap(0, SeqCst) {
0 => None,
n => cast::transmute(n),
}
}
}
}
/// Create a blocked task, unless the task was already killed.
pub fn try_block(mut task: ~Task) -> Either<~Task, BlockedTask> {
// NB: As an optimization, we could give a free pass to being unkillable
// to tasks whose taskgroups haven't been initialized yet, but that
// introduces complications with select() and with the test cases below,
// and it's not clear the uncommon performance boost is worth it.
if task.death.unkillable > 0 {
Right(Unkillable(task))
} else {
rtassert!(task.death.kill_handle.is_some());
unsafe {
// The inverse of 'revive', above, occurs here.
// The spare kill flag will usually be Some, unless the task was
// already killed, in which case the killer will have deferred
// creating a new one until whenever it blocks during unwinding.
let flag_arc = match task.death.spare_kill_flag.take() {
Some(spare_flag) => spare_flag,
None => {
// A task that kills us won't have a spare kill flag to
// give back to us, so we restore it ourselves here. This
// situation should only arise when we're already failing.
rtassert!(task.unwinder.unwinding);
(*task.death.kill_handle.get_ref().get()).killed.clone()
}
};
let flag = &mut **flag_arc.get();
let task_ptr = cast::transmute(task);
// Expect flag to contain RUNNING. If KILLED, it should stay KILLED.
match flag.compare_and_swap(KILL_RUNNING, task_ptr, Relaxed) {
KILL_RUNNING => Right(Killable(flag_arc)),
KILL_KILLED => Left(revive_task_ptr(task_ptr, Some(flag_arc))),
x => rtabort!("can't block task! kill flag = {}", x),
}
}
}
pub fn block(task: ~Task) -> BlockedTask {
Owned(task)
}
/// Converts one blocked task handle to a list of many handles to the same.
pub fn make_selectable(self, num_handles: uint) -> ~[BlockedTask] {
let handles = match self {
Unkillable(task) => {
let flag = unsafe { KillFlag(AtomicUint::new(cast::transmute(task))) };
Owned(task) => {
let flag = unsafe {
AtomicUint::new(cast::transmute(task))
};
UnsafeArc::newN(flag, num_handles)
}
Killable(flag_arc) => flag_arc.cloneN(num_handles),
Shared(arc) => arc.cloneN(num_handles),
};
// Even if the task was unkillable before, we use 'Killable' because
// multiple pipes will have handles. It does not really mean killable.
handles.move_iter().map(|x| Killable(x)).collect()
handles.move_iter().map(|x| Shared(x)).collect()
}
// This assertion has two flavours because the wake involves an atomic op.
@ -337,16 +220,14 @@ impl BlockedTask {
/// Convert to an unsafe uint value. Useful for storing in a pipe's state flag.
#[inline]
pub unsafe fn cast_to_uint(self) -> uint {
// Use the low bit to distinguish the enum variants, to save a second
// allocation in the indestructible case.
match self {
Unkillable(task) => {
Owned(task) => {
let blocked_task_ptr: uint = cast::transmute(task);
rtassert!(blocked_task_ptr & 0x1 == 0);
blocked_task_ptr
},
Killable(flag_arc) => {
let blocked_task_ptr: uint = cast::transmute(~flag_arc);
}
Shared(arc) => {
let blocked_task_ptr: uint = cast::transmute(~arc);
rtassert!(blocked_task_ptr & 0x1 == 0);
blocked_task_ptr | 0x1
}
@ -357,318 +238,29 @@ impl BlockedTask {
#[inline]
pub unsafe fn cast_from_uint(blocked_task_ptr: uint) -> BlockedTask {
if blocked_task_ptr & 0x1 == 0 {
Unkillable(cast::transmute(blocked_task_ptr))
Owned(cast::transmute(blocked_task_ptr))
} else {
let ptr: ~KillFlagHandle = cast::transmute(blocked_task_ptr & !0x1);
match ptr {
~flag_arc => Killable(flag_arc)
}
}
}
}
// So that KillHandle can be hashed in the taskgroup bookkeeping code.
impl IterBytes for KillHandle {
fn iter_bytes(&self, lsb0: bool, f: |buf: &[u8]| -> bool) -> bool {
self.data.iter_bytes(lsb0, f)
}
}
impl Eq for KillHandle {
#[inline] fn eq(&self, other: &KillHandle) -> bool { self.data.eq(&other.data) }
#[inline] fn ne(&self, other: &KillHandle) -> bool { self.data.ne(&other.data) }
}
impl KillHandle {
pub fn new() -> (KillHandle, KillFlagHandle) {
let (flag, flag_clone) =
UnsafeArc::new2(KillFlag(AtomicUint::new(KILL_RUNNING)));
let handle = KillHandle(UnsafeArc::new(KillHandleInner {
// Linked failure fields
killed: flag,
unkillable: AtomicUint::new(KILL_RUNNING),
// Exit code propagation fields
any_child_failed: false,
child_tombstones: None,
graveyard_lock: LittleLock::new(),
}));
(handle, flag_clone)
}
// Will begin unwinding if a kill signal was received, unless already_failing.
// This can't be used recursively, because a task which sees a KILLED
// signal must fail immediately, which an already-unkillable task can't do.
#[inline]
pub fn inhibit_kill(&mut self, already_failing: bool) {
let inner = unsafe { &mut *self.get() };
// Expect flag to contain RUNNING. If KILLED, it should stay KILLED.
// FIXME(#7544)(bblum): is it really necessary to prohibit double kill?
match inner.unkillable.compare_and_swap(KILL_RUNNING, KILL_UNKILLABLE, Relaxed) {
KILL_RUNNING => { }, // normal case
KILL_KILLED => if !already_failing { fail!("{}", KILLED_MSG) },
_ => rtabort!("inhibit_kill: task already unkillable"),
}
}
// Will begin unwinding if a kill signal was received, unless already_failing.
#[inline]
pub fn allow_kill(&mut self, already_failing: bool) {
let inner = unsafe { &mut *self.get() };
// Expect flag to contain UNKILLABLE. If KILLED, it should stay KILLED.
// FIXME(#7544)(bblum): is it really necessary to prohibit double kill?
match inner.unkillable.compare_and_swap(KILL_UNKILLABLE, KILL_RUNNING, Relaxed) {
KILL_UNKILLABLE => { }, // normal case
KILL_KILLED => if !already_failing { fail!("{}", KILLED_MSG) },
_ => rtabort!("allow_kill: task already killable"),
}
}
// Send a kill signal to the handle's owning task. Returns the task itself
// if it was blocked and needs punted awake. To be called by other tasks.
pub fn kill(&mut self) -> Option<~Task> {
let inner = unsafe { &mut *self.get() };
if inner.unkillable.swap(KILL_KILLED, Relaxed) == KILL_RUNNING {
// Got in. Allowed to try to punt the task awake.
let flag = unsafe { &mut *inner.killed.get() };
match flag.swap(KILL_KILLED, Relaxed) {
// Task either not blocked or already taken care of.
KILL_RUNNING | KILL_KILLED => None,
// Got ownership of the blocked task.
// While the usual 'wake' path can just pass back the flag
// handle, we (the slower kill path) haven't an extra one lying
// around. The task will wake up without a spare.
task_ptr => Some(unsafe { revive_task_ptr(task_ptr, None) }),
}
} else {
// Otherwise it was either unkillable or already killed. Somebody
// else was here first who will deal with the kill signal.
None
}
}
#[inline]
pub fn killed(&self) -> bool {
// Called every context switch, so shouldn't report true if the task
// is unkillable with a kill signal pending.
let inner = unsafe { &*self.get() };
let flag = unsafe { &*inner.killed.get() };
// A barrier-related concern here is that a task that gets killed
// awake needs to see the killer's write of KILLED to this flag. This
// is analogous to receiving a pipe payload; the appropriate barrier
// should happen when enqueueing the task.
flag.load(Relaxed) == KILL_KILLED
}
pub fn notify_immediate_failure(&mut self) {
// A benign data race may happen here if there are failing sibling
// tasks that were also spawned-watched. The refcount's write barriers
// in UnsafeArc ensure that this write will be seen by the
// unwrapper/destructor, whichever task may unwrap it.
unsafe { (*self.get()).any_child_failed = true; }
}
// For use when a task does not need to collect its children's exit
// statuses, but the task has a parent which might want them.
pub fn reparent_children_to(self, parent: &mut KillHandle) {
// Optimistic path: If another child of the parent's already failed,
// we don't need to worry about any of this.
if unsafe { (*parent.get()).any_child_failed } {
return;
}
// Try to see if all our children are gone already.
match self.try_unwrap() {
// Couldn't unwrap; children still alive. Reparent entire handle as
// our own tombstone, to be unwrapped later.
UnsafeArcSelf(this) => {
let this = Cell::new(this); // :(
do add_lazy_tombstone(parent) |other_tombstones| {
let this = Cell::new(this.take()); // :(
let others = Cell::new(other_tombstones); // :(
|| {
// Prefer to check tombstones that were there first,
// being "more fair" at the expense of tail-recursion.
others.take().map_default(true, |f| f()) && {
let mut inner = this.take().unwrap();
(!inner.any_child_failed) &&
inner.child_tombstones.take().map_default(true, |f| f())
}
}
}
}
// Whether or not all children exited, one or more already failed.
UnsafeArcT(KillHandleInner { any_child_failed: true, _ }) => {
parent.notify_immediate_failure();
}
// All children exited, but some left behind tombstones that we
// don't want to wait on now. Give them to our parent.
UnsafeArcT(KillHandleInner { any_child_failed: false,
child_tombstones: Some(f), _ }) => {
let f = Cell::new(f); // :(
do add_lazy_tombstone(parent) |other_tombstones| {
let f = Cell::new(f.take()); // :(
let others = Cell::new(other_tombstones); // :(
|| {
// Prefer fairness to tail-recursion, as in above case.
others.take().map_default(true, |f| f()) &&
f.take()()
}
}
}
// All children exited, none failed. Nothing to do!
UnsafeArcT(KillHandleInner { any_child_failed: false,
child_tombstones: None, _ }) => { }
}
// NB: Takes a pthread mutex -- 'blk' not allowed to reschedule.
#[inline]
fn add_lazy_tombstone(parent: &mut KillHandle,
blk: |Option<proc() -> bool>| -> proc() -> bool)
{
let inner: &mut KillHandleInner = unsafe { &mut *parent.get() };
unsafe {
do inner.graveyard_lock.lock {
// Update the current "head node" of the lazy list.
inner.child_tombstones =
Some(blk(util::replace(&mut inner.child_tombstones, None)));
}
}
let ptr: ~UnsafeArc<AtomicUint> = cast::transmute(blocked_task_ptr & !1);
Shared(*ptr)
}
}
}
impl Death {
pub fn new() -> Death {
let (handle, spare) = KillHandle::new();
Death {
kill_handle: Some(handle),
watching_parent: None,
on_exit: None,
unkillable: 0,
wont_sleep: 0,
spare_kill_flag: Some(spare),
}
}
pub fn new_child(&self) -> Death {
// FIXME(#7327)
let (handle, spare) = KillHandle::new();
Death {
kill_handle: Some(handle),
watching_parent: self.kill_handle.clone(),
on_exit: None,
unkillable: 0,
wont_sleep: 0,
spare_kill_flag: Some(spare),
}
}
/// Collect failure exit codes from children and propagate them to a parent.
pub fn collect_failure(&mut self, result: UnwindResult, group: Option<Taskgroup>) {
// This may run after the task has already failed, so even though the
// task appears to need to be killed, the scheduler should not fail us
// when we block to unwrap.
// (XXX: Another less-elegant reason for doing this is so that the use
// of the LittleLock in reparent_children_to doesn't need to access the
// unkillable flag in the kill_handle, since we'll have removed it.)
rtassert!(self.unkillable == 0);
self.unkillable = 1;
pub fn collect_failure(&mut self, result: UnwindResult) {
let result = Cell::new(result);
// NB. See corresponding comment at the callsite in task.rs.
// FIXME(#8192): Doesn't work with "let _ = ..."
{ use util; util::ignore(group); }
let mut success = result.is_success();
let mut result = Cell::new(result);
// Step 1. Decide if we need to collect child failures synchronously.
do self.on_exit.take().map |on_exit| {
if success {
// We succeeded, but our children might not. Need to wait for them.
let mut inner = self.kill_handle.take_unwrap().unwrap();
if inner.any_child_failed {
success = false;
} else {
// Lockless access to tombstones protected by unwrap barrier.
success = inner.child_tombstones.take().map_default(true, |f| f());
}
if !success {
result = Cell::new(Failure(~LinkedFailure as ~Any));
}
}
on_exit(result.take());
};
// Step 2. Possibly alert possibly-watching parent to failure status.
// Note that as soon as parent_handle goes out of scope, the parent
// can successfully unwrap its handle and collect our reported status.
do self.watching_parent.take().map |mut parent_handle| {
if success {
// Our handle might be None if we had an exit callback, and
// already unwrapped it. But 'success' being true means no
// child failed, so there's nothing to do (see below case).
do self.kill_handle.take().map |own_handle| {
own_handle.reparent_children_to(&mut parent_handle);
};
} else {
// Can inform watching parent immediately that we failed.
// (Note the importance of non-failing tasks NOT writing
// 'false', which could obscure another task's failure.)
parent_handle.notify_immediate_failure();
}
};
// Can't use allow_kill directly; that would require the kill handle.
rtassert!(self.unkillable == 1);
self.unkillable = 0;
}
/// Fails if a kill signal was received.
#[inline]
pub fn check_killed(&self, already_failing: bool) {
match self.kill_handle {
Some(ref kill_handle) =>
// The task may be both unkillable and killed if it does some
// synchronization during unwinding or cleanup (for example,
// sending on a notify port). In that case failing won't help.
if self.unkillable == 0 && (!already_failing) && kill_handle.killed() {
fail!("{}", KILLED_MSG);
},
// This may happen during task death (see comments in collect_failure).
None => rtassert!(self.unkillable > 0),
}
}
/// Enter a possibly-nested unkillable section of code.
/// All calls must be paired with a subsequent call to allow_kill.
#[inline]
pub fn inhibit_kill(&mut self, already_failing: bool) {
self.unkillable += 1;
// May fail, hence must happen *after* incrementing the counter
if self.unkillable == 1 {
rtassert!(self.kill_handle.is_some());
self.kill_handle.get_mut_ref().inhibit_kill(already_failing);
}
}
/// Exit a possibly-nested unkillable section of code.
/// All calls must be paired with a preceding call to inhibit_kill.
#[inline]
pub fn allow_kill(&mut self, already_failing: bool) {
if self.unkillable == 0 {
// we need to decrement the counter before failing.
self.unkillable -= 1;
fail!("Cannot enter a rekillable() block without a surrounding unkillable()");
}
self.unkillable -= 1;
if self.unkillable == 0 {
rtassert!(self.kill_handle.is_some());
self.kill_handle.get_mut_ref().allow_kill(already_failing);
}
}
/// Enter a possibly-nested "atomic" section of code. Just for assertions.
@ -699,296 +291,21 @@ impl Death {
impl Drop for Death {
fn drop(&mut self) {
// Mustn't be in an atomic or unkillable section at task death.
rtassert!(self.unkillable == 0);
rtassert!(self.wont_sleep == 0);
}
}
#[cfg(test)]
mod test {
#[allow(unused_mut)];
use cell::Cell;
use rt::test::*;
use super::*;
use util;
// Test cases don't care about the spare killed flag.
fn make_kill_handle() -> KillHandle { let (h,_) = KillHandle::new(); h }
#[ignore(reason = "linked failure")]
#[test]
fn no_tombstone_success() {
do run_in_newsched_task {
// Tests case 4 of the 4-way match in reparent_children.
let mut parent = make_kill_handle();
let mut child = make_kill_handle();
// Without another handle to child, the try unwrap should succeed.
child.reparent_children_to(&mut parent);
let mut parent_inner = parent.unwrap();
assert!(parent_inner.child_tombstones.is_none());
assert!(parent_inner.any_child_failed == false);
}
}
#[test]
fn no_tombstone_failure() {
do run_in_newsched_task {
// Tests case 2 of the 4-way match in reparent_children.
let mut parent = make_kill_handle();
let mut child = make_kill_handle();
child.notify_immediate_failure();
// Without another handle to child, the try unwrap should succeed.
child.reparent_children_to(&mut parent);
let mut parent_inner = parent.unwrap();
assert!(parent_inner.child_tombstones.is_none());
// Immediate failure should have been propagated.
assert!(parent_inner.any_child_failed);
}
}
#[test]
fn no_tombstone_because_sibling_already_failed() {
do run_in_newsched_task {
// Tests "case 0, the optimistic path in reparent_children.
let mut parent = make_kill_handle();
let mut child1 = make_kill_handle();
let mut child2 = make_kill_handle();
let mut link = child2.clone();
// Should set parent's child_failed flag
child1.notify_immediate_failure();
child1.reparent_children_to(&mut parent);
// Should bypass trying to unwrap child2 entirely.
// Otherwise, due to 'link', it would try to tombstone.
child2.reparent_children_to(&mut parent);
// Should successfully unwrap even though 'link' is still alive.
let mut parent_inner = parent.unwrap();
assert!(parent_inner.child_tombstones.is_none());
// Immediate failure should have been propagated by first child.
assert!(parent_inner.any_child_failed);
util::ignore(link);
}
}
#[test]
fn one_tombstone_success() {
do run_in_newsched_task {
let mut parent = make_kill_handle();
let mut child = make_kill_handle();
let mut link = child.clone();
// Creates 1 tombstone. Existence of 'link' makes try-unwrap fail.
child.reparent_children_to(&mut parent);
// Let parent collect tombstones.
util::ignore(link);
// Must have created a tombstone
let mut parent_inner = parent.unwrap();
assert!(parent_inner.child_tombstones.take_unwrap()());
assert!(parent_inner.any_child_failed == false);
}
}
#[test]
fn one_tombstone_failure() {
do run_in_newsched_task {
let mut parent = make_kill_handle();
let mut child = make_kill_handle();
let mut link = child.clone();
// Creates 1 tombstone. Existence of 'link' makes try-unwrap fail.
child.reparent_children_to(&mut parent);
// Must happen after tombstone to not be immediately propagated.
link.notify_immediate_failure();
// Let parent collect tombstones.
util::ignore(link);
// Must have created a tombstone
let mut parent_inner = parent.unwrap();
// Failure must be seen in the tombstone.
assert!(parent_inner.child_tombstones.take_unwrap()() == false);
assert!(parent_inner.any_child_failed == false);
}
}
#[test]
fn two_tombstones_success() {
do run_in_newsched_task {
let mut parent = make_kill_handle();
let mut middle = make_kill_handle();
let mut child = make_kill_handle();
let mut link = child.clone();
child.reparent_children_to(&mut middle); // case 1 tombstone
// 'middle' should try-unwrap okay, but still have to reparent.
middle.reparent_children_to(&mut parent); // case 3 tombston
// Let parent collect tombstones.
util::ignore(link);
// Must have created a tombstone
let mut parent_inner = parent.unwrap();
assert!(parent_inner.child_tombstones.take_unwrap()());
assert!(parent_inner.any_child_failed == false);
}
}
#[test]
fn two_tombstones_failure() {
do run_in_newsched_task {
let mut parent = make_kill_handle();
let mut middle = make_kill_handle();
let mut child = make_kill_handle();
let mut link = child.clone();
child.reparent_children_to(&mut middle); // case 1 tombstone
// Must happen after tombstone to not be immediately propagated.
link.notify_immediate_failure();
// 'middle' should try-unwrap okay, but still have to reparent.
middle.reparent_children_to(&mut parent); // case 3 tombstone
// Let parent collect tombstones.
util::ignore(link);
// Must have created a tombstone
let mut parent_inner = parent.unwrap();
// Failure must be seen in the tombstone.
assert!(parent_inner.child_tombstones.take_unwrap()() == false);
assert!(parent_inner.any_child_failed == false);
}
}
// Task killing tests
#[test]
fn kill_basic() {
do run_in_newsched_task {
let mut handle = make_kill_handle();
assert!(!handle.killed());
assert!(handle.kill().is_none());
assert!(handle.killed());
}
}
#[test]
fn double_kill() {
do run_in_newsched_task {
let mut handle = make_kill_handle();
assert!(!handle.killed());
assert!(handle.kill().is_none());
assert!(handle.killed());
assert!(handle.kill().is_none());
assert!(handle.killed());
}
}
#[test]
fn unkillable_after_kill() {
do run_in_newsched_task {
let mut handle = make_kill_handle();
assert!(handle.kill().is_none());
assert!(handle.killed());
let handle_cell = Cell::new(handle);
let result = do spawntask_try {
handle_cell.take().inhibit_kill(false);
};
assert!(result.is_err());
}
}
#[test]
fn unkillable_during_kill() {
do run_in_newsched_task {
let mut handle = make_kill_handle();
handle.inhibit_kill(false);
assert!(handle.kill().is_none());
assert!(!handle.killed());
let handle_cell = Cell::new(handle);
let result = do spawntask_try {
handle_cell.take().allow_kill(false);
};
assert!(result.is_err());
}
}
#[test]
fn unkillable_before_kill() {
do run_in_newsched_task {
let mut handle = make_kill_handle();
handle.inhibit_kill(false);
handle.allow_kill(false);
assert!(handle.kill().is_none());
assert!(handle.killed());
}
}
// Task blocking tests
#[test]
fn block_and_wake() {
do with_test_task |mut task| {
BlockedTask::try_block(task).unwrap_right().wake().unwrap()
}
}
#[ignore(reason = "linked failure")]
#[test]
fn block_and_get_killed() {
do with_test_task |mut task| {
let mut handle = task.death.kill_handle.get_ref().clone();
let result = BlockedTask::try_block(task).unwrap_right();
let task = handle.kill().unwrap();
assert!(result.wake().is_none());
task
}
}
#[ignore(reason = "linked failure")]
#[test]
fn block_already_killed() {
do with_test_task |mut task| {
let mut handle = task.death.kill_handle.get_ref().clone();
assert!(handle.kill().is_none());
BlockedTask::try_block(task).unwrap_left()
}
}
#[ignore(reason = "linked failure")]
#[test]
fn block_unkillably_and_get_killed() {
do with_test_task |mut task| {
let mut handle = task.death.kill_handle.get_ref().clone();
task.death.inhibit_kill(false);
let result = BlockedTask::try_block(task).unwrap_right();
assert!(handle.kill().is_none());
let mut task = result.wake().unwrap();
// This call wants to fail, but we can't have that happen since
// we're not running in a newsched task, so we can't even use
// spawntask_try. But the failing behaviour is already tested
// above, in unkillable_during_kill(), so we punt on it here.
task.death.allow_kill(true);
task
}
}
#[ignore(reason = "linked failure")]
#[test]
fn block_on_pipe() {
// Tests the "killable" path of casting to/from uint.
do run_in_newsched_task {
do with_test_task |mut task| {
let result = BlockedTask::try_block(task).unwrap_right();
let result = unsafe { result.cast_to_uint() };
let result = unsafe { BlockedTask::cast_from_uint(result) };
result.wake().unwrap()
}
}
}
#[ignore(reason = "linked failure")]
#[test]
fn block_unkillably_on_pipe() {
// Tests the "indestructible" path of casting to/from uint.
do run_in_newsched_task {
do with_test_task |mut task| {
task.death.inhibit_kill(false);
let result = BlockedTask::try_block(task).unwrap_right();
let result = unsafe { result.cast_to_uint() };
let result = unsafe { BlockedTask::cast_from_uint(result) };
let mut task = result.wake().unwrap();
task.death.allow_kill(false);
task
}
do with_test_task |task| {
BlockedTask::block(task).wake().unwrap()
}
}
}

View File

@ -88,7 +88,7 @@ pub use self::util::set_exit_status;
pub use self::util::default_sched_threads;
// Re-export of the functionality in the kill module
pub use self::kill::{KillHandle, BlockedTask};
pub use self::kill::BlockedTask;
// XXX: these probably shouldn't be public...
#[doc(hidden)]

View File

@ -8,7 +8,6 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use either::{Left, Right};
use option::{Option, Some, None};
use cast::{transmute, transmute_mut_region, transmute_mut_unsafe};
use clone::Clone;
@ -621,9 +620,6 @@ impl Scheduler {
unsafe {
let task: *mut Task = Local::unsafe_borrow();
(*task).sched.get_mut_ref().run_cleanup_job();
// Must happen after running the cleanup job (of course).
(*task).death.check_killed((*task).unwinder.unwinding);
}
}
@ -689,14 +685,9 @@ impl Scheduler {
pub fn switch_running_tasks_and_then(~self, next_task: ~Task,
f: |&mut Scheduler, BlockedTask|) {
// This is where we convert the BlockedTask-taking closure into one
// that takes just a Task, and is aware of the block-or-killed protocol.
// that takes just a Task
do self.change_task_context(next_task) |sched, task| {
// Task might need to receive a kill signal instead of blocking.
// We can call the "and_then" only if it blocks successfully.
match BlockedTask::try_block(task) {
Left(killed_task) => sched.enqueue_task(killed_task),
Right(blocked_task) => f(sched, blocked_task),
}
f(sched, BlockedTask::block(task))
}
}

View File

@ -36,8 +36,6 @@ use rt::logging::StdErrLogger;
use rt::sched::{Scheduler, SchedHandle};
use rt::stack::{StackSegment, StackPool};
use send_str::SendStr;
use task::LinkedFailure;
use task::spawn::Taskgroup;
use unstable::finally::Finally;
// The Task struct represents all state associated with a rust
@ -52,7 +50,6 @@ pub struct Task {
storage: LocalStorage,
logger: Option<StdErrLogger>,
unwinder: Unwinder,
taskgroup: Option<Taskgroup>,
death: Death,
destroyed: bool,
name: Option<SendStr>,
@ -188,7 +185,6 @@ impl Task {
storage: LocalStorage(None),
logger: None,
unwinder: Unwinder { unwinding: false, cause: None },
taskgroup: None,
death: Death::new(),
destroyed: false,
coroutine: Some(Coroutine::empty()),
@ -223,7 +219,6 @@ impl Task {
storage: LocalStorage(None),
logger: None,
unwinder: Unwinder { unwinding: false, cause: None },
taskgroup: None,
death: Death::new(),
destroyed: false,
name: None,
@ -246,9 +241,7 @@ impl Task {
storage: LocalStorage(None),
logger: None,
unwinder: Unwinder { unwinding: false, cause: None },
taskgroup: None,
// FIXME(#7544) make watching optional
death: self.death.new_child(),
death: Death::new(),
destroyed: false,
name: None,
coroutine: Some(Coroutine::new(stack_pool, stack_size, start)),
@ -333,11 +326,7 @@ impl Task {
// Cleanup the dynamic borrowck debugging info
borrowck::clear_task_borrow_list();
// NB. We pass the taskgroup into death so that it can be dropped while
// the unkillable counter is set. This is necessary for when the
// taskgroup destruction code drops references on KillHandles, which
// might require using unkillable (to synchronize with an unwrapper).
self.death.collect_failure(self.unwinder.to_unwind_result(), self.taskgroup.take());
self.death.collect_failure(self.unwinder.to_unwind_result());
self.destroyed = true;
}
@ -660,10 +649,7 @@ pub fn begin_unwind<M: Any + Send>(msg: M, file: &'static str, line: uint) -> !
Some(s) => *s,
None => match msg.as_ref::<~str>() {
Some(s) => s.as_slice(),
None => match msg.as_ref::<LinkedFailure>() {
Some(*) => "linked failure",
None => "~Any",
}
None => "~Any",
}
};
@ -785,16 +771,6 @@ mod test {
}
}
#[test]
fn linked_failure() {
do run_in_newsched_task() {
let res = do spawntask_try {
spawntask_random(|| fail!());
};
assert!(res.is_err());
}
}
#[test]
fn heap_cycles() {
use option::{Option, Some, None};

View File

@ -21,7 +21,6 @@ use io;
use libc::{pid_t, c_int};
use libc;
use prelude::*;
use task;
/**
* A value representing a child process.
@ -221,13 +220,7 @@ impl Process {
let ch = SharedChan::new(ch);
let ch_clone = ch.clone();
// FIXME(#910, #8674): right now I/O is incredibly brittle when it comes
// to linked failure, so these tasks must be spawn so they're not
// affected by linked failure. If these are removed, then the
// runtime may never exit because linked failure will cause some
// SchedHandle structures to not get destroyed, meaning that
// there's always an async watcher available.
do task::spawn_unlinked {
do spawn {
do io::ignore_io_error {
match error.take() {
Some(ref mut e) => ch.send((2, e.read_to_end())),
@ -235,7 +228,7 @@ impl Process {
}
}
}
do task::spawn_unlinked {
do spawn {
do io::ignore_io_error {
match output.take() {
Some(ref mut e) => ch_clone.send((1, e.read_to_end())),

View File

@ -21,7 +21,6 @@ use rt::local::Local;
use rt::rtio::EventLoop;
use rt::sched::Scheduler;
use rt::shouldnt_be_public::{SelectInner, SelectPortInner};
use task;
use unstable::finally::Finally;
use vec::{OwnedVector, MutableVector};
@ -79,11 +78,10 @@ pub fn select<A: Select>(ports: &mut [A]) -> uint {
do sched.event_loop.callback { c.take().send_deferred(()) }
}
}).finally {
let p = Cell::new(p.take());
// Unkillable is necessary not because getting killed is dangerous here,
// but to force the recv not to use the same kill-flag that we used for
// selecting. Otherwise a user-sender could spuriously wakeup us here.
do task::unkillable { p.take().recv(); }
p.take().recv();
}
// Task resumes. Now unblock ourselves from all the ports we blocked on.
@ -230,9 +228,9 @@ mod test {
}
#[test]
fn select_unkillable() {
fn select_simple() {
do run_in_uv_task {
do task::unkillable { select_helper(2, [1]) }
select_helper(2, [1])
}
}
@ -240,36 +238,27 @@ mod test {
#[test]
fn select_blocking() {
select_blocking_helper(true);
select_blocking_helper(false);
do run_in_uv_task {
let (p1,_c) = oneshot();
let (p2,c2) = oneshot();
let mut ports = [p1,p2];
fn select_blocking_helper(killable: bool) {
do run_in_uv_task {
let (p1,_c) = oneshot();
let (p2,c2) = oneshot();
let mut ports = [p1,p2];
let (p3,c3) = oneshot();
let (p4,c4) = oneshot();
let (p3,c3) = oneshot();
let (p4,c4) = oneshot();
let x = Cell::new((c2, p3, c4));
do task::spawn {
let (c2, p3, c4) = x.take();
p3.recv(); // handshake parent
c4.send(()); // normal receive
task::deschedule();
c2.send(()); // select receive
}
// Try to block before child sends on c2.
c3.send(());
p4.recv();
if killable {
assert!(select(ports) == 1);
} else {
do task::unkillable { assert!(select(ports) == 1); }
}
let x = Cell::new((c2, p3, c4));
do task::spawn {
let (c2, p3, c4) = x.take();
p3.recv(); // handshake parent
c4.send(()); // normal receive
task::deschedule();
c2.send(()); // select receive
}
// Try to block before child sends on c2.
c3.send(());
p4.recv();
assert!(select(ports) == 1);
}
}
@ -277,16 +266,12 @@ mod test {
fn select_racing_senders() {
static NUM_CHANS: uint = 10;
select_racing_senders_helper(true, ~[0,1,2,3,4,5,6,7,8,9]);
select_racing_senders_helper(false, ~[0,1,2,3,4,5,6,7,8,9]);
select_racing_senders_helper(true, ~[0,1,2]);
select_racing_senders_helper(false, ~[0,1,2]);
select_racing_senders_helper(true, ~[3,4,5,6]);
select_racing_senders_helper(false, ~[3,4,5,6]);
select_racing_senders_helper(true, ~[7,8,9]);
select_racing_senders_helper(false, ~[7,8,9]);
select_racing_senders_helper(~[0,1,2,3,4,5,6,7,8,9]);
select_racing_senders_helper(~[0,1,2]);
select_racing_senders_helper(~[3,4,5,6]);
select_racing_senders_helper(~[7,8,9]);
fn select_racing_senders_helper(killable: bool, send_on_chans: ~[uint]) {
fn select_racing_senders_helper(send_on_chans: ~[uint]) {
use rt::test::spawntask_random;
do run_in_uv_task {
@ -307,45 +292,10 @@ mod test {
}
}
// nondeterministic result, but should succeed
if killable {
select(ports);
} else {
do task::unkillable { select(ports); }
}
select(ports);
}
}
}
}
}
#[test]
fn select_killed() {
do run_in_uv_task {
let (success_p, success_c) = oneshot::<bool>();
let success_c = Cell::new(success_c);
do task::try {
let success_c = Cell::new(success_c.take());
do task::unkillable {
let (p,c) = oneshot();
let c = Cell::new(c);
do task::spawn {
let (dead_ps, dead_cs) = unzip(range(0u, 5).map(|_| oneshot::<()>()));
let mut ports = dead_ps;
select(ports); // should get killed; nothing should leak
c.take().send(()); // must not happen
// Make sure dead_cs doesn't get closed until after select.
let _ = dead_cs;
}
do task::spawn {
fail!(); // should kill sibling awake
}
// wait for killed selector to close (NOT send on) its c.
// hope to send 'true'.
success_c.take().send(p.try_recv().is_none());
}
};
assert!(success_p.recv());
}
}
}

View File

@ -62,16 +62,12 @@ use rt::in_green_task_context;
use rt::local::Local;
use rt::task::{UnwindResult, Success, Failure};
use send_str::{SendStr, IntoSendStr};
use unstable::finally::Finally;
use util;
#[cfg(test)] use any::Any;
#[cfg(test)] use cast;
#[cfg(test)] use comm::SharedChan;
#[cfg(test)] use comm;
#[cfg(test)] use ptr;
#[cfg(test)] use result;
#[cfg(test)] use task;
pub mod spawn;
@ -86,8 +82,6 @@ pub mod spawn;
/// children tasks complete, recommend using a result future.
pub type TaskResult = Result<(), ~Any>;
pub struct LinkedFailure;
pub struct TaskResultPort {
priv port: Port<UnwindResult>
}
@ -141,24 +135,11 @@ pub struct SchedOpts {
*
* # Fields
*
* * linked - Propagate failure bidirectionally between child and parent.
* True by default. If both this and 'supervised' are false, then
* either task's failure will not affect the other ("unlinked").
*
* * supervised - Propagate failure unidirectionally from parent to child,
* but not from child to parent. False by default.
*
* * watched - Make parent task collect exit status notifications from child
* before reporting its own exit status. (This delays the parent
* task's death and cleanup until after all transitively watched
* children also exit.) True by default.
*
* * indestructible - Configures the task to ignore kill signals received from
* linked failure. This may cause process hangs during
* failure if not used carefully, but causes task blocking
* code paths (e.g. port recv() calls) to be faster by 2
* atomic operations. False by default.
*
* * notify_chan - Enable lifecycle notifications on the given channel
*
* * name - A name for the task-to-be, for identification in failure messages.
@ -169,10 +150,7 @@ pub struct SchedOpts {
* scheduler other tasks will be impeded or even blocked indefinitely.
*/
pub struct TaskOpts {
priv linked: bool,
priv supervised: bool,
priv watched: bool,
priv indestructible: bool,
priv notify_chan: Option<Chan<UnwindResult>>,
name: Option<SendStr>,
sched: SchedOpts,
@ -191,13 +169,10 @@ pub struct TaskOpts {
// when you try to reuse the builder to spawn a new task. We'll just
// sidestep that whole issue by making builders uncopyable and making
// the run function move them in.
// FIXME (#3724): Replace the 'consumed' bit with move mode on self
pub struct TaskBuilder {
opts: TaskOpts,
priv gen_body: Option<proc(v: proc()) -> proc()>,
priv can_not_copy: Option<util::NonCopyable>,
priv consumed: bool,
}
/**
@ -210,25 +185,17 @@ pub fn task() -> TaskBuilder {
opts: default_task_opts(),
gen_body: None,
can_not_copy: None,
consumed: false,
}
}
impl TaskBuilder {
fn consume(&mut self) -> TaskBuilder {
if self.consumed {
fail!("Cannot copy a task_builder"); // Fake move mode on self
}
self.consumed = true;
fn consume(mut self) -> TaskBuilder {
let gen_body = self.gen_body.take();
let notify_chan = self.opts.notify_chan.take();
let name = self.opts.name.take();
TaskBuilder {
opts: TaskOpts {
linked: self.opts.linked,
supervised: self.opts.supervised,
watched: self.opts.watched,
indestructible: self.opts.indestructible,
notify_chan: notify_chan,
name: name,
sched: self.opts.sched,
@ -236,34 +203,9 @@ impl TaskBuilder {
},
gen_body: gen_body,
can_not_copy: None,
consumed: false
}
}
/// Decouple the child task's failure from the parent's. If either fails,
/// the other will not be killed.
pub fn unlinked(&mut self) {
self.opts.linked = false;
self.opts.watched = false;
}
/// Unidirectionally link the child task's failure with the parent's. The
/// child's failure will not kill the parent, but the parent's will kill
/// the child.
pub fn supervised(&mut self) {
self.opts.supervised = true;
self.opts.linked = false;
self.opts.watched = false;
}
/// Link the child task's and parent task's failures. If either fails, the
/// other will be killed.
pub fn linked(&mut self) {
self.opts.linked = true;
self.opts.supervised = false;
self.opts.watched = true;
}
/// Cause the parent task to collect the child's exit status (and that of
/// all transitively-watched grandchildren) before reporting its own.
pub fn watched(&mut self) {
@ -276,13 +218,6 @@ impl TaskBuilder {
self.opts.watched = false;
}
/// Cause the child task to ignore any kill signals received from linked
/// failure. This optimizes context switching, at the possible expense of
/// process hangs in the case of unexpected failure.
pub fn indestructible(&mut self) {
self.opts.indestructible = true;
}
/// Get a future representing the exit status of the task.
///
/// Taking the value of the future will block until the child task
@ -372,16 +307,13 @@ impl TaskBuilder {
* When spawning into a new scheduler, the number of threads requested
* must be greater than zero.
*/
pub fn spawn(&mut self, f: proc()) {
pub fn spawn(mut self, f: proc()) {
let gen_body = self.gen_body.take();
let notify_chan = self.opts.notify_chan.take();
let name = self.opts.name.take();
let x = self.consume();
let opts = TaskOpts {
linked: x.opts.linked,
supervised: x.opts.supervised,
watched: x.opts.watched,
indestructible: x.opts.indestructible,
notify_chan: notify_chan,
name: name,
sched: x.opts.sched,
@ -398,14 +330,6 @@ impl TaskBuilder {
spawn::spawn_raw(opts, f);
}
/// Runs a task, while transferring ownership of one argument to the child.
pub fn spawn_with<A:Send>(&mut self, arg: A, f: proc(v: A)) {
let arg = Cell::new(arg);
do self.spawn {
f(arg.take());
}
}
/**
* Execute a function in another task and return either the return value
* of the function or result::err.
@ -419,7 +343,7 @@ impl TaskBuilder {
* # Failure
* Fails if a future_result was already set for this task.
*/
pub fn try<T:Send>(&mut self, f: proc() -> T) -> Result<T, ~Any> {
pub fn try<T:Send>(mut self, f: proc() -> T) -> Result<T, ~Any> {
let (po, ch) = stream::<T>();
let result = self.future_result();
@ -447,10 +371,7 @@ pub fn default_task_opts() -> TaskOpts {
*/
TaskOpts {
linked: true,
supervised: false,
watched: true,
indestructible: false,
notify_chan: None,
name: None,
sched: SchedOpts {
@ -469,56 +390,10 @@ pub fn default_task_opts() -> TaskOpts {
///
/// This function is equivalent to `task().spawn(f)`.
pub fn spawn(f: proc()) {
let mut task = task();
let task = task();
task.spawn(f)
}
/// Creates a child task unlinked from the current one. If either this
/// task or the child task fails, the other will not be killed.
pub fn spawn_unlinked(f: proc()) {
let mut task = task();
task.unlinked();
task.spawn(f)
}
pub fn spawn_supervised(f: proc()) {
/*!
* Creates a child task supervised by the current one. If the child
* task fails, the parent will not be killed, but if the parent fails,
* the child will be killed.
*/
let mut task = task();
task.supervised();
task.spawn(f)
}
/// Creates a child task that cannot be killed by linked failure. This causes
/// its context-switch path to be faster by 2 atomic swap operations.
/// (Note that this convenience wrapper still uses linked-failure, so the
/// child's children will still be killable by the parent. For the fastest
/// possible spawn mode, use task::task().unlinked().indestructible().spawn.)
pub fn spawn_indestructible(f: proc()) {
let mut task = task();
task.indestructible();
task.spawn(f)
}
pub fn spawn_with<A:Send>(arg: A, f: proc(v: A)) {
/*!
* Runs a task, while transferring ownership of one argument to the
* child.
*
* This is useful for transferring ownership of noncopyables to
* another task.
*
* This function is equivalent to `task().spawn_with(arg, f)`.
*/
let mut task = task();
task.spawn_with(arg, f)
}
pub fn spawn_sched(mode: SchedMode, f: proc()) {
/*!
* Creates a new task on a new or existing scheduler.
@ -545,8 +420,7 @@ pub fn try<T:Send>(f: proc() -> T) -> Result<T, ~Any> {
* This is equivalent to task().supervised().try.
*/
let mut task = task();
task.supervised();
let task = task();
task.try(f)
}
@ -590,159 +464,6 @@ pub fn failing() -> bool {
}
}
/**
* Temporarily make the task unkillable
*
* # Example
*
* ```
* do task::unkillable {
* // detach / deschedule / destroy must all be called together
* rustrt::rust_port_detach(po);
* // This must not result in the current task being killed
* task::deschedule();
* rustrt::rust_port_destroy(po);
* }
* ```
*/
pub fn unkillable<U>(f: || -> U) -> U {
use rt::task::Task;
unsafe {
if in_green_task_context() {
// The inhibits/allows might fail and need to borrow the task.
let t: *mut Task = Local::unsafe_borrow();
do (|| {
(*t).death.inhibit_kill((*t).unwinder.unwinding);
f()
}).finally {
(*t).death.allow_kill((*t).unwinder.unwinding);
}
} else {
// FIXME(#3095): This should be an rtabort as soon as the scheduler
// no longer uses a workqueue implemented with an Exclusive.
f()
}
}
}
/**
* Makes killable a task marked as unkillable. This
* is meant to be used only nested in unkillable.
*
* # Example
*
* ```
* do task::unkillable {
* do task::rekillable {
* // Task is killable
* }
* // Task is unkillable again
* }
*/
pub fn rekillable<U>(f: || -> U) -> U {
use rt::task::Task;
unsafe {
if in_green_task_context() {
let t: *mut Task = Local::unsafe_borrow();
do (|| {
(*t).death.allow_kill((*t).unwinder.unwinding);
f()
}).finally {
(*t).death.inhibit_kill((*t).unwinder.unwinding);
}
} else {
// FIXME(#3095): As in unkillable().
f()
}
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_kill_unkillable_task() {
use rt::test::*;
// Attempt to test that when a kill signal is received at the start of an
// unkillable section, 'unkillable' unwinds correctly. This is actually
// quite a difficult race to expose, as the kill has to happen on a second
// CPU, *after* the spawner is already switched-back-to (and passes the
// killed check at the start of its timeslice). As far as I know, it's not
// possible to make this race deterministic, or even more likely to happen.
do run_in_uv_task {
do task::try {
do task::spawn {
fail!();
}
do task::unkillable { }
};
}
}
#[test]
#[ignore(cfg(windows))]
fn test_kill_rekillable_task() {
use rt::test::*;
// Tests that when a kill signal is received, 'rekillable' and
// 'unkillable' unwind correctly in conjunction with each other.
do run_in_uv_task {
do task::try {
do task::unkillable {
do task::rekillable {
do task::spawn {
fail!();
}
}
}
};
}
}
#[test]
#[should_fail]
#[ignore(cfg(windows))]
fn test_rekillable_not_nested() {
do rekillable {
// This should fail before
// receiving anything since
// this block should be nested
// into a unkillable block.
deschedule();
}
}
#[test]
#[ignore(cfg(windows))]
fn test_rekillable_nested_failure() {
let result = do task::try {
do unkillable {
do rekillable {
let (port,chan) = comm::stream();
do task::spawn { chan.send(()); fail!(); }
port.recv(); // wait for child to exist
port.recv(); // block forever, expect to get killed.
}
}
};
assert!(result.is_err());
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_cant_dup_task_builder() {
let mut builder = task();
builder.unlinked();
do builder.spawn {}
// FIXME(#3724): For now, this is a -runtime- failure, because we haven't
// got move mode on self. When 3724 is fixed, this test should fail to
// compile instead, and should go in tests/compile-fail.
do builder.spawn {} // b should have been consumed by the previous call
}
// The following 8 tests test the following 2^3 combinations:
// {un,}linked {un,}supervised failure propagation {up,down}wards.
@ -752,207 +473,6 @@ fn test_cant_dup_task_builder() {
#[cfg(test)]
fn block_forever() { let (po, _ch) = stream::<()>(); po.recv(); }
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_unlinked_unsup_no_fail_down() { // grandchild sends on a port
use rt::test::run_in_uv_task;
do run_in_uv_task {
let (po, ch) = stream();
let ch = SharedChan::new(ch);
do spawn_unlinked {
let ch = ch.clone();
do spawn_unlinked {
// Give middle task a chance to fail-but-not-kill-us.
do 16.times { task::deschedule(); }
ch.send(()); // If killed first, grandparent hangs.
}
fail!(); // Shouldn't kill either (grand)parent or (grand)child.
}
po.recv();
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_unlinked_unsup_no_fail_up() { // child unlinked fails
use rt::test::run_in_uv_task;
do run_in_uv_task {
do spawn_unlinked { fail!(); }
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_unlinked_sup_no_fail_up() { // child unlinked fails
use rt::test::run_in_uv_task;
do run_in_uv_task {
do spawn_supervised { fail!(); }
// Give child a chance to fail-but-not-kill-us.
do 16.times { task::deschedule(); }
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_unlinked_sup_fail_down() {
use rt::test::run_in_uv_task;
do run_in_uv_task {
let result: Result<(), ~Any> = do try {
do spawn_supervised { block_forever(); }
fail!(); // Shouldn't leave a child hanging around.
};
assert!(result.is_err());
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_linked_sup_fail_up() { // child fails; parent fails
use rt::test::run_in_uv_task;
do run_in_uv_task {
let result: Result<(), ~Any> = do try {
// Unidirectional "parenting" shouldn't override bidirectional linked.
// We have to cheat with opts - the interface doesn't support them because
// they don't make sense (redundant with task().supervised()).
let mut b0 = task();
b0.opts.linked = true;
b0.opts.supervised = true;
do b0.spawn {
fail!();
}
block_forever(); // We should get punted awake
};
assert!(result.is_err());
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_linked_sup_fail_down() { // parent fails; child fails
use rt::test::run_in_uv_task;
do run_in_uv_task {
let result: Result<(), ~Any> = do try {
// We have to cheat with opts - the interface doesn't support them because
// they don't make sense (redundant with task().supervised()).
let mut b0 = task();
b0.opts.linked = true;
b0.opts.supervised = true;
do b0.spawn { block_forever(); }
fail!(); // *both* mechanisms would be wrong if this didn't kill the child
};
assert!(result.is_err());
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_linked_unsup_fail_up() { // child fails; parent fails
use rt::test::run_in_uv_task;
do run_in_uv_task {
let result: Result<(), ~Any> = do try {
// Default options are to spawn linked & unsupervised.
do spawn { fail!(); }
block_forever(); // We should get punted awake
};
assert!(result.is_err());
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_linked_unsup_fail_down() { // parent fails; child fails
use rt::test::run_in_uv_task;
do run_in_uv_task {
let result: Result<(), ~Any> = do try {
// Default options are to spawn linked & unsupervised.
do spawn { block_forever(); }
fail!();
};
assert!(result.is_err());
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_linked_unsup_default_opts() { // parent fails; child fails
use rt::test::run_in_uv_task;
do run_in_uv_task {
let result: Result<(), ~Any> = do try {
// Make sure the above test is the same as this one.
let mut builder = task();
builder.linked();
do builder.spawn { block_forever(); }
fail!();
};
assert!(result.is_err());
}
}
// A couple bonus linked failure tests - testing for failure propagation even
// when the middle task exits successfully early before kill signals are sent.
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_failure_propagate_grandchild() {
use rt::test::run_in_uv_task;
do run_in_uv_task {
let result: Result<(), ~Any> = do try {
// Middle task exits; does grandparent's failure propagate across the gap?
do spawn_supervised {
do spawn_supervised { block_forever(); }
}
do 16.times { task::deschedule(); }
fail!();
};
assert!(result.is_err());
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_failure_propagate_secondborn() {
use rt::test::run_in_uv_task;
do run_in_uv_task {
let result: Result<(), ~Any> = do try {
// First-born child exits; does parent's failure propagate to sibling?
do spawn_supervised {
do spawn { block_forever(); } // linked
}
do 16.times { task::deschedule(); }
fail!();
};
assert!(result.is_err());
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_failure_propagate_nephew_or_niece() {
use rt::test::run_in_uv_task;
do run_in_uv_task {
let result: Result<(), ~Any> = do try {
// Our sibling exits; does our failure propagate to sibling's child?
do spawn { // linked
do spawn_supervised { block_forever(); }
}
do 16.times { task::deschedule(); }
fail!();
};
assert!(result.is_err());
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_linked_sup_propagate_sibling() {
use rt::test::run_in_uv_task;
do run_in_uv_task {
let result: Result<(), ~Any> = do try {
// Middle sibling exits - does eldest's failure propagate to youngest?
do spawn { // linked
do spawn { block_forever(); } // linked
}
do 16.times { task::deschedule(); }
fail!();
};
assert!(result.is_err());
}
}
#[test]
fn test_unnamed_task() {
use rt::test::run_in_uv_task;
@ -1014,7 +534,7 @@ fn test_send_named_task() {
#[test]
fn test_run_basic() {
let (po, ch) = stream::<()>();
let mut builder = task();
let builder = task();
do builder.spawn {
ch.send(());
}
@ -1053,7 +573,6 @@ fn test_future_result() {
let mut builder = task();
let result = builder.future_result();
builder.unlinked();
do builder.spawn {
fail!();
}
@ -1224,7 +743,7 @@ fn test_avoid_copying_the_body_spawn() {
#[test]
fn test_avoid_copying_the_body_task_spawn() {
do avoid_copying_the_body |f| {
let mut builder = task();
let builder = task();
do builder.spawn || {
f();
}
@ -1240,86 +759,6 @@ fn test_avoid_copying_the_body_try() {
}
}
#[test]
fn test_avoid_copying_the_body_unlinked() {
do avoid_copying_the_body |f| {
do spawn_unlinked || {
f();
}
}
}
#[ignore(reason = "linked failure")]
#[test]
#[should_fail]
fn test_unkillable() {
let (po, ch) = stream();
// We want to do this after failing
do spawn_unlinked {
do 10.times { deschedule() }
ch.send(());
}
do spawn {
deschedule();
// We want to fail after the unkillable task
// blocks on recv
fail!();
}
unsafe {
do unkillable {
let p = ~0;
let pp: *uint = cast::transmute(p);
// If we are killed here then the box will leak
po.recv();
let _p: ~int = cast::transmute(pp);
}
}
// Now we can be killed
po.recv();
}
#[ignore(reason = "linked failure")]
#[test]
#[should_fail]
fn test_unkillable_nested() {
let (po, ch) = comm::stream();
// We want to do this after failing
do spawn_unlinked || {
do 10.times { deschedule() }
ch.send(());
}
do spawn {
deschedule();
// We want to fail after the unkillable task
// blocks on recv
fail!();
}
unsafe {
do unkillable {
do unkillable {} // Here's the difference from the previous test.
let p = ~0;
let pp: *uint = cast::transmute(p);
// If we are killed here then the box will leak
po.recv();
let _p: ~int = cast::transmute(pp);
}
}
// Now we can be killed
po.recv();
}
#[test]
fn test_child_doesnt_ref_parent() {
// If the child refcounts the parent task, this will stack overflow when
@ -1350,67 +789,6 @@ fn test_simple_newsched_spawn() {
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_spawn_watched() {
use rt::test::run_in_uv_task;
do run_in_uv_task {
let result = do try {
let mut t = task();
t.unlinked();
t.watched();
do t.spawn {
let mut t = task();
t.unlinked();
t.watched();
do t.spawn {
task::deschedule();
fail!();
}
}
};
assert!(result.is_err());
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_indestructible() {
use rt::test::run_in_uv_task;
do run_in_uv_task {
let result = do try {
let mut t = task();
t.watched();
t.supervised();
t.indestructible();
do t.spawn {
let (p1, _c1) = stream::<()>();
let (p2, c2) = stream::<()>();
let (p3, c3) = stream::<()>();
let mut t = task();
t.unwatched();
do t.spawn {
do (|| {
p1.recv(); // would deadlock if not killed
}).finally {
c2.send(());
};
}
let mut t = task();
t.unwatched();
do t.spawn {
p3.recv();
task::deschedule();
fail!();
}
c3.send(());
p2.recv();
}
};
assert!(result.is_ok());
}
}
#[test]
fn test_try_fail_message_static_str() {
match do try {
@ -1455,19 +833,6 @@ fn test_try_fail_message_any() {
}
}
#[ignore(reason = "linked failure")]
#[test]
fn test_try_fail_message_linked() {
match do try {
do spawn {
fail!()
}
} {
Err(ref e) if e.is::<LinkedFailure>() => {}
Err(_) | Ok(()) => fail!()
}
}
#[test]
fn test_try_fail_message_unit_struct() {
struct Juju;

View File

@ -9,6 +9,10 @@
// except according to those terms.
/*!**************************************************************************
*
* WARNING: linked failure has been removed since this doc comment was written,
* but it was so pretty that I didn't want to remove it.
*
* Spawning & linked failure
*
* Several data structures are involved in task management to allow properly
@ -73,541 +77,30 @@
use prelude::*;
use cast::transmute;
use cast;
use cell::Cell;
use comm::{Chan, GenericChan, oneshot};
use container::MutableMap;
use hashmap::{HashSet, HashSetMoveIterator};
use local_data;
use comm::{GenericChan, oneshot};
use rt::local::Local;
use rt::sched::{Scheduler, Shutdown, TaskFromFriend};
use rt::task::{Task, Sched};
use rt::task::{UnwindResult, Success, Failure};
use rt::task::UnwindResult;
use rt::thread::Thread;
use rt::work_queue::WorkQueue;
use rt::{in_green_task_context, new_event_loop, KillHandle};
use task::LinkedFailure;
use rt::{in_green_task_context, new_event_loop};
use task::SingleThreaded;
use task::TaskOpts;
use task::unkillable;
use uint;
use unstable::sync::Exclusive;
use util;
#[cfg(test)] use task::default_task_opts;
#[cfg(test)] use comm;
#[cfg(test)] use task;
struct TaskSet(HashSet<KillHandle>);
impl TaskSet {
#[inline]
fn new() -> TaskSet {
TaskSet(HashSet::new())
}
#[inline]
fn insert(&mut self, task: KillHandle) {
let didnt_overwrite = (**self).insert(task);
assert!(didnt_overwrite);
}
#[inline]
fn remove(&mut self, task: &KillHandle) {
let was_present = (**self).remove(task);
assert!(was_present);
}
#[inline]
fn move_iter(self) -> HashSetMoveIterator<KillHandle> {
(*self).move_iter()
}
}
// One of these per group of linked-failure tasks.
struct TaskGroupData {
// All tasks which might kill this group. When this is empty, the group
// can be "GC"ed (i.e., its link in the ancestor list can be removed).
members: TaskSet,
// All tasks unidirectionally supervised by (directly or transitively)
// tasks in this group.
descendants: TaskSet,
}
type TaskGroupArc = Exclusive<Option<TaskGroupData>>;
type TaskGroupInner<'self> = &'self mut Option<TaskGroupData>;
// A taskgroup is 'dead' when nothing can cause it to fail; only members can.
fn taskgroup_is_dead(tg: &TaskGroupData) -> bool {
tg.members.is_empty()
}
// A list-like structure by which taskgroups keep track of all ancestor groups
// which may kill them. Needed for tasks to be able to remove themselves from
// ancestor groups upon exit. The list has a node for each "generation", and
// ends either at the root taskgroup (which has no ancestors) or at a
// taskgroup which was spawned-unlinked. Tasks from intermediate generations
// have references to the middle of the list; when intermediate generations
// die, their node in the list will be collected at a descendant's spawn-time.
struct AncestorNode {
// Since the ancestor list is recursive, we end up with references to
// exclusives within other exclusives. This is dangerous business (if
// circular references arise, deadlock and memory leaks are imminent).
// Hence we assert that this counter monotonically decreases as we
// approach the tail of the list.
generation: uint,
// Handle to the tasks in the group of the current generation.
parent_group: TaskGroupArc,
// Recursive rest of the list.
ancestors: AncestorList,
}
struct AncestorList(Option<Exclusive<AncestorNode>>);
// Accessors for taskgroup arcs and ancestor arcs that wrap the unsafety.
#[inline]
fn access_group<U>(x: &TaskGroupArc, blk: |TaskGroupInner| -> U) -> U {
unsafe {
x.with(blk)
}
}
#[inline]
fn access_ancestors<U>(
x: &Exclusive<AncestorNode>,
blk: |x: &mut AncestorNode| -> U)
-> U {
unsafe {
x.with(blk)
}
}
#[inline] #[cfg(test)]
fn check_generation(younger: uint, older: uint) { assert!(younger > older); }
#[inline] #[cfg(not(test))]
fn check_generation(_younger: uint, _older: uint) { }
#[inline] #[cfg(test)]
fn incr_generation(ancestors: &AncestorList) -> uint {
ancestors.as_ref().map_default(0, |arc| access_ancestors(arc, |a| a.generation+1))
}
#[inline] #[cfg(not(test))]
fn incr_generation(_ancestors: &AncestorList) -> uint { 0 }
// Iterates over an ancestor list.
// (1) Runs forward_blk on each ancestral taskgroup in the list
// (2) If forward_blk "break"s, runs optional bail_blk on all ancestral
// taskgroups that forward_blk already ran on successfully (Note: bail_blk
// is NOT called on the block that forward_blk broke on!).
// (3) As a bonus, coalesces away all 'dead' taskgroup nodes in the list.
fn each_ancestor(list: &mut AncestorList,
bail_blk: |TaskGroupInner|,
forward_blk: |TaskGroupInner| -> bool)
-> bool {
// "Kickoff" call - there was no last generation.
return !coalesce(list, bail_blk, forward_blk, uint::max_value);
// Recursively iterates, and coalesces afterwards if needed. Returns
// whether or not unwinding is needed (i.e., !successful iteration).
fn coalesce(list: &mut AncestorList,
bail_blk: |TaskGroupInner|,
forward_blk: |TaskGroupInner| -> bool,
last_generation: uint) -> bool {
let (coalesce_this, early_break) =
iterate(list, bail_blk, forward_blk, last_generation);
// What should our next ancestor end up being?
if coalesce_this.is_some() {
// Needed coalesce. Our next ancestor becomes our old
// ancestor's next ancestor. ("next = old_next->next;")
*list = coalesce_this.unwrap();
}
return early_break;
}
// Returns an optional list-to-coalesce and whether unwinding is needed.
// Option<ancestor_list>:
// Whether or not the ancestor taskgroup being iterated over is
// dead or not; i.e., it has no more tasks left in it, whether or not
// it has descendants. If dead, the caller shall coalesce it away.
// bool:
// True if the supplied block did 'break', here or in any recursive
// calls. If so, must call the unwinder on all previous nodes.
fn iterate(ancestors: &mut AncestorList,
bail_blk: |TaskGroupInner|,
forward_blk: |TaskGroupInner| -> bool,
last_generation: uint)
-> (Option<AncestorList>, bool) {
// At each step of iteration, three booleans are at play which govern
// how the iteration should behave.
// 'nobe_is_dead' - Should the list should be coalesced at this point?
// Largely unrelated to the other two.
// 'need_unwind' - Should we run the bail_blk at this point? (i.e.,
// do_continue was false not here, but down the line)
// 'do_continue' - Did the forward_blk succeed at this point? (i.e.,
// should we recurse? or should our callers unwind?)
let forward_blk = Cell::new(forward_blk);
// The map defaults to None, because if ancestors is None, we're at
// the end of the list, which doesn't make sense to coalesce.
do ancestors.as_ref().map_default((None,false)) |ancestor_arc| {
// NB: Takes a lock! (this ancestor node)
do access_ancestors(ancestor_arc) |nobe| {
// Argh, but we couldn't give it to coalesce() otherwise.
let forward_blk = forward_blk.take();
// Check monotonicity
check_generation(last_generation, nobe.generation);
/*##########################################################*
* Step 1: Look at this ancestor group (call iterator block).
*##########################################################*/
let mut nobe_is_dead = false;
let do_continue =
// NB: Takes a lock! (this ancestor node's parent group)
do access_group(&nobe.parent_group) |tg_opt| {
// Decide whether this group is dead. Note that the
// group being *dead* is disjoint from it *failing*.
nobe_is_dead = match *tg_opt {
Some(ref tg) => taskgroup_is_dead(tg),
None => nobe_is_dead
};
// Call iterator block. (If the group is dead, it's
// safe to skip it. This will leave our KillHandle
// hanging around in the group even after it's freed,
// but that's ok because, by virtue of the group being
// dead, nobody will ever kill-all (for) over it.)
if nobe_is_dead { true } else { forward_blk(tg_opt) }
};
/*##########################################################*
* Step 2: Recurse on the rest of the list; maybe coalescing.
*##########################################################*/
// 'need_unwind' is only set if blk returned true above, *and*
// the recursive call early-broke.
let mut need_unwind = false;
if do_continue {
// NB: Takes many locks! (ancestor nodes & parent groups)
need_unwind = coalesce(&mut nobe.ancestors, |tg| bail_blk(tg),
forward_blk, nobe.generation);
}
/*##########################################################*
* Step 3: Maybe unwind; compute return info for our caller.
*##########################################################*/
if need_unwind && !nobe_is_dead {
do access_group(&nobe.parent_group) |tg_opt| {
bail_blk(tg_opt)
}
}
// Decide whether our caller should unwind.
need_unwind = need_unwind || !do_continue;
// Tell caller whether or not to coalesce and/or unwind
if nobe_is_dead {
// Swap the list out here; the caller replaces us with it.
let rest = util::replace(&mut nobe.ancestors,
AncestorList(None));
(Some(rest), need_unwind)
} else {
(None, need_unwind)
}
}
}
}
}
// One of these per task.
pub struct Taskgroup {
// List of tasks with whose fates this one's is intertwined.
priv tasks: TaskGroupArc, // 'none' means the group has failed.
// Lists of tasks who will kill us if they fail, but whom we won't kill.
priv ancestors: AncestorList,
priv notifier: Option<AutoNotify>,
}
impl Drop for Taskgroup {
// Runs on task exit.
fn drop(&mut self) {
// If we are failing, the whole taskgroup needs to die.
do RuntimeGlue::with_task_handle_and_failing |me, failing| {
if failing {
for x in self.notifier.mut_iter() {
x.task_result = Some(Failure(~LinkedFailure as ~Any));
}
// Take everybody down with us. After this point, every
// other task in the group will see 'tg' as none, which
// indicates the whole taskgroup is failing (and forbids
// new spawns from succeeding).
let tg = do access_group(&self.tasks) |tg| { tg.take() };
// It's safe to send kill signals outside the lock, because
// we have a refcount on all kill-handles in the group.
kill_taskgroup(tg, me);
} else {
// Remove ourselves from the group(s).
do access_group(&self.tasks) |tg| {
leave_taskgroup(tg, me, true);
}
}
// It doesn't matter whether this happens before or after dealing
// with our own taskgroup, so long as both happen before we die.
// We remove ourself from every ancestor we can, so no cleanup; no
// break.
do each_ancestor(&mut self.ancestors, |_| {}) |ancestor_group| {
leave_taskgroup(ancestor_group, me, false);
true
};
}
}
}
pub fn Taskgroup(tasks: TaskGroupArc,
ancestors: AncestorList,
mut notifier: Option<AutoNotify>) -> Taskgroup {
for x in notifier.mut_iter() {
x.task_result = Some(Success);
}
Taskgroup {
tasks: tasks,
ancestors: ancestors,
notifier: notifier
}
}
struct AutoNotify {
notify_chan: Chan<UnwindResult>,
// XXX: By value self drop would allow this to be a plain UnwindResult
task_result: Option<UnwindResult>,
}
impl AutoNotify {
pub fn new(chan: Chan<UnwindResult>) -> AutoNotify {
AutoNotify {
notify_chan: chan,
// Un-set above when taskgroup successfully made.
task_result: Some(Failure(~("AutoNotify::new()") as ~Any))
}
}
}
impl Drop for AutoNotify {
fn drop(&mut self) {
let result = self.task_result.take_unwrap();
self.notify_chan.send(result);
}
}
fn enlist_in_taskgroup(state: TaskGroupInner, me: KillHandle,
is_member: bool) -> bool {
let me = Cell::new(me); // :(
// If 'None', the group was failing. Can't enlist.
do state.as_mut().map_default(false) |group| {
(if is_member {
&mut group.members
} else {
&mut group.descendants
}).insert(me.take());
true
}
}
// NB: Runs in destructor/post-exit context. Can't 'fail'.
fn leave_taskgroup(state: TaskGroupInner, me: &KillHandle, is_member: bool) {
let me = Cell::new(me); // :(
// If 'None', already failing and we've already gotten a kill signal.
do state.as_mut().map |group| {
(if is_member {
&mut group.members
} else {
&mut group.descendants
}).remove(me.take());
};
}
// NB: Runs in destructor/post-exit context. Can't 'fail'.
fn kill_taskgroup(state: Option<TaskGroupData>, me: &KillHandle) {
// Might already be None, if somebody is failing simultaneously.
// That's ok; only one task needs to do the dirty work. (Might also
// see 'None' if somebody already failed and we got a kill signal.)
do state.map |TaskGroupData { members: members, descendants: descendants }| {
for sibling in members.move_iter() {
// Skip self - killing ourself won't do much good.
if &sibling != me {
RuntimeGlue::kill_task(sibling);
}
}
for child in descendants.move_iter() {
assert!(&child != me);
RuntimeGlue::kill_task(child);
}
};
// (note: multiple tasks may reach this point)
}
// FIXME (#2912): Work around core-vs-coretest function duplication. Can't use
// a proper closure because the #[test]s won't understand. Have to fake it.
fn taskgroup_key() -> local_data::Key<@@mut Taskgroup> {
unsafe { cast::transmute(-2) }
}
// Transitionary.
struct RuntimeGlue;
impl RuntimeGlue {
fn kill_task(mut handle: KillHandle) {
do handle.kill().map |killed_task| {
let killed_task = Cell::new(killed_task);
do Local::borrow |sched: &mut Scheduler| {
sched.enqueue_task(killed_task.take());
}
};
}
fn with_task_handle_and_failing(blk: |&KillHandle, bool|) {
assert!(in_green_task_context());
unsafe {
// Can't use safe borrow, because the taskgroup destructor needs to
// access the scheduler again to send kill signals to other tasks.
let me: *mut Task = Local::unsafe_borrow();
blk((*me).death.kill_handle.get_ref(), (*me).unwinder.unwinding)
}
}
fn with_my_taskgroup<U>(blk: |&Taskgroup| -> U) -> U {
assert!(in_green_task_context());
unsafe {
// Can't use safe borrow, because creating new hashmaps for the
// tasksets requires an rng, which needs to borrow the sched.
let me: *mut Task = Local::unsafe_borrow();
blk(match (*me).taskgroup {
None => {
// First task in its (unlinked/unsupervised) taskgroup.
// Lazily initialize.
let mut members = TaskSet::new();
let my_handle = (*me).death.kill_handle.get_ref().clone();
members.insert(my_handle);
let tasks = Exclusive::new(Some(TaskGroupData {
members: members,
descendants: TaskSet::new(),
}));
let group = Taskgroup(tasks, AncestorList(None), None);
(*me).taskgroup = Some(group);
(*me).taskgroup.get_ref()
}
Some(ref group) => group,
})
}
}
}
// Returns 'None' in the case where the child's TG should be lazily initialized.
fn gen_child_taskgroup(linked: bool, supervised: bool)
-> Option<(TaskGroupArc, AncestorList)> {
if linked || supervised {
// with_my_taskgroup will lazily initialize the parent's taskgroup if
// it doesn't yet exist. We don't want to call it in the unlinked case.
do RuntimeGlue::with_my_taskgroup |spawner_group| {
let ancestors = AncestorList(spawner_group.ancestors.as_ref().map(|x| x.clone()));
if linked {
// Child is in the same group as spawner.
// Child's ancestors are spawner's ancestors.
Some((spawner_group.tasks.clone(), ancestors))
} else {
// Child is in a separate group from spawner.
let g = Exclusive::new(Some(TaskGroupData {
members: TaskSet::new(),
descendants: TaskSet::new(),
}));
let a = if supervised {
let new_generation = incr_generation(&ancestors);
assert!(new_generation < uint::max_value);
// Child's ancestors start with the spawner.
// Build a new node in the ancestor list.
AncestorList(Some(Exclusive::new(AncestorNode {
generation: new_generation,
parent_group: spawner_group.tasks.clone(),
ancestors: ancestors,
})))
} else {
// Child has no ancestors.
AncestorList(None)
};
Some((g, a))
}
}
} else {
None
}
}
// Set up membership in taskgroup and descendantship in all ancestor
// groups. If any enlistment fails, Some task was already failing, so
// don't let the child task run, and undo every successful enlistment.
fn enlist_many(child: &KillHandle, child_arc: &TaskGroupArc,
ancestors: &mut AncestorList) -> bool {
// Join this taskgroup.
let mut result = do access_group(child_arc) |child_tg| {
enlist_in_taskgroup(child_tg, child.clone(), true) // member
};
if result {
// Unwinding function in case any ancestral enlisting fails
let bail: |TaskGroupInner| = |tg| { leave_taskgroup(tg, child, false) };
// Attempt to join every ancestor group.
result = do each_ancestor(ancestors, bail) |ancestor_tg| {
// Enlist as a descendant, not as an actual member.
// Descendants don't kill ancestor groups on failure.
enlist_in_taskgroup(ancestor_tg, child.clone(), false)
};
// If any ancestor group fails, need to exit this group too.
if !result {
do access_group(child_arc) |child_tg| {
leave_taskgroup(child_tg, child, true); // member
}
}
}
result
}
pub fn spawn_raw(mut opts: TaskOpts, f: proc()) {
assert!(in_green_task_context());
let child_data = Cell::new(gen_child_taskgroup(opts.linked, opts.supervised));
let indestructible = opts.indestructible;
let child_wrapper: proc() = || {
// Child task runs this code.
// If child data is 'None', the enlist is vacuously successful.
let enlist_success = do child_data.take().map_default(true) |child_data| {
let child_data = Cell::new(child_data); // :(
do Local::borrow |me: &mut Task| {
let (child_tg, ancestors) = child_data.take();
let mut ancestors = ancestors;
let handle = me.death.kill_handle.get_ref();
// Atomically try to get into all of our taskgroups.
if enlist_many(handle, &child_tg, &mut ancestors) {
// Got in. We can run the provided child body, and can also run
// the taskgroup's exit-time-destructor afterward.
me.taskgroup = Some(Taskgroup(child_tg, ancestors, None));
true
} else {
false
}
}
};
// Should be run after the local-borrowed task is returned.
let f_cell = Cell::new(f);
if enlist_success {
if indestructible {
do unkillable { f_cell.take()() }
} else {
f_cell.take()()
}
}
};
let mut task = if opts.sched.mode != SingleThreaded {
if opts.watched {
Task::build_child(opts.stack_size, child_wrapper)
Task::build_child(opts.stack_size, f)
} else {
Task::build_root(opts.stack_size, child_wrapper)
Task::build_root(opts.stack_size, f)
}
} else {
unsafe {
@ -634,9 +127,9 @@ pub fn spawn_raw(mut opts: TaskOpts, f: proc()) {
// Pin the new task to the new scheduler
let new_task = if opts.watched {
Task::build_homed_child(opts.stack_size, child_wrapper, Sched(new_sched_handle))
Task::build_homed_child(opts.stack_size, f, Sched(new_sched_handle))
} else {
Task::build_homed_root(opts.stack_size, child_wrapper, Sched(new_sched_handle))
Task::build_homed_root(opts.stack_size, f, Sched(new_sched_handle))
};
// Create a task that will later be used to join with the new scheduler
@ -711,7 +204,6 @@ fn test_spawn_raw_simple() {
#[test]
fn test_spawn_raw_unsupervise() {
let opts = task::TaskOpts {
linked: false,
watched: false,
notify_chan: None,
.. default_task_opts()
@ -740,7 +232,6 @@ fn test_spawn_raw_notify_failure() {
let (notify_po, notify_ch) = comm::stream();
let opts = task::TaskOpts {
linked: false,
watched: false,
notify_chan: Some(notify_ch),
.. default_task_opts()

View File

@ -25,7 +25,7 @@ do || {
use ops::Drop;
#[cfg(test)] use task::{failing, spawn};
#[cfg(test)] use task::failing;
pub trait Finally<T> {
fn finally(&self, dtor: ||) -> T;

View File

@ -135,65 +135,63 @@ impl<T: Send> UnsafeArc<T> {
/// block; otherwise, an unwrapping task can be killed by linked failure.
pub fn unwrap(self) -> T {
let this = Cell::new(self); // argh
do task::unkillable {
unsafe {
let mut this = this.take();
// The ~ dtor needs to run if this code succeeds.
let mut data: ~ArcData<T> = cast::transmute(this.data);
// Set up the unwrap protocol.
let (p1,c1) = comm::oneshot(); // ()
let (p2,c2) = comm::oneshot(); // bool
// Try to put our server end in the unwrapper slot.
// This needs no barrier -- it's protected by the release barrier on
// the xadd, and the acquire+release barrier in the destructor's xadd.
if data.unwrapper.fill(~(c1,p2), Relaxed).is_none() {
// Got in. Tell this handle's destructor not to run (we are now it).
this.data = ptr::mut_null();
// Drop our own reference.
let old_count = data.count.fetch_sub(1, Release);
assert!(old_count >= 1);
if old_count == 1 {
// We were the last owner. Can unwrap immediately.
// AtomicOption's destructor will free the server endpoint.
unsafe {
let mut this = this.take();
// The ~ dtor needs to run if this code succeeds.
let mut data: ~ArcData<T> = cast::transmute(this.data);
// Set up the unwrap protocol.
let (p1,c1) = comm::oneshot(); // ()
let (p2,c2) = comm::oneshot(); // bool
// Try to put our server end in the unwrapper slot.
// This needs no barrier -- it's protected by the release barrier on
// the xadd, and the acquire+release barrier in the destructor's xadd.
if data.unwrapper.fill(~(c1,p2), Relaxed).is_none() {
// Got in. Tell this handle's destructor not to run (we are now it).
this.data = ptr::mut_null();
// Drop our own reference.
let old_count = data.count.fetch_sub(1, Release);
assert!(old_count >= 1);
if old_count == 1 {
// We were the last owner. Can unwrap immediately.
// AtomicOption's destructor will free the server endpoint.
// FIXME(#3224): it should be like this
// let ~ArcData { data: user_data, _ } = data;
// user_data
data.data.take_unwrap()
} else {
// The *next* person who sees the refcount hit 0 will wake us.
let p1 = Cell::new(p1); // argh
// Unlike the above one, this cell is necessary. It will get
// taken either in the do block or in the finally block.
let c2_and_data = Cell::new((c2,data));
do (|| {
p1.take().recv();
// Got here. Back in the 'unkillable' without getting killed.
let (c2, data) = c2_and_data.take();
c2.send(true);
// FIXME(#3224): it should be like this
// let ~ArcData { data: user_data, _ } = data;
// user_data
let mut data = data;
data.data.take_unwrap()
} else {
// The *next* person who sees the refcount hit 0 will wake us.
let p1 = Cell::new(p1); // argh
// Unlike the above one, this cell is necessary. It will get
// taken either in the do block or in the finally block.
let c2_and_data = Cell::new((c2,data));
do (|| {
do task::rekillable { p1.take().recv(); }
// Got here. Back in the 'unkillable' without getting killed.
}).finally {
if task::failing() {
// Killed during wait. Because this might happen while
// someone else still holds a reference, we can't free
// the data now; the "other" last refcount will free it.
let (c2, data) = c2_and_data.take();
c2.send(true);
// FIXME(#3224): it should be like this
// let ~ArcData { data: user_data, _ } = data;
// user_data
let mut data = data;
data.data.take_unwrap()
}).finally {
if task::failing() {
// Killed during wait. Because this might happen while
// someone else still holds a reference, we can't free
// the data now; the "other" last refcount will free it.
let (c2, data) = c2_and_data.take();
c2.send(false);
cast::forget(data);
} else {
assert!(c2_and_data.is_empty());
}
c2.send(false);
cast::forget(data);
} else {
assert!(c2_and_data.is_empty());
}
}
} else {
// If 'put' returns the server end back to us, we were rejected;
// someone else was trying to unwrap. Avoid guaranteed deadlock.
cast::forget(data);
fail!("Another task is already unwrapping this Arc!");
}
} else {
// If 'put' returns the server end back to us, we were rejected;
// someone else was trying to unwrap. Avoid guaranteed deadlock.
cast::forget(data);
fail!("Another task is already unwrapping this Arc!");
}
}
}
@ -259,17 +257,15 @@ impl<T> Drop for UnsafeArc<T>{
match data.unwrapper.take(Acquire) {
Some(~(message,response)) => {
let cell = Cell::new((message, response, data));
do task::unkillable {
let (message, response, data) = cell.take();
// Send 'ready' and wait for a response.
message.send(());
// Unkillable wait. Message guaranteed to come.
if response.recv() {
// Other task got the data.
cast::forget(data);
} else {
// Other task was killed. drop glue takes over.
}
let (message, response, data) = cell.take();
// Send 'ready' and wait for a response.
message.send(());
// Unkillable wait. Message guaranteed to come.
if response.recv() {
// Other task got the data.
cast::forget(data);
} else {
// Other task was killed. drop glue takes over.
}
}
None => {
@ -678,24 +674,4 @@ mod tests {
assert!(x.unwrap() == ~~"hello");
assert!(res.recv().is_ok());
}
#[test]
fn exclusive_new_unwrap_deadlock() {
// This is not guaranteed to get to the deadlock before being killed,
// but it will show up sometimes, and if the deadlock were not there,
// the test would nondeterministically fail.
let result = do task::try {
// a task that has two references to the same Exclusive::new will
// deadlock when it unwraps. nothing to be done about that.
let x = Exclusive::new(~~"hello");
let x2 = x.clone();
do task::spawn {
do 10.times { task::deschedule(); } // try to let the unwrapper go
fail!(); // punt it awake from its deadlock
}
let _z = x.unwrap();
unsafe { do x2.with |_hello| { } }
};
assert!(result.is_err());
}
}

View File

@ -179,9 +179,9 @@ fn main() {
let (from_parent, to_child) = comm::stream();
do task::spawn_with(from_parent) |from_parent| {
do spawn {
make_sequence_processor(sz, &from_parent, &to_parent_);
};
}
to_child
}.collect::<~[Chan<~[u8]>]>();

View File

@ -28,7 +28,7 @@ fn child_generation(gens_left: uint, c: comm::Chan<()>) {
// With this code, only as many generations are alive at a time as tasks
// alive at a time,
let c = Cell::new(c);
do task::spawn_supervised {
do spawn {
let c = c.take();
if gens_left & 1 == 1 {
task::deschedule(); // shake things up a bit

View File

@ -1,4 +1,5 @@
// xfail-pretty
// xfail-test linked failure
// Copyright 2012 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
@ -35,8 +36,6 @@ fn grandchild_group(num_tasks: uint) {
for _ in range(0, num_tasks) {
let ch = ch.clone();
let mut t = task::task();
t.linked();
t.unwatched();
do t.spawn { // linked
ch.send(());
let (p, _c) = stream::<()>();

View File

@ -10,8 +10,11 @@
// error-pattern:task '<unnamed>' failed at 'test'
use std::task;
fn main() {
do spawn {
do task::try {
fail!("test");
}
1
}.unwrap()
}

View File

@ -10,10 +10,13 @@
// error-pattern:task 'owned name' failed at 'test'
use std::task;
fn main() {
let mut t = ::std::task::task();
let mut t = task::task();
t.name(~"owned name");
do t.spawn {
do t.try {
fail!("test");
}
1
}.unwrap()
}

View File

@ -13,7 +13,8 @@
fn main() {
let mut t = ::std::task::task();
t.name("send name".to_send_str());
do t.spawn {
do t.try {
fail!("test");
}
3
}.unwrap()
}

View File

@ -13,7 +13,7 @@
fn main() {
let mut t = ::std::task::task();
t.name("static name");
do t.spawn {
do t.try {
fail!("test");
}
}.unwrap()
}

View File

@ -15,7 +15,7 @@ use std::task;
fn main() {
// the purpose of this test is to make sure that task::spawn()
// works when provided with a bare function:
task::spawn(startfn);
task::try(startfn).unwrap();
}
fn startfn() {

View File

@ -18,5 +18,5 @@ fn f() {
}
pub fn main() {
task::spawn_unlinked(f);
task::spawn(f);
}

View File

@ -42,7 +42,7 @@ fn f(c: SharedChan<bool>) {
pub fn main() {
let (p, c) = stream();
let c = SharedChan::new(c);
task::spawn_unlinked(|| f(c.clone()) );
task::spawn(|| f(c.clone()) );
error!("hiiiiiiiii");
assert!(p.recv());
}

View File

@ -35,5 +35,5 @@ fn f() {
}
pub fn main() {
task::spawn_unlinked(f);
task::spawn(f);
}

View File

@ -18,5 +18,5 @@ fn f() {
}
pub fn main() {
task::spawn_unlinked(f);
task::spawn(f);
}