diff --git a/src/libstd/rt/comm.rs b/src/libstd/rt/comm.rs
index b1533237b15..87bf5a23b93 100644
--- a/src/libstd/rt/comm.rs
+++ b/src/libstd/rt/comm.rs
@@ -12,13 +12,13 @@
 use option::*;
 use cast;
-use util;
 use ops::Drop;
 use rt::kill::BlockedTask;
 use kinds::Send;
 use rt::sched::Scheduler;
 use rt::local::Local;
-use unstable::atomics::{AtomicUint, AtomicOption, Acquire, SeqCst};
+use rt::select::{Select, SelectPort};
+use unstable::atomics::{AtomicUint, AtomicOption, Acquire, Release, SeqCst};
 use unstable::sync::UnsafeAtomicRcBox;
 use util::Void;
 use comm::{GenericChan, GenericSmartChan, GenericPort, Peekable};
@@ -76,6 +76,7 @@ pub fn oneshot<T: Send>() -> (PortOne<T>, ChanOne<T>) {
 }
 
 impl<T> ChanOne<T> {
+    #[inline]
     fn packet(&self) -> *mut Packet<T> {
         unsafe {
             let p: *mut ~Packet<T> = cast::transmute(&self.void_packet);
@@ -141,7 +142,6 @@ impl<T> ChanOne<T> {
     }
 }
 
-
 impl<T> PortOne<T> {
     fn packet(&self) -> *mut Packet<T> {
         unsafe {
@@ -162,46 +162,115 @@ impl<T> PortOne<T> {
     pub fn try_recv(self) -> Option<T> {
         let mut this = self;
-        let packet = this.packet();
 
         // Optimistic check. If data was sent already, we don't even need to block.
         // No release barrier needed here; we're not handing off our task pointer yet.
-        if unsafe { (*packet).state.load(Acquire) } != STATE_ONE {
+        if !this.optimistic_check() {
             // No data available yet.
             // Switch to the scheduler to put the ~Task into the Packet state.
             let sched = Local::take::<Scheduler>();
             do sched.deschedule_running_task_and_then |sched, task| {
-                unsafe {
-                    // Atomically swap the task pointer into the Packet state, issuing
-                    // an acquire barrier to prevent reordering of the subsequent read
-                    // of the payload. Also issues a release barrier to prevent
-                    // reordering of any previous writes to the task structure.
-                    let task_as_state = task.cast_to_uint();
-                    let oldstate = (*packet).state.swap(task_as_state, SeqCst);
-                    match oldstate {
-                        STATE_BOTH => {
-                            // Data has not been sent. Now we're blocked.
-                            rtdebug!("non-rendezvous recv");
-                            sched.metrics.non_rendezvous_recvs += 1;
-                        }
-                        STATE_ONE => {
-                            rtdebug!("rendezvous recv");
-                            sched.metrics.rendezvous_recvs += 1;
-
-                            // Channel is closed. Switch back and check the data.
-                            // NB: We have to drop back into the scheduler event loop here
-                            // instead of switching immediately back or we could end up
-                            // triggering infinite recursion on the scheduler's stack.
-                            let recvr = BlockedTask::cast_from_uint(task_as_state);
-                            sched.enqueue_blocked_task(recvr);
-                        }
-                        _ => util::unreachable()
-                    }
-                }
+                this.block_on(sched, task);
             }
         }
 
         // Task resumes.
+        this.recv_ready()
+    }
+}
+
+impl<T> Select for PortOne<T> {
+    #[inline]
+    fn optimistic_check(&mut self) -> bool {
+        unsafe { (*self.packet()).state.load(Acquire) == STATE_ONE }
+    }
+
+    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
+        unsafe {
+            // Atomically swap the task pointer into the Packet state, issuing
+            // an acquire barrier to prevent reordering of the subsequent read
+            // of the payload. Also issues a release barrier to prevent
+            // reordering of any previous writes to the task structure.
+            let task_as_state = task.cast_to_uint();
+            let oldstate = (*self.packet()).state.swap(task_as_state, SeqCst);
+            match oldstate {
+                STATE_BOTH => {
+                    // Data has not been sent. Now we're blocked.
+                    rtdebug!("non-rendezvous recv");
+                    sched.metrics.non_rendezvous_recvs += 1;
+                    false
+                }
+                STATE_ONE => {
+                    // Re-record that we are the only owner of the packet.
+                    // Release barrier needed in case the task gets reawoken
+                    // on a different core (this is analogous to writing a
+                    // payload; a barrier in enqueueing the task protects it).
+                    // NB(#8132). This *must* occur before the enqueue below.
+                    // FIXME(#6842, #8130) This is usually only needed for the
+                    // assertion in recv_ready, except in the case of select().
+                    // This won't actually ever have cacheline contention, but
+                    // maybe should be optimized out with a cfg(test) anyway?
+                    (*self.packet()).state.store(STATE_ONE, Release);
+
+                    rtdebug!("rendezvous recv");
+                    sched.metrics.rendezvous_recvs += 1;
+
+                    // Channel is closed. Switch back and check the data.
+                    // NB: We have to drop back into the scheduler event loop here
+                    // instead of switching immediately back or we could end up
+                    // triggering infinite recursion on the scheduler's stack.
+                    let recvr = BlockedTask::cast_from_uint(task_as_state);
+                    sched.enqueue_blocked_task(recvr);
+                    true
+                }
+                _ => rtabort!("can't block_on; a task is already blocked")
+            }
+        }
+    }
+
+    // This is the only select trait function that's not also used in recv.
+    fn unblock_from(&mut self) -> bool {
+        let packet = self.packet();
+        unsafe {
+            // In case the data is available, the acquire barrier here matches
+            // the release barrier the sender used to release the payload.
+            match (*packet).state.load(Acquire) {
+                // Impossible. We removed STATE_BOTH when blocking on it, and
+                // no self-respecting sender would put it back.
+                STATE_BOTH => rtabort!("refcount already 2 in unblock_from"),
+                // Here, a sender already tried to wake us up. Perhaps they
+                // even succeeded! Data is available.
+                STATE_ONE => true,
+                // Still registered as blocked. Need to "unblock" the pointer.
+                task_as_state => {
+                    // In the window between the load and the CAS, a sender
+                    // might take the pointer and set the refcount to ONE. If
+                    // that happens, we shouldn't clobber that with BOTH!
+                    // Acquire barrier again for the same reason as above.
+                    match (*packet).state.compare_and_swap(task_as_state, STATE_BOTH,
+                                                           Acquire) {
+                        STATE_BOTH => rtabort!("refcount became 2 in unblock_from"),
+                        STATE_ONE => true, // Lost the race. Data available.
+                        same_ptr => {
+                            // We successfully unblocked our task pointer.
+                            assert!(task_as_state == same_ptr);
+                            let handle = BlockedTask::cast_from_uint(task_as_state);
+                            // Because we are already awake, the handle we
+                            // gave to this port shall already be empty.
+                            handle.assert_already_awake();
+                            false
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
+
+impl<T> SelectPort<T> for PortOne<T> {
+    fn recv_ready(self) -> Option<T> {
+        let mut this = self;
+        let packet = this.packet();
 
         // No further memory barrier is needed here to access the
         // payload. Some scenarios:
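
The packet protocol in the hunk above is a single atomic state word with three meanings: STATE_BOTH (both endpoints alive, no data yet), STATE_ONE (one endpoint is done), or any other value, which is a pointer to the blocked receiver task. A rough standalone model of the swap-based handoff, in modern Rust syntax (the StateWord type and both method names are invented for illustration; they are not part of the patch):

    use std::sync::atomic::{AtomicUsize, Ordering};

    const STATE_BOTH: usize = 0; // both halves alive; no data yet
    const STATE_ONE: usize = 1;  // one half done: data sent, or packet closed
    // Any other value is read as a pointer to the blocked receiver task.

    struct StateWord(AtomicUsize);

    impl StateWord {
        // Receiver half of block_on(): publish our "task pointer" and learn
        // whether the sender already delivered. Returns true if data is ready.
        fn receiver_blocks(&self, task_ptr: usize) -> bool {
            match self.0.swap(task_ptr, Ordering::SeqCst) {
                STATE_BOTH => false, // sender not done yet; we are now blocked
                STATE_ONE => true,   // rendezvous: data was already there
                _ => panic!("a task is already blocked on this packet"),
            }
        }

        // Sender side: mark the packet done, waking any parked receiver.
        fn sender_delivers(&self) -> Option<usize> {
            match self.0.swap(STATE_ONE, Ordering::SeqCst) {
                STATE_BOTH => None,         // receiver wasn't waiting yet
                task_ptr => Some(task_ptr), // reschedule this blocked receiver
            }
        }
    }

    fn main() {
        let s = StateWord(AtomicUsize::new(STATE_BOTH));
        assert!(!s.receiver_blocks(0xdead0));           // receiver parks first...
        assert_eq!(s.sender_delivers(), Some(0xdead0)); // ...sender finds and wakes it
    }
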
@@ -213,8 +282,11 @@ impl<T> PortOne<T> {
         // 3) We encountered STATE_BOTH above and blocked, but the receiving task (this task)
         //    is pinned to some other scheduler, so the sending task had to give us to
         //    a different scheduler for resuming. That send synchronized memory.
-
         unsafe {
+            // See corresponding store() above in block_on for rationale.
+            // FIXME(#8130) This can happen only in test builds.
+            assert!((*packet).state.load(Acquire) == STATE_ONE);
+
             let payload = (*packet).payload.take();
 
             // The sender has closed up shop. Drop the packet.
@@ -234,7 +306,7 @@ impl<T> Peekable<T> for PortOne<T> {
         match oldstate {
             STATE_BOTH => false,
             STATE_ONE => (*packet).payload.is_some(),
-            _ => util::unreachable()
+            _ => rtabort!("peeked on a blocked task")
         }
     }
 }
@@ -368,6 +440,36 @@ impl<T> Peekable<T> for Port<T> {
     }
 }
 
+impl<T> Select for Port<T> {
+    #[inline]
+    fn optimistic_check(&mut self) -> bool {
+        do self.next.with_mut_ref |pone| { pone.optimistic_check() }
+    }
+
+    #[inline]
+    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
+        let task = Cell::new(task);
+        do self.next.with_mut_ref |pone| { pone.block_on(sched, task.take()) }
+    }
+
+    #[inline]
+    fn unblock_from(&mut self) -> bool {
+        do self.next.with_mut_ref |pone| { pone.unblock_from() }
+    }
+}
+
+impl<T> SelectPort<(T, Port<T>)> for Port<T> {
+    fn recv_ready(self) -> Option<(T, Port<T>)> {
+        match self.next.take().recv_ready() {
+            Some(StreamPayload { val, next }) => {
+                self.next.put_back(next);
+                Some((val, self))
+            }
+            None => None
+        }
+    }
+}
+
 pub struct SharedChan<T> {
     // Just like Chan, but a shared AtomicOption instead of Cell
     priv next: UnsafeAtomicRcBox<AtomicOption<StreamPayload<T>>>
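
Port<T> in the hunks above layers a stream on top of these one-shots: every message travels together with the PortOne for the rest of the stream, which is why recv_ready() for Port<T> hands back the value paired with a port re-seated on the tail. A toy model of that re-threading (modern Rust; all three types are simplified stand-ins for the runtime's, with the one-shot reduced to an owned option):

    struct StreamPayload<T> {
        val: T,
        next: PortOne<T>,
    }

    // Stand-in for the runtime's one-shot port: a box that may be filled.
    struct PortOne<T>(Option<Box<StreamPayload<T>>>);

    struct Port<T> {
        next: PortOne<T>,
    }

    impl<T> Port<T> {
        // recv_ready: consume the head one-shot; on success, return the
        // value and a Port re-seated on the tail of the stream.
        fn recv_ready(self) -> Option<(T, Port<T>)> {
            match self.next.0 {
                Some(payload) => {
                    let StreamPayload { val, next } = *payload;
                    Some((val, Port { next }))
                }
                None => None,
            }
        }
    }

    fn main() {
        // Build a two-element stream by hand: 1, then 2, then closed.
        let tail = PortOne(Some(Box::new(StreamPayload { val: 2, next: PortOne(None) })));
        let head = Port { next: PortOne(Some(Box::new(StreamPayload { val: 1, next: tail }))) };

        let (a, rest) = head.recv_ready().unwrap();
        let (b, rest) = rest.recv_ready().unwrap();
        assert_eq!((a, b), (1, 2));
        assert!(rest.recv_ready().is_none());
    }
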
diff --git a/src/libstd/rt/kill.rs b/src/libstd/rt/kill.rs
index 2bf4543df50..f7f11a402b8 100644
--- a/src/libstd/rt/kill.rs
+++ b/src/libstd/rt/kill.rs
@@ -106,8 +106,14 @@ impl Drop for KillFlag {
 // blocked task handle. So unblocking a task must restore that spare.
 unsafe fn revive_task_ptr(task_ptr: uint, spare_flag: Option<KillFlag>) -> ~Task {
     let mut task: ~Task = cast::transmute(task_ptr);
-    rtassert!(task.death.spare_kill_flag.is_none());
-    task.death.spare_kill_flag = spare_flag;
+    if task.death.spare_kill_flag.is_none() {
+        task.death.spare_kill_flag = spare_flag;
+    } else {
+        // A task's spare kill flag is not used for blocking in one case:
+        // when an unkillable task blocks on select. In this case, a separate
+        // one was created, which we now discard.
+        rtassert!(task.death.unkillable > 0);
+    }
     task
 }
 
@@ -119,7 +125,7 @@ impl BlockedTask {
         Killable(flag_arc) => {
             let flag = unsafe { &mut **flag_arc.get() };
             match flag.swap(KILL_RUNNING, SeqCst) {
-                KILL_RUNNING => rtabort!("tried to wake an already-running task"),
+                KILL_RUNNING => None, // woken from select(), perhaps
                 KILL_KILLED => None, // a killer stole it already
                 task_ptr => Some(unsafe { revive_task_ptr(task_ptr, Some(flag_arc)) })
             }
@@ -162,6 +168,27 @@ impl BlockedTask {
         }
     }
 
+    /// Converts one blocked task handle to a list of many handles to the same.
+    pub fn make_selectable(self, num_handles: uint) -> ~[BlockedTask] {
+        let handles = match self {
+            Unkillable(task) => {
+                let flag = unsafe { KillFlag(AtomicUint::new(cast::transmute(task))) };
+                UnsafeAtomicRcBox::newN(flag, num_handles)
+            }
+            Killable(flag_arc) => flag_arc.cloneN(num_handles),
+        };
+        // Even if the task was unkillable before, we use 'Killable' because
+        // multiple pipes will have handles. It does not really mean killable.
+        handles.consume_iter().transform(|x| Killable(x)).collect()
+    }
+
+    // This assertion has two flavours because the wake involves an atomic op.
+    // In the faster version, destructors will fail dramatically instead.
+    #[inline] #[cfg(not(test))]
+    pub fn assert_already_awake(self) { }
+    #[inline] #[cfg(test)]
+    pub fn assert_already_awake(self) { assert!(self.wake().is_none()); }
+
     /// Convert to an unsafe uint value. Useful for storing in a pipe's state flag.
     #[inline]
     pub unsafe fn cast_to_uint(self) -> uint {
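
make_selectable() above turns one blocked-task handle into num_handles refcounted handles by sharing a single kill flag: whichever port's sender swaps the task pointer out of the flag first wins the wakeup, and every later wake, including the KILL_RUNNING case just relaxed in wake(), is a harmless no-op that returns None. A compilable sketch of that first-wake-wins rule (modern Rust; the constant and the wake helper are stand-ins, not the runtime's real ones):

    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    const KILL_RUNNING: usize = 0;

    struct KillFlag(AtomicUsize);

    // Wake the task parked in the flag, if any: Some(task_ptr) for the
    // winner, None for everyone who arrives after.
    fn wake(flag: &KillFlag) -> Option<usize> {
        match flag.0.swap(KILL_RUNNING, Ordering::SeqCst) {
            KILL_RUNNING => None, // woken from select(), perhaps
            task_ptr => Some(task_ptr),
        }
    }

    fn main() {
        // "Block" a task by parking its pointer in the flag, then hand out
        // three handles, as make_selectable(3) would.
        let flag = Arc::new(KillFlag(AtomicUsize::new(0xf00d0)));
        let handles: Vec<_> = (0..3).map(|_| flag.clone()).collect();

        // Only the first wake obtains the task; the rest are no-ops.
        let woken: Vec<_> = handles.iter().map(|h| wake(h)).collect();
        assert_eq!(woken, vec![Some(0xf00d0), None, None]);
    }
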
diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs
index 808d07ce77d..2ca7d01da49 100644
--- a/src/libstd/rt/mod.rs
+++ b/src/libstd/rt/mod.rs
@@ -142,6 +142,9 @@ pub mod tube;
 /// Simple reimplementation of core::comm
 pub mod comm;
 
+/// Routines for select()ing on pipes.
+pub mod select;
+
 // FIXME #5248 shouldn't be pub
 /// The runtime needs to be able to put a pointer into thread-local storage.
 pub mod local_ptr;
diff --git a/src/libstd/rt/select.rs b/src/libstd/rt/select.rs
new file mode 100644
index 00000000000..130084fd1fc
--- /dev/null
+++ b/src/libstd/rt/select.rs
@@ -0,0 +1,102 @@
+// Copyright 2013 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use option::*;
+// use either::{Either, Left, Right};
+use rt::kill::BlockedTask;
+use rt::sched::Scheduler;
+use rt::local::Local;
+
+/// Trait for message-passing primitives that can be select()ed on.
+pub trait Select {
+    // Returns true if data was available.
+    fn optimistic_check(&mut self) -> bool;
+    // Returns true if data was available. If so, shall also wake() the task.
+    fn block_on(&mut self, &mut Scheduler, BlockedTask) -> bool;
+    // Returns true if data was available.
+    fn unblock_from(&mut self) -> bool;
+}
+
+/// Trait for message-passing primitives that can use the select2() convenience wrapper.
+// (This is separate from the above trait to enable heterogeneous lists of ports
+// that implement Select on different types to use select().)
+pub trait SelectPort<T> : Select {
+    fn recv_ready(self) -> Option<T>;
+}
+
+/// Receive a message from any one of many ports at once.
+pub fn select<A: Select>(ports: &mut [A]) -> uint {
+    if ports.is_empty() {
+        fail!("can't select on an empty list");
+    }
+
+    for ports.mut_iter().enumerate().advance |(index, port)| {
+        if port.optimistic_check() {
+            return index;
+        }
+    }
+
+    // If one of the ports already contains data when we go to block on it, we
+    // don't bother enqueueing on the rest of them, so we shouldn't bother
+    // unblocking from it either. This is just for efficiency, not correctness.
+    // (If not, we need to unblock from all of them. Length is a placeholder.)
+    let mut ready_index = ports.len();
+
+    let sched = Local::take::<Scheduler>();
+    do sched.deschedule_running_task_and_then |sched, task| {
+        let task_handles = task.make_selectable(ports.len());
+
+        for ports.mut_iter().zip(task_handles.consume_iter()).enumerate().advance
+                |(index, (port, task_handle))| {
+            // If one of the ports has data by now, it will wake the handle.
+            if port.block_on(sched, task_handle) {
+                ready_index = index;
+                break;
+            }
+        }
+    }
+
+    // Task resumes. Now unblock ourselves from all the ports we blocked on.
+    // If the success index wasn't reset, 'take' will just take all of them.
+    // Iterate in reverse so the 'earliest' index that's ready gets returned.
+    for ports.mut_slice(0, ready_index).mut_rev_iter().enumerate().advance |(index, port)| {
+        if port.unblock_from() {
+            ready_index = index;
+        }
+    }
+
+    assert!(ready_index < ports.len());
+    return ready_index;
+}
+
+/* FIXME(#5121, #7914) This all should be legal, but rust is not clever enough yet.
+
+impl <'self> Select for &'self mut Select {
+    fn optimistic_check(&mut self) -> bool { self.optimistic_check() }
+    fn block_on(&mut self, sched: &mut Scheduler, task: BlockedTask) -> bool {
+        self.block_on(sched, task)
+    }
+    fn unblock_from(&mut self) -> bool { self.unblock_from() }
+}
+
+pub fn select2<TA, A: SelectPort<TA>, TB, B: SelectPort<TB>>(mut a: A, mut b: B)
+        -> Either<(Option<TA>, B), (A, Option<TB>)> {
+    let result = {
+        let mut ports = [&mut a as &mut Select, &mut b as &mut Select];
+        select(ports)
+    };
+    match result {
+        0 => Left ((a.recv_ready(), b)),
+        1 => Right((a, b.recv_ready())),
+        x => fail!("impossible case in select2: %?", x)
+    }
+}
+
+*/
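
select() above runs in three phases: an optimistic non-blocking sweep, a blocking sweep that bails out early if some port already has data, and a reverse sweep that unblocks the remaining ports and settles on the earliest ready index. A compilable modern-Rust skeleton of the same control flow (scheduler, task handles, and atomics are elided; ToyPort is invented, and the sketch enumerates before reversing so the indices stay absolute):

    trait Select {
        fn optimistic_check(&mut self) -> bool;
        fn block_on(&mut self) -> bool; // scheduler and task-handle args elided
        fn unblock_from(&mut self) -> bool;
    }

    struct ToyPort { ready: bool }

    impl Select for ToyPort {
        fn optimistic_check(&mut self) -> bool { self.ready }
        fn block_on(&mut self) -> bool { self.ready }
        fn unblock_from(&mut self) -> bool { self.ready }
    }

    fn select<A: Select>(ports: &mut [A]) -> usize {
        assert!(!ports.is_empty(), "can't select on an empty list");

        // Phase 1: optimistic, non-blocking sweep.
        for (index, port) in ports.iter_mut().enumerate() {
            if port.optimistic_check() { return index; }
        }

        // Phase 2: enqueue on each port, stopping early if one is ready.
        let mut ready_index = ports.len();
        for (index, port) in ports.iter_mut().enumerate() {
            if port.block_on() { ready_index = index; break; }
        }
        // (The real code parks the task here until a sender wakes it.)

        // Phase 3: unblock in reverse; the earliest ready index wins.
        for (index, port) in ports[..ready_index].iter_mut().enumerate().rev() {
            if port.unblock_from() { ready_index = index; }
        }
        assert!(ready_index < ports.len());
        ready_index
    }

    fn main() {
        let mut ports = [ToyPort { ready: false }, ToyPort { ready: true }];
        assert_eq!(select(&mut ports), 1); // found during the optimistic sweep
    }
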
diff --git a/src/libstd/unstable/sync.rs b/src/libstd/unstable/sync.rs
index b6fc5b1f662..41a179c35b3 100644
--- a/src/libstd/unstable/sync.rs
+++ b/src/libstd/unstable/sync.rs
@@ -22,6 +22,7 @@ use unstable::finally::Finally;
 use ops::Drop;
 use clone::Clone;
 use kinds::Send;
+use vec;
 
 /// An atomically reference counted pointer.
 ///
@@ -41,26 +42,53 @@ struct AtomicRcBoxData<T> {
     data: Option<T>,
 }
 
+unsafe fn new_inner<T: Send>(data: T, refcount: uint) -> *mut libc::c_void {
+    let data = ~AtomicRcBoxData { count: AtomicUint::new(refcount),
+                                  unwrapper: AtomicOption::empty(),
+                                  data: Some(data) };
+    cast::transmute(data)
+}
+
 impl<T: Send> UnsafeAtomicRcBox<T> {
     pub fn new(data: T) -> UnsafeAtomicRcBox<T> {
-        unsafe {
-            let data = ~AtomicRcBoxData { count: AtomicUint::new(1),
-                                          unwrapper: AtomicOption::empty(),
-                                          data: Some(data) };
-            let ptr = cast::transmute(data);
-            return UnsafeAtomicRcBox { data: ptr };
-        }
+        unsafe { UnsafeAtomicRcBox { data: new_inner(data, 1) } }
     }
 
     /// As new(), but returns an extra pre-cloned handle.
     pub fn new2(data: T) -> (UnsafeAtomicRcBox<T>, UnsafeAtomicRcBox<T>) {
         unsafe {
-            let data = ~AtomicRcBoxData { count: AtomicUint::new(2),
-                                          unwrapper: AtomicOption::empty(),
-                                          data: Some(data) };
-            let ptr = cast::transmute(data);
-            return (UnsafeAtomicRcBox { data: ptr },
-                    UnsafeAtomicRcBox { data: ptr });
+            let ptr = new_inner(data, 2);
+            (UnsafeAtomicRcBox { data: ptr }, UnsafeAtomicRcBox { data: ptr })
+        }
+    }
+
+    /// As new(), but returns a vector of as many pre-cloned handles as requested.
+    pub fn newN(data: T, num_handles: uint) -> ~[UnsafeAtomicRcBox<T>] {
+        unsafe {
+            if num_handles == 0 {
+                ~[] // need to free data here
+            } else {
+                let ptr = new_inner(data, num_handles);
+                vec::from_fn(num_handles, |_| UnsafeAtomicRcBox { data: ptr })
+            }
+        }
+    }
+
+    /// As newN(), but from an already-existing handle. Uses one xadd.
+    pub fn cloneN(self, num_handles: uint) -> ~[UnsafeAtomicRcBox<T>] {
+        if num_handles == 0 {
+            ~[] // The "num_handles - 1" trick (below) fails in the 0 case.
+        } else {
+            unsafe {
+                let mut data: ~AtomicRcBoxData<T> = cast::transmute(self.data);
+                // Minus one because we are recycling the given handle's refcount.
+                let old_count = data.count.fetch_add(num_handles - 1, Acquire);
+                // let old_count = data.count.fetch_add(num_handles, Acquire);
+                assert!(old_count >= 1);
+                let ptr = cast::transmute(data);
+                cast::forget(self); // Don't run the destructor on this handle.
+                vec::from_fn(num_handles, |_| UnsafeAtomicRcBox { data: ptr })
+            }
         }
     }
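
The cloneN() arithmetic above deserves spelling out: the consumed handle already owns one reference, so minting num_handles handles takes a single xadd of num_handles - 1, and the previous count must have been at least 1. The same arithmetic in isolation (modern Rust; clone_n is an illustrative helper, not part of the patch):

    use std::sync::atomic::{AtomicUsize, Ordering};

    fn clone_n(count: &AtomicUsize, num_handles: usize) -> usize {
        assert!(num_handles > 0, "the num_handles - 1 trick fails for 0 handles");
        // Recycle the consumed handle's reference: add n - 1, not n.
        let old = count.fetch_add(num_handles - 1, Ordering::Acquire);
        assert!(old >= 1);
        old + num_handles - 1 // new total of outstanding references
    }

    fn main() {
        let count = AtomicUsize::new(1);   // one existing handle
        assert_eq!(clone_n(&count, 3), 3); // now three handles total
        assert_eq!(count.load(Ordering::Relaxed), 3);
    }
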