core: port task.rs to comm::Chan/Port to pipes::Chan/Port

This commit is contained in:
Jeff Olson 2012-08-18 08:05:27 -07:00 committed by Brian Anderson
parent 940ef65e0e
commit 63e25946f0

View File

@ -29,6 +29,7 @@
use cmp::Eq;
use result::Result;
use pipes::{stream, Chan, Port};
export Task;
export TaskResult;
@ -77,6 +78,10 @@ export ThreadPerTask;
export ManualThreads;
export PlatformThread;
macro_rules! move_it {
{ $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } }
}
/* Data types */
/// A handle to a task
@ -203,8 +208,8 @@ type SchedOpts = {
type TaskOpts = {
linked: bool,
supervised: bool,
notify_chan: Option<comm::Chan<Notification>>,
sched: Option<SchedOpts>,
mut notify_chan: Option<Chan<Notification>>,
sched: option<SchedOpts>,
};
/**
@ -214,7 +219,7 @@ type TaskOpts = {
*/
// NB: Builders are designed to be single-use because they do stateful
// things that get weird when reusing - e.g. if you create a result future
// it only applies to a single task, so then you have to maintain some
// it only applies to a single task, so then you have to maintain Some
// potentially tricky state to ensure that everything behaves correctly
// when you try to reuse the builder to spawn a new task. We'll just
// sidestep that whole issue by making builders uncopyable and making
@ -248,7 +253,22 @@ priv impl TaskBuilder {
fail ~"Cannot copy a task_builder"; // Fake move mode on self
}
self.consumed = true;
TaskBuilder({ can_not_copy: None, mut consumed: false,.. *self })
let notify_chan = if self.opts.notify_chan == None {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
TaskBuilder({
opts: {
linked: self.opts.linked,
supervised: self.opts.supervised,
mut notify_chan: notify_chan,
sched: self.opts.sched
},
gen_body: self.gen_body,
can_not_copy: None,
mut consumed: false
})
}
}
@ -258,8 +278,18 @@ impl TaskBuilder {
* the other will not be killed.
*/
fn unlinked() -> TaskBuilder {
let notify_chan = if self.opts.notify_chan == None {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
TaskBuilder({
opts: { linked: false,.. self.opts },
opts: {
linked: false,
supervised: self.opts.supervised,
mut notify_chan: notify_chan,
sched: self.opts.sched
},
can_not_copy: None,
.. *self.consume()
})
@ -270,8 +300,18 @@ impl TaskBuilder {
* the child.
*/
fn supervised() -> TaskBuilder {
let notify_chan = if self.opts.notify_chan == None {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
TaskBuilder({
opts: { linked: false, supervised: true,.. self.opts },
opts: {
linked: false,
supervised: true,
mut notify_chan: notify_chan,
sched: self.opts.sched
},
can_not_copy: None,
.. *self.consume()
})
@ -281,8 +321,18 @@ impl TaskBuilder {
* other will be killed.
*/
fn linked() -> TaskBuilder {
let notify_chan = if self.opts.notify_chan == None {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
TaskBuilder({
opts: { linked: true, supervised: false,.. self.opts },
opts: {
linked: true,
supervised: false,
mut notify_chan: notify_chan,
sched: self.opts.sched
},
can_not_copy: None,
.. *self.consume()
})
@ -316,11 +366,10 @@ impl TaskBuilder {
}
// Construct the future and give it to the caller.
let po = comm::Port::<Notification>();
let ch = comm::Chan(po);
let (ch, po) = stream::<Notification>();
blk(do future::from_fn {
match comm::recv(po) {
match po.recv() {
Exit(_, result) => result
}
});
@ -334,9 +383,18 @@ impl TaskBuilder {
}
/// Configure a custom scheduler mode for the task.
fn sched_mode(mode: SchedMode) -> TaskBuilder {
let notify_chan = if self.opts.notify_chan == None {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
TaskBuilder({
opts: { sched: Some({ mode: mode, foreign_stack_size: None}),
.. self.opts },
opts: {
linked: self.opts.linked,
supervised: self.opts.supervised,
mut notify_chan: notify_chan,
sched: Some({ mode: mode, foreign_stack_size: None})
},
can_not_copy: None,
.. *self.consume()
})
@ -356,7 +414,18 @@ impl TaskBuilder {
*/
fn add_wrapper(wrapper: fn@(+fn~()) -> fn~()) -> TaskBuilder {
let prev_gen_body = self.gen_body;
let notify_chan = if self.opts.notify_chan == None {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
TaskBuilder({
opts: {
linked: self.opts.linked,
supervised: self.opts.supervised,
mut notify_chan: notify_chan,
sched: self.opts.sched
},
gen_body: |body| { wrapper(prev_gen_body(body)) },
can_not_copy: None,
.. *self.consume()
@ -377,7 +446,18 @@ impl TaskBuilder {
*/
fn spawn(+f: fn~()) {
let x = self.consume();
spawn_raw(x.opts, x.gen_body(f));
let notify_chan = if self.opts.notify_chan == None {
None
} else {
Some(option::swap_unwrap(&mut self.opts.notify_chan))
};
let opts = {
linked: x.opts.linked,
supervised: x.opts.supervised,
mut notify_chan: notify_chan,
sched: x.opts.sched
};
spawn_raw(opts, x.gen_body(f));
}
/// Runs a task, while transfering ownership of one argument to the child.
fn spawn_with<A: Send>(+arg: A, +f: fn~(+A)) {
@ -394,7 +474,7 @@ impl TaskBuilder {
* child task, passes the port to child's body, and returns a channel
* linked to the port to the parent.
*
* This encapsulates some boilerplate handshaking logic that would
* This encapsulates Some boilerplate handshaking logic that would
* otherwise be required to establish communication from the parent
* to the child.
*/
@ -466,7 +546,7 @@ fn default_task_opts() -> TaskOpts {
{
linked: true,
supervised: false,
notify_chan: None,
mut notify_chan: None,
sched: None
}
}
@ -872,7 +952,7 @@ fn each_ancestor(list: &mut AncestorList,
// 'do_continue' - Did the forward_blk succeed at this point? (i.e.,
// should we recurse? or should our callers unwind?)
// The map defaults to none, because if ancestors is none, we're at
// The map defaults to None, because if ancestors is None, we're at
// the end of the list, which doesn't make sense to coalesce.
return do (**ancestors).map_default((None,false)) |ancestor_arc| {
// NB: Takes a lock! (this ancestor node)
@ -950,7 +1030,11 @@ fn each_ancestor(list: &mut AncestorList,
struct TCB {
me: *rust_task,
// List of tasks with whose fates this one's is intertwined.
<<<<<<< HEAD
tasks: TaskGroupArc, // 'none' means the group has failed.
=======
let tasks: TaskGroupArc; // 'None' means the group has failed.
>>>>>>> core: port task.rs to comm::Chan/Port to pipes::Chan/Port
// Lists of tasks who will kill us if they fail, but whom we won't kill.
mut ancestors: AncestorList,
is_main: bool,
@ -995,15 +1079,15 @@ fn TCB(me: *rust_task, +tasks: TaskGroupArc, +ancestors: AncestorList,
}
struct AutoNotify {
notify_chan: comm::Chan<Notification>,
notify_chan: Chan<Notification>,
mut failed: bool,
drop {
let result = if self.failed { Failure } else { Success };
comm::send(self.notify_chan, Exit(get_task(), result));
self.notify_chan.send(Exit(get_task(), result));
}
}
fn AutoNotify(chan: comm::Chan<Notification>) -> AutoNotify {
fn AutoNotify(chan: Chan<Notification>) -> AutoNotify {
AutoNotify {
notify_chan: chan,
failed: true // Un-set above when taskgroup successfully made.
@ -1013,7 +1097,7 @@ fn AutoNotify(chan: comm::Chan<Notification>) -> AutoNotify {
fn enlist_in_taskgroup(state: TaskGroupInner, me: *rust_task,
is_member: bool) -> bool {
let newstate = util::replace(state, None);
// If 'none', the group was failing. Can't enlist.
// If 'None', the group was failing. Can't enlist.
if newstate.is_some() {
let group = option::unwrap(newstate);
taskset_insert(if is_member { &mut group.members }
@ -1028,7 +1112,7 @@ fn enlist_in_taskgroup(state: TaskGroupInner, me: *rust_task,
// NB: Runs in destructor/post-exit context. Can't 'fail'.
fn leave_taskgroup(state: TaskGroupInner, me: *rust_task, is_member: bool) {
let newstate = util::replace(state, None);
// If 'none', already failing and we've already gotten a kill signal.
// If 'None', already failing and we've already gotten a kill signal.
if newstate.is_some() {
let group = option::unwrap(newstate);
taskset_remove(if is_member { &mut group.members }
@ -1048,9 +1132,9 @@ fn kill_taskgroup(state: TaskGroupInner, me: *rust_task, is_main: bool) {
// To do it differently, we'd have to use the runtime's task refcounting,
// but that could leave task structs around long after their task exited.
let newstate = util::replace(state, None);
// Might already be none, if somebody is failing simultaneously.
// Might already be None, if Somebody is failing simultaneously.
// That's ok; only one task needs to do the dirty work. (Might also
// see 'none' if somebody already failed and we got a kill signal.)
// see 'None' if Somebody already failed and we got a kill signal.)
if newstate.is_some() {
let group = option::unwrap(newstate);
for taskset_each(&group.members) |+sibling| {
@ -1067,7 +1151,7 @@ fn kill_taskgroup(state: TaskGroupInner, me: *rust_task, is_main: bool) {
if is_main {
rustrt::rust_task_kill_all(me);
}
// Do NOT restore state to Some(..)! It stays none to indicate
// Do NOT restore state to Some(..)! It stays None to indicate
// that the whole taskgroup is failing, to forbid new spawns.
}
// (note: multiple tasks may reach this point)
@ -1145,7 +1229,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool)
// Appease the borrow-checker. Really this wants to be written as:
// match ancestors
// Some(ancestor_arc) { ancestor_list(Some(ancestor_arc.clone())) }
// none { ancestor_list(none) }
// None { ancestor_list(None) }
let tmp = util::replace(&mut **ancestors, None);
if tmp.is_some() {
let ancestor_arc = option::unwrap(tmp);
@ -1175,10 +1259,15 @@ fn spawn_raw(+opts: TaskOpts, +f: fn~()) {
};
assert !new_task.is_null();
// Getting killed after here would leak the task.
let mut notify_chan = if opts.notify_chan == None {
None
} else {
Some(option::swap_unwrap(&mut opts.notify_chan))
};
let child_wrapper =
make_child_wrapper(new_task, child_tg, ancestors, is_main,
opts.notify_chan, f);
notify_chan, f);
let fptr = ptr::addr_of(child_wrapper);
let closure: *rust_closure = unsafe::reinterpret_cast(&fptr);
@ -1198,17 +1287,25 @@ fn spawn_raw(+opts: TaskOpts, +f: fn~()) {
// (4) ...and runs the provided body function.
fn make_child_wrapper(child: *rust_task, +child_arc: TaskGroupArc,
+ancestors: AncestorList, is_main: bool,
notify_chan: Option<comm::Chan<Notification>>,
+notify_chan: Option<Chan<Notification>>,
+f: fn~()) -> fn~() {
let child_data = ~mut Some((child_arc, ancestors));
return fn~() {
return fn~(move notify_chan) {
// Agh. Get move-mode items into the closure. FIXME (#2829)
let mut (child_arc, ancestors) = option::swap_unwrap(child_data);
// Child task runs this code.
// Even if the below code fails to kick the child off, we must
// send something on the notify channel.
let notifier = notify_chan.map(|c| AutoNotify(c));
// send Something on the notify channel.
//let mut notifier = None;//notify_chan.map(|c| AutoNotify(c));
let notifier = match notify_chan {
Some(notify_chan_value) => {
let moved_ncv = move_it!{notify_chan_value};
Some(AutoNotify(moved_ncv))
}
_ => None
};
if enlist_many(child, &child_arc, &mut ancestors) {
let group = @TCB(child, child_arc, ancestors,
@ -1221,7 +1318,7 @@ fn spawn_raw(+opts: TaskOpts, +f: fn~()) {
};
// Set up membership in taskgroup and descendantship in all ancestor
// groups. If any enlistment fails, some task was already failing, so
// groups. If any enlistment fails, Some task was already failing, so
// don't let the child task run, and undo every successful enlistment.
fn enlist_many(child: *rust_task, child_arc: &TaskGroupArc,
ancestors: &mut AncestorList) -> bool {
@ -1387,7 +1484,7 @@ unsafe fn local_data_lookup<T: Owned>(
}
);
do map_pos.map |index| {
// .get() is guaranteed because of "none { false }" above.
// .get() is guaranteed because of "None { false }" above.
let (_, data_ptr, _) = (*map)[index].get();
(index, data_ptr)
}
@ -1452,7 +1549,7 @@ unsafe fn local_set<T: Owned>(
}
None => {
// Find an empty slot. If not, grow the vector.
match (*map).position(|x| x.is_none()) {
match (*map).position(|x| x.is_None()) {
Some(empty_index) => (*map).set_elt(empty_index, new_entry),
None => (*map).push(new_entry)
}
@ -1500,7 +1597,7 @@ unsafe fn local_data_set<T: Owned>(
local_set(rustrt::rust_get_task(), key, data)
}
/**
* Modify a task-local data value. If the function returns 'none', the
* Modify a task-local data value. If the function returns 'None', the
* data is removed (and its reference dropped).
*/
unsafe fn local_data_modify<T: Owned>(
@ -1583,7 +1680,7 @@ fn test_cant_dup_task_builder() {
// The following 8 tests test the following 2^3 combinations:
// {un,}linked {un,}supervised failure propagation {up,down}wards.
// !!! These tests are dangerous. If something is buggy, they will hang, !!!
// !!! These tests are dangerous. If Something is buggy, they will hang, !!!
// !!! instead of exiting cleanly. This might wedge the buildbots. !!!
#[test] #[ignore(cfg(windows))]
@ -2146,7 +2243,7 @@ fn test_tls_multitask() unsafe {
fn my_key(+_x: @~str) { }
local_data_set(my_key, @~"parent data");
do task::spawn unsafe {
assert local_data_get(my_key).is_none(); // TLS shouldn't carry over.
assert local_data_get(my_key).is_None(); // TLS shouldn't carry over.
local_data_set(my_key, @~"child data");
assert *(local_data_get(my_key).get()) == ~"child data";
// should be cleaned up for us
@ -2171,7 +2268,7 @@ fn test_tls_pop() unsafe {
local_data_set(my_key, @~"weasel");
assert *(local_data_pop(my_key).get()) == ~"weasel";
// Pop must remove the data from the map.
assert local_data_pop(my_key).is_none();
assert local_data_pop(my_key).is_None();
}
#[test]
@ -2198,7 +2295,7 @@ fn test_tls_crust_automorestack_memorial_bug() unsafe {
// This might result in a stack-canary clobber if the runtime fails to set
// sp_limit to 0 when calling the cleanup extern - it might automatically
// jump over to the rust stack, which causes next_c_sp to get recorded as
// something within a rust stack segment. Then a subsequent upcall (esp.
// Something within a rust stack segment. Then a subsequent upcall (esp.
// for logging, think vsnprintf) would run on a stack smaller than 1 MB.
fn my_key(+_x: @~str) { }
do task::spawn {