From 63e25946f0ae4e776a853ebd3429d0cd67f437e1 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Sat, 18 Aug 2012 08:05:27 -0700 Subject: [PATCH] core: port task.rs to comm::Chan/Port to pipes::Chan/Port --- src/libcore/task.rs | 173 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 135 insertions(+), 38 deletions(-) diff --git a/src/libcore/task.rs b/src/libcore/task.rs index 32ce963ebbb..b694118e86f 100644 --- a/src/libcore/task.rs +++ b/src/libcore/task.rs @@ -29,6 +29,7 @@ use cmp::Eq; use result::Result; +use pipes::{stream, Chan, Port}; export Task; export TaskResult; @@ -77,6 +78,10 @@ export ThreadPerTask; export ManualThreads; export PlatformThread; +macro_rules! move_it { + { $x:expr } => { unsafe { let y <- *ptr::addr_of($x); y } } +} + /* Data types */ /// A handle to a task @@ -203,8 +208,8 @@ type SchedOpts = { type TaskOpts = { linked: bool, supervised: bool, - notify_chan: Option>, - sched: Option, + mut notify_chan: Option>, + sched: option, }; /** @@ -214,7 +219,7 @@ type TaskOpts = { */ // NB: Builders are designed to be single-use because they do stateful // things that get weird when reusing - e.g. if you create a result future -// it only applies to a single task, so then you have to maintain some +// it only applies to a single task, so then you have to maintain Some // potentially tricky state to ensure that everything behaves correctly // when you try to reuse the builder to spawn a new task. We'll just // sidestep that whole issue by making builders uncopyable and making @@ -248,7 +253,22 @@ priv impl TaskBuilder { fail ~"Cannot copy a task_builder"; // Fake move mode on self } self.consumed = true; - TaskBuilder({ can_not_copy: None, mut consumed: false,.. *self }) + let notify_chan = if self.opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut self.opts.notify_chan)) + }; + TaskBuilder({ + opts: { + linked: self.opts.linked, + supervised: self.opts.supervised, + mut notify_chan: notify_chan, + sched: self.opts.sched + }, + gen_body: self.gen_body, + can_not_copy: None, + mut consumed: false + }) } } @@ -258,8 +278,18 @@ impl TaskBuilder { * the other will not be killed. */ fn unlinked() -> TaskBuilder { + let notify_chan = if self.opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut self.opts.notify_chan)) + }; TaskBuilder({ - opts: { linked: false,.. self.opts }, + opts: { + linked: false, + supervised: self.opts.supervised, + mut notify_chan: notify_chan, + sched: self.opts.sched + }, can_not_copy: None, .. *self.consume() }) @@ -270,8 +300,18 @@ impl TaskBuilder { * the child. */ fn supervised() -> TaskBuilder { + let notify_chan = if self.opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut self.opts.notify_chan)) + }; TaskBuilder({ - opts: { linked: false, supervised: true,.. self.opts }, + opts: { + linked: false, + supervised: true, + mut notify_chan: notify_chan, + sched: self.opts.sched + }, can_not_copy: None, .. *self.consume() }) @@ -281,8 +321,18 @@ impl TaskBuilder { * other will be killed. */ fn linked() -> TaskBuilder { + let notify_chan = if self.opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut self.opts.notify_chan)) + }; TaskBuilder({ - opts: { linked: true, supervised: false,.. self.opts }, + opts: { + linked: true, + supervised: false, + mut notify_chan: notify_chan, + sched: self.opts.sched + }, can_not_copy: None, .. *self.consume() }) @@ -316,11 +366,10 @@ impl TaskBuilder { } // Construct the future and give it to the caller. - let po = comm::Port::(); - let ch = comm::Chan(po); + let (ch, po) = stream::(); blk(do future::from_fn { - match comm::recv(po) { + match po.recv() { Exit(_, result) => result } }); @@ -334,9 +383,18 @@ impl TaskBuilder { } /// Configure a custom scheduler mode for the task. fn sched_mode(mode: SchedMode) -> TaskBuilder { + let notify_chan = if self.opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut self.opts.notify_chan)) + }; TaskBuilder({ - opts: { sched: Some({ mode: mode, foreign_stack_size: None}), - .. self.opts }, + opts: { + linked: self.opts.linked, + supervised: self.opts.supervised, + mut notify_chan: notify_chan, + sched: Some({ mode: mode, foreign_stack_size: None}) + }, can_not_copy: None, .. *self.consume() }) @@ -356,7 +414,18 @@ impl TaskBuilder { */ fn add_wrapper(wrapper: fn@(+fn~()) -> fn~()) -> TaskBuilder { let prev_gen_body = self.gen_body; + let notify_chan = if self.opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut self.opts.notify_chan)) + }; TaskBuilder({ + opts: { + linked: self.opts.linked, + supervised: self.opts.supervised, + mut notify_chan: notify_chan, + sched: self.opts.sched + }, gen_body: |body| { wrapper(prev_gen_body(body)) }, can_not_copy: None, .. *self.consume() @@ -377,7 +446,18 @@ impl TaskBuilder { */ fn spawn(+f: fn~()) { let x = self.consume(); - spawn_raw(x.opts, x.gen_body(f)); + let notify_chan = if self.opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut self.opts.notify_chan)) + }; + let opts = { + linked: x.opts.linked, + supervised: x.opts.supervised, + mut notify_chan: notify_chan, + sched: x.opts.sched + }; + spawn_raw(opts, x.gen_body(f)); } /// Runs a task, while transfering ownership of one argument to the child. fn spawn_with(+arg: A, +f: fn~(+A)) { @@ -394,7 +474,7 @@ impl TaskBuilder { * child task, passes the port to child's body, and returns a channel * linked to the port to the parent. * - * This encapsulates some boilerplate handshaking logic that would + * This encapsulates Some boilerplate handshaking logic that would * otherwise be required to establish communication from the parent * to the child. */ @@ -466,7 +546,7 @@ fn default_task_opts() -> TaskOpts { { linked: true, supervised: false, - notify_chan: None, + mut notify_chan: None, sched: None } } @@ -872,7 +952,7 @@ fn each_ancestor(list: &mut AncestorList, // 'do_continue' - Did the forward_blk succeed at this point? (i.e., // should we recurse? or should our callers unwind?) - // The map defaults to none, because if ancestors is none, we're at + // The map defaults to None, because if ancestors is None, we're at // the end of the list, which doesn't make sense to coalesce. return do (**ancestors).map_default((None,false)) |ancestor_arc| { // NB: Takes a lock! (this ancestor node) @@ -950,7 +1030,11 @@ fn each_ancestor(list: &mut AncestorList, struct TCB { me: *rust_task, // List of tasks with whose fates this one's is intertwined. +<<<<<<< HEAD tasks: TaskGroupArc, // 'none' means the group has failed. +======= + let tasks: TaskGroupArc; // 'None' means the group has failed. +>>>>>>> core: port task.rs to comm::Chan/Port to pipes::Chan/Port // Lists of tasks who will kill us if they fail, but whom we won't kill. mut ancestors: AncestorList, is_main: bool, @@ -995,15 +1079,15 @@ fn TCB(me: *rust_task, +tasks: TaskGroupArc, +ancestors: AncestorList, } struct AutoNotify { - notify_chan: comm::Chan, + notify_chan: Chan, mut failed: bool, drop { let result = if self.failed { Failure } else { Success }; - comm::send(self.notify_chan, Exit(get_task(), result)); + self.notify_chan.send(Exit(get_task(), result)); } } -fn AutoNotify(chan: comm::Chan) -> AutoNotify { +fn AutoNotify(chan: Chan) -> AutoNotify { AutoNotify { notify_chan: chan, failed: true // Un-set above when taskgroup successfully made. @@ -1013,7 +1097,7 @@ fn AutoNotify(chan: comm::Chan) -> AutoNotify { fn enlist_in_taskgroup(state: TaskGroupInner, me: *rust_task, is_member: bool) -> bool { let newstate = util::replace(state, None); - // If 'none', the group was failing. Can't enlist. + // If 'None', the group was failing. Can't enlist. if newstate.is_some() { let group = option::unwrap(newstate); taskset_insert(if is_member { &mut group.members } @@ -1028,7 +1112,7 @@ fn enlist_in_taskgroup(state: TaskGroupInner, me: *rust_task, // NB: Runs in destructor/post-exit context. Can't 'fail'. fn leave_taskgroup(state: TaskGroupInner, me: *rust_task, is_member: bool) { let newstate = util::replace(state, None); - // If 'none', already failing and we've already gotten a kill signal. + // If 'None', already failing and we've already gotten a kill signal. if newstate.is_some() { let group = option::unwrap(newstate); taskset_remove(if is_member { &mut group.members } @@ -1048,9 +1132,9 @@ fn kill_taskgroup(state: TaskGroupInner, me: *rust_task, is_main: bool) { // To do it differently, we'd have to use the runtime's task refcounting, // but that could leave task structs around long after their task exited. let newstate = util::replace(state, None); - // Might already be none, if somebody is failing simultaneously. + // Might already be None, if Somebody is failing simultaneously. // That's ok; only one task needs to do the dirty work. (Might also - // see 'none' if somebody already failed and we got a kill signal.) + // see 'None' if Somebody already failed and we got a kill signal.) if newstate.is_some() { let group = option::unwrap(newstate); for taskset_each(&group.members) |+sibling| { @@ -1067,7 +1151,7 @@ fn kill_taskgroup(state: TaskGroupInner, me: *rust_task, is_main: bool) { if is_main { rustrt::rust_task_kill_all(me); } - // Do NOT restore state to Some(..)! It stays none to indicate + // Do NOT restore state to Some(..)! It stays None to indicate // that the whole taskgroup is failing, to forbid new spawns. } // (note: multiple tasks may reach this point) @@ -1145,7 +1229,7 @@ fn gen_child_taskgroup(linked: bool, supervised: bool) // Appease the borrow-checker. Really this wants to be written as: // match ancestors // Some(ancestor_arc) { ancestor_list(Some(ancestor_arc.clone())) } - // none { ancestor_list(none) } + // None { ancestor_list(None) } let tmp = util::replace(&mut **ancestors, None); if tmp.is_some() { let ancestor_arc = option::unwrap(tmp); @@ -1175,10 +1259,15 @@ fn spawn_raw(+opts: TaskOpts, +f: fn~()) { }; assert !new_task.is_null(); // Getting killed after here would leak the task. + let mut notify_chan = if opts.notify_chan == None { + None + } else { + Some(option::swap_unwrap(&mut opts.notify_chan)) + }; let child_wrapper = make_child_wrapper(new_task, child_tg, ancestors, is_main, - opts.notify_chan, f); + notify_chan, f); let fptr = ptr::addr_of(child_wrapper); let closure: *rust_closure = unsafe::reinterpret_cast(&fptr); @@ -1198,17 +1287,25 @@ fn spawn_raw(+opts: TaskOpts, +f: fn~()) { // (4) ...and runs the provided body function. fn make_child_wrapper(child: *rust_task, +child_arc: TaskGroupArc, +ancestors: AncestorList, is_main: bool, - notify_chan: Option>, + +notify_chan: Option>, +f: fn~()) -> fn~() { let child_data = ~mut Some((child_arc, ancestors)); - return fn~() { + return fn~(move notify_chan) { // Agh. Get move-mode items into the closure. FIXME (#2829) let mut (child_arc, ancestors) = option::swap_unwrap(child_data); // Child task runs this code. // Even if the below code fails to kick the child off, we must - // send something on the notify channel. - let notifier = notify_chan.map(|c| AutoNotify(c)); + // send Something on the notify channel. + + //let mut notifier = None;//notify_chan.map(|c| AutoNotify(c)); + let notifier = match notify_chan { + Some(notify_chan_value) => { + let moved_ncv = move_it!{notify_chan_value}; + Some(AutoNotify(moved_ncv)) + } + _ => None + }; if enlist_many(child, &child_arc, &mut ancestors) { let group = @TCB(child, child_arc, ancestors, @@ -1221,7 +1318,7 @@ fn spawn_raw(+opts: TaskOpts, +f: fn~()) { }; // Set up membership in taskgroup and descendantship in all ancestor - // groups. If any enlistment fails, some task was already failing, so + // groups. If any enlistment fails, Some task was already failing, so // don't let the child task run, and undo every successful enlistment. fn enlist_many(child: *rust_task, child_arc: &TaskGroupArc, ancestors: &mut AncestorList) -> bool { @@ -1387,7 +1484,7 @@ unsafe fn local_data_lookup( } ); do map_pos.map |index| { - // .get() is guaranteed because of "none { false }" above. + // .get() is guaranteed because of "None { false }" above. let (_, data_ptr, _) = (*map)[index].get(); (index, data_ptr) } @@ -1452,7 +1549,7 @@ unsafe fn local_set( } None => { // Find an empty slot. If not, grow the vector. - match (*map).position(|x| x.is_none()) { + match (*map).position(|x| x.is_None()) { Some(empty_index) => (*map).set_elt(empty_index, new_entry), None => (*map).push(new_entry) } @@ -1500,7 +1597,7 @@ unsafe fn local_data_set( local_set(rustrt::rust_get_task(), key, data) } /** - * Modify a task-local data value. If the function returns 'none', the + * Modify a task-local data value. If the function returns 'None', the * data is removed (and its reference dropped). */ unsafe fn local_data_modify( @@ -1583,7 +1680,7 @@ fn test_cant_dup_task_builder() { // The following 8 tests test the following 2^3 combinations: // {un,}linked {un,}supervised failure propagation {up,down}wards. -// !!! These tests are dangerous. If something is buggy, they will hang, !!! +// !!! These tests are dangerous. If Something is buggy, they will hang, !!! // !!! instead of exiting cleanly. This might wedge the buildbots. !!! #[test] #[ignore(cfg(windows))] @@ -2146,7 +2243,7 @@ fn test_tls_multitask() unsafe { fn my_key(+_x: @~str) { } local_data_set(my_key, @~"parent data"); do task::spawn unsafe { - assert local_data_get(my_key).is_none(); // TLS shouldn't carry over. + assert local_data_get(my_key).is_None(); // TLS shouldn't carry over. local_data_set(my_key, @~"child data"); assert *(local_data_get(my_key).get()) == ~"child data"; // should be cleaned up for us @@ -2171,7 +2268,7 @@ fn test_tls_pop() unsafe { local_data_set(my_key, @~"weasel"); assert *(local_data_pop(my_key).get()) == ~"weasel"; // Pop must remove the data from the map. - assert local_data_pop(my_key).is_none(); + assert local_data_pop(my_key).is_None(); } #[test] @@ -2198,7 +2295,7 @@ fn test_tls_crust_automorestack_memorial_bug() unsafe { // This might result in a stack-canary clobber if the runtime fails to set // sp_limit to 0 when calling the cleanup extern - it might automatically // jump over to the rust stack, which causes next_c_sp to get recorded as - // something within a rust stack segment. Then a subsequent upcall (esp. + // Something within a rust stack segment. Then a subsequent upcall (esp. // for logging, think vsnprintf) would run on a stack smaller than 1 MB. fn my_key(+_x: @~str) { } do task::spawn {