diff --git a/src/doc/guide-tasks.md b/src/doc/guide-tasks.md index 387f481025d..9ff712df021 100644 --- a/src/doc/guide-tasks.md +++ b/src/doc/guide-tasks.md @@ -232,7 +232,7 @@ Instead we can use a `SharedChan`, a type that allows a single ~~~ # use std::task::spawn; -let (port, chan) = SharedChan::new(); +let (port, chan) = Chan::new(); for init_val in range(0u, 3) { // Create a new channel handle to distribute to the child task diff --git a/src/libextra/test.rs b/src/libextra/test.rs index d207bd2298b..9ebd91bdfb6 100644 --- a/src/libextra/test.rs +++ b/src/libextra/test.rs @@ -767,7 +767,7 @@ fn run_tests(opts: &TestOpts, remaining.reverse(); let mut pending = 0; - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); while pending > 0 || !remaining.is_empty() { while pending < concurrency && !remaining.is_empty() { @@ -878,7 +878,7 @@ pub fn filter_tests( pub fn run_test(force_ignore: bool, test: TestDescAndFn, - monitor_ch: SharedChan) { + monitor_ch: Chan) { let TestDescAndFn {desc, testfn} = test; @@ -888,7 +888,7 @@ pub fn run_test(force_ignore: bool, } fn run_test_inner(desc: TestDesc, - monitor_ch: SharedChan, + monitor_ch: Chan, testfn: proc()) { spawn(proc() { let mut task = task::task(); @@ -1260,7 +1260,7 @@ mod tests { }, testfn: DynTestFn(proc() f()), }; - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); run_test(false, desc, ch); let (_, res) = p.recv(); assert!(res != TrOk); @@ -1277,7 +1277,7 @@ mod tests { }, testfn: DynTestFn(proc() f()), }; - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); run_test(false, desc, ch); let (_, res) = p.recv(); assert_eq!(res, TrIgnored); @@ -1294,7 +1294,7 @@ mod tests { }, testfn: DynTestFn(proc() f()), }; - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); run_test(false, desc, ch); let (_, res) = p.recv(); assert_eq!(res, TrOk); @@ -1311,7 +1311,7 @@ mod tests { }, testfn: DynTestFn(proc() f()), }; - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); run_test(false, desc, ch); let (_, res) = p.recv(); assert_eq!(res, TrFailed); diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs index 495f6ead1ca..834bf7951ef 100644 --- a/src/libgreen/lib.rs +++ b/src/libgreen/lib.rs @@ -315,7 +315,7 @@ pub struct SchedPool { #[deriving(Clone)] struct TaskState { cnt: UnsafeArc, - done: SharedChan<()>, + done: Chan<()>, } impl SchedPool { @@ -469,7 +469,7 @@ impl SchedPool { impl TaskState { fn new() -> (Port<()>, TaskState) { - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); (p, TaskState { cnt: UnsafeArc::new(AtomicUint::new(0)), done: c, diff --git a/src/libnative/io/mod.rs b/src/libnative/io/mod.rs index c94554f510e..69ef10ac11b 100644 --- a/src/libnative/io/mod.rs +++ b/src/libnative/io/mod.rs @@ -22,7 +22,6 @@ //! that you would find on the respective platform. use std::c_str::CString; -use std::comm::SharedChan; use std::io; use std::io::IoError; use std::io::net::ip::SocketAddr; @@ -289,7 +288,7 @@ impl rtio::IoFactory for IoFactory { }) } } - fn signal(&mut self, _signal: Signum, _channel: SharedChan) + fn signal(&mut self, _signal: Signum, _channel: Chan) -> IoResult<~RtioSignal> { Err(unimpl()) } diff --git a/src/libnative/io/timer_helper.rs b/src/libnative/io/timer_helper.rs index c00b0efadb5..2c976e67d25 100644 --- a/src/libnative/io/timer_helper.rs +++ b/src/libnative/io/timer_helper.rs @@ -33,7 +33,7 @@ use task; // only torn down after everything else has exited. 
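For reference, the guide hunk above only swaps the constructor; after this patch the full pattern the guide describes looks roughly like the following sketch (pre-1.0 syntax as used throughout this diff; summing the three results is illustrative):

~~~
# use std::task::spawn;
// A plain Chan is now promoted to a shared channel simply by cloning it.
let (port, chan) = Chan::new();
for init_val in range(0u, 3) {
    // Give each child task its own clone of the sending half.
    let child_chan = chan.clone();
    spawn(proc() {
        child_chan.send(init_val);
    });
}
let result = port.recv() + port.recv() + port.recv();
~~~

The same clone-based promotion is what `timer_helper.rs` below relies on when it calls `drop(msgc.clone())` before stashing the channel in a global.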
This means that these // variables are read-only during use (after initialization) and both of which // are safe to use concurrently. -static mut HELPER_CHAN: *mut SharedChan = 0 as *mut SharedChan; +static mut HELPER_CHAN: *mut Chan = 0 as *mut Chan; static mut HELPER_SIGNAL: imp::signal = 0 as imp::signal; pub fn boot(helper: fn(imp::signal, Port)) { @@ -43,7 +43,9 @@ pub fn boot(helper: fn(imp::signal, Port)) { unsafe { LOCK.lock(); if !INITIALIZED { - let (msgp, msgc) = SharedChan::new(); + let (msgp, msgc) = Chan::new(); + // promote this to a shared channel + drop(msgc.clone()); HELPER_CHAN = cast::transmute(~msgc); let (receive, send) = imp::new(); HELPER_SIGNAL = send; @@ -84,8 +86,8 @@ fn shutdown() { // Clean up after ther helper thread unsafe { imp::close(HELPER_SIGNAL); - let _chan: ~SharedChan = cast::transmute(HELPER_CHAN); - HELPER_CHAN = 0 as *mut SharedChan; + let _chan: ~Chan = cast::transmute(HELPER_CHAN); + HELPER_CHAN = 0 as *mut Chan; HELPER_SIGNAL = 0 as imp::signal; } } diff --git a/src/librustuv/signal.rs b/src/librustuv/signal.rs index 2fcc61be79b..0a66c3445ee 100644 --- a/src/librustuv/signal.rs +++ b/src/librustuv/signal.rs @@ -10,7 +10,6 @@ use std::libc::c_int; use std::io::signal::Signum; -use std::comm::SharedChan; use std::rt::rtio::RtioSignal; use homing::{HomingIO, HomeHandle}; @@ -22,13 +21,13 @@ pub struct SignalWatcher { handle: *uvll::uv_signal_t, home: HomeHandle, - channel: SharedChan, + channel: Chan, signal: Signum, } impl SignalWatcher { pub fn new(io: &mut UvIoFactory, signum: Signum, - channel: SharedChan) -> Result<~SignalWatcher, UvError> { + channel: Chan) -> Result<~SignalWatcher, UvError> { let s = ~SignalWatcher { handle: UvHandle::alloc(None::, uvll::UV_SIGNAL), home: io.make_handle(), @@ -81,7 +80,7 @@ mod test { #[test] fn closing_channel_during_drop_doesnt_kill_everything() { // see issue #10375, relates to timers as well. - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); let _signal = SignalWatcher::new(local_loop(), signal::Interrupt, chan); diff --git a/src/librustuv/uvio.rs b/src/librustuv/uvio.rs index 8a8ef4a41ec..54db4b4d3d1 100644 --- a/src/librustuv/uvio.rs +++ b/src/librustuv/uvio.rs @@ -10,7 +10,6 @@ use std::c_str::CString; use std::cast; -use std::comm::SharedChan; use std::io::IoError; use std::io::net::ip::SocketAddr; use std::io::process::ProcessConfig; @@ -304,7 +303,7 @@ impl IoFactory for UvIoFactory { } } - fn signal(&mut self, signum: Signum, channel: SharedChan) + fn signal(&mut self, signum: Signum, channel: Chan) -> Result<~rtio::RtioSignal, IoError> { match SignalWatcher::new(self, signum, channel) { Ok(s) => Ok(s as ~rtio::RtioSignal), diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index 63d69100671..9b320dfe62b 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -15,17 +15,16 @@ //! communication between concurrent tasks. The primitives defined in this //! module are the building blocks for synchronization in rust. //! -//! This module currently provides three main types: +//! This module currently provides two types: //! //! * `Chan` //! * `Port` -//! * `SharedChan` //! -//! The `Chan` and `SharedChan` types are used to send data to a `Port`. A -//! `SharedChan` is clone-able such that many tasks can send simultaneously to -//! one receiving port. These communication primitives are *task blocking*, not -//! *thread blocking*. This means that if one task is blocked on a channel, -//! other tasks can continue to make progress. +//! 
`Chan` is used to send data to a `Port`. A `Chan` is clone-able such that +//! many tasks can send simultaneously to one receiving port. These +//! communication primitives are *task blocking*, not *thread blocking*. This +//! means that if one task is blocked on a channel, other tasks can continue to +//! make progress. //! //! Rust channels can be used as if they have an infinite internal buffer. What //! this means is that the `send` operation will never block. `Port`s, on the @@ -39,8 +38,8 @@ //! next operation `fail!`. The purpose of this is to allow propagation of //! failure among tasks that are linked to one another via channels. //! -//! There are methods on all of `Chan`, `SharedChan`, and `Port` to perform -//! their respective operations without failing, however. +//! There are methods on both of `Chan` and `Port` to perform their respective +//! operations without failing, however. //! //! ## Outside the Runtime //! @@ -66,7 +65,7 @@ //! assert_eq!(port.recv(), 10); //! //! // Create a shared channel which can be sent along from many tasks -//! let (port, chan) = SharedChan::new(); +//! let (port, chan) = Chan::new(); //! for i in range(0, 10) { //! let chan = chan.clone(); //! spawn(proc() { @@ -100,10 +99,22 @@ // // ## Flavors of channels // -// Rust channels come in two flavors: streams and shared channels. A stream has -// one sender and one receiver while a shared channel could have multiple -// senders. This choice heavily influences the design of the protocol set -// forth for both senders/receivers. +// From the perspective of a consumer of this library, there is only one flavor +// of channel. This channel can be used as a stream and cloned to allow multiple +// senders. Under the hood, however, there are actually three flavors of +// channels in play. +// +// * Oneshots - these channels are highly optimized for the one-send use case. +// They contain as few atomics as possible and involve one and +// exactly one allocation. +// * Streams - these channels are optimized for the non-shared use case. They +// use a different concurrent queue which is more tailored for this +// use case. The initial allocation of this flavor of channel is not +// optimized. +// * Shared - this is the most general form of channel that this module offers, +// a channel with multiple senders. This type is as optimized as it +// can be, but the previous two types mentioned are much faster for +// their use-cases. // // ## Concurrent queues // @@ -226,26 +237,20 @@ // here's the code for you to find some more! use cast; +use cell::Cell; use clone::Clone; -use container::Container; -use int; use iter::Iterator; -use kinds::marker; use kinds::Send; +use kinds::marker; use ops::Drop; -use option::{Option, Some, None}; -use result::{Ok, Err}; +use option::{Some, None, Option}; +use result::{Ok, Err, Result}; use rt::local::Local; use rt::task::{Task, BlockedTask}; -use rt::thread::Thread; use sync::arc::UnsafeArc; -use sync::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed}; -use vec::OwnedVector; +use util; -use spsc = sync::spsc_queue; -use mpsc = sync::mpsc_queue; - -pub use self::select::{Select, Handle}; +pub use comm::select::{Select, Handle}; macro_rules! test ( { fn $name:ident() $b:block $($a:attr)*} => ( @@ -273,15 +278,19 @@ macro_rules! 
test ( ) mod select; +mod oneshot; +mod stream; +mod shared; -/////////////////////////////////////////////////////////////////////////////// -// Public structs -/////////////////////////////////////////////////////////////////////////////// +// Use a power of 2 to allow LLVM to optimize to something that's not a +// division, this is hit pretty regularly. +static RESCHED_FREQ: int = 256; /// The receiving-half of Rust's channel type. This half can only be owned by /// one task pub struct Port { - priv inner: PortInner, + priv inner: Flavor, + priv receives: Cell, // can't share in an arc priv marker: marker::NoFreeze, } @@ -296,20 +305,12 @@ pub struct Messages<'a, T> { /// The sending-half of Rust's channel type. This half can only be owned by one /// task pub struct Chan { - priv inner: UnsafeArc>, + priv inner: Flavor, + priv sends: Cell, // can't share in an arc priv marker: marker::NoFreeze, } -/// The sending-half of Rust's channel type. This half can be shared among many -/// tasks by creating copies of itself through the `clone` method. -pub struct SharedChan { - // can't share in an arc -- technically this implementation is - // shareable, but it shouldn't be required to be shareable in an - // arc - priv marker: marker::NoFreeze, -} - /// This enumeration is the list of the possible reasons that try_recv could not /// return data when called. #[deriving(Eq, Clone)] @@ -324,221 +325,10 @@ pub enum TryRecvResult { Data(T), } -/////////////////////////////////////////////////////////////////////////////// -// Internal struct definitions -/////////////////////////////////////////////////////////////////////////////// - -enum PortInner { - Single(UnsafeArc>), - Shared(UnsafeArc>), -} - -struct SingleInner { - queue: spsc::Queue, - packet: Packet, -} - -struct SharedInner { - queue: mpsc::Queue, - packet: Packet, -} - -struct Packet { - cnt: AtomicInt, // How many items are on this channel - steals: int, // How many times has a port received without blocking? - to_wake: Option, // Task to wake up - - // The number of channels which are currently using this packet. This is - // used to reference count shared channels. - channels: AtomicInt, - - selecting: AtomicBool, - selection_id: uint, - select_next: *mut Packet, - select_prev: *mut Packet, - recv_cnt: int, -} - -/////////////////////////////////////////////////////////////////////////////// -// All implementations -- the fun part -/////////////////////////////////////////////////////////////////////////////// - -static DISCONNECTED: int = int::MIN; -static RESCHED_FREQ: int = 200; - -impl PortInner { - fn packet<'a>(&'a mut self) -> &'a mut Packet { - match *self { - Single(ref arc) => unsafe { &mut (*arc.get()).packet }, - Shared(ref arc) => unsafe { &mut (*arc.get()).packet }, - } - } -} - -impl Packet { - fn new() -> Packet { - Packet { - cnt: AtomicInt::new(0), - steals: 0, - to_wake: None, - channels: AtomicInt::new(1), - - selecting: AtomicBool::new(false), - selection_id: 0, - select_next: 0 as *mut Packet, - select_prev: 0 as *mut Packet, - recv_cnt: 0, - } - } - - // Increments the channel size count, preserving the disconnected state if - // the other end has disconnected. - fn increment(&mut self) -> int { - match self.cnt.fetch_add(1, SeqCst) { - DISCONNECTED => { - // see the comment in 'try' for a shared channel for why this - // window of "not disconnected" is "ok". 
- self.cnt.store(DISCONNECTED, SeqCst); - DISCONNECTED - } - n => n - } - } - - // Decrements the reference count of the channel, returning whether the task - // should block or not. This assumes that the task is ready to sleep in that - // the `to_wake` field has already been filled in. Once this decrement - // happens, the task could wake up on the other end. - // - // From an implementation perspective, this is also when our "steal count" - // gets merged into the "channel count". Our steal count is reset to 0 after - // this function completes. - // - // As with increment(), this preserves the disconnected state if the - // channel is disconnected. - fn decrement(&mut self) -> bool { - let steals = self.steals; - self.steals = 0; - match self.cnt.fetch_sub(1 + steals, SeqCst) { - DISCONNECTED => { - self.cnt.store(DISCONNECTED, SeqCst); - false - } - n => { - assert!(n >= 0); - n - steals <= 0 - } - } - } - - // Helper function for select, tests whether this port can receive without - // blocking (obviously not an atomic decision). - fn can_recv(&self) -> bool { - let cnt = self.cnt.load(SeqCst); - cnt == DISCONNECTED || cnt - self.steals > 0 - } - - // This function must have had at least an acquire fence before it to be - // properly called. - fn wakeup(&mut self) { - match self.to_wake.take_unwrap().wake() { - Some(task) => task.reawaken(), - None => {} - } - self.selecting.store(false, Relaxed); - } - - // Aborts the selection process for a port. This happens as part of select() - // once the task has reawoken. This will place the channel back into a - // consistent state which is ready to be received from again. - // - // The method of doing this is a little subtle. These channels have the - // invariant that if -1 is seen, then to_wake is always Some(..) and should - // be woken up. This aborting process at least needs to add 1 to the - // reference count, but that is not guaranteed to make the count positive - // (our steal count subtraction could mean that after the addition the - // channel count is still negative). - // - // In order to get around this, we force our channel count to go above 0 by - // adding a large number >= 1 to it. This way no sender will see -1 unless - // we are indeed blocking. This "extra lump" we took out of the channel - // becomes our steal count (which will get re-factored into the count on the - // next blocking recv) - // - // The return value of this method is whether there is data on this channel - // to receive or not. - fn abort_selection(&mut self, take_to_wake: bool) -> bool { - // make sure steals + 1 makes the count go non-negative - let steals = { - let cnt = self.cnt.load(SeqCst); - if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0} - }; - let prev = self.cnt.fetch_add(steals + 1, SeqCst); - - // If we were previously disconnected, then we know for sure that there - // is no task in to_wake, so just keep going - if prev == DISCONNECTED { - assert!(self.to_wake.is_none()); - self.cnt.store(DISCONNECTED, SeqCst); - self.selecting.store(false, SeqCst); - true // there is data, that data is that we're disconnected - } else { - let cur = prev + steals + 1; - assert!(cur >= 0); - - // If the previous count was negative, then we just made things go - // positive, hence we passed the -1 boundary and we're responsible - // for removing the to_wake() field and trashing it. 
- if prev < 0 { - if take_to_wake { - self.to_wake.take_unwrap().trash(); - } else { - assert!(self.to_wake.is_none()); - } - - // We woke ourselves up, we're responsible for cancelling - assert!(self.selecting.load(Relaxed)); - self.selecting.store(false, Relaxed); - } - assert_eq!(self.steals, 0); - self.steals = steals; - - // if we were previously positive, then there's surely data to - // receive - prev >= 0 - } - } - - // Decrement the reference count on a channel. This is called whenever a - // Chan is dropped and may end up waking up a receiver. It's the receiver's - // responsibility on the other end to figure out that we've disconnected. - unsafe fn drop_chan(&mut self) { - match self.channels.fetch_sub(1, SeqCst) { - 1 => { - match self.cnt.swap(DISCONNECTED, SeqCst) { - -1 => { self.wakeup(); } - DISCONNECTED => {} - n => { assert!(n >= 0); } - } - } - n if n > 1 => {}, - n => fail!("bad number of channels left {}", n), - } - } -} - -impl Drop for Packet { - fn drop(&mut self) { - unsafe { - // Note that this load is not only an assert for correctness about - // disconnection, but also a proper fence before the read of - // `to_wake`, so this assert cannot be removed with also removing - // the `to_wake` assert. - assert_eq!(self.cnt.load(SeqCst), DISCONNECTED); - assert!(self.to_wake.is_none()); - assert_eq!(self.channels.load(SeqCst), 0); - } - } +enum Flavor { + Oneshot(UnsafeArc>), + Stream(UnsafeArc>), + Shared(UnsafeArc>), } impl Chan { @@ -546,14 +336,12 @@ impl Chan { /// will become available on the port as well. See the documentation of /// `Port` and `Chan` to see what's possible with them. pub fn new() -> (Port, Chan) { - // arbitrary 128 size cache -- this is just a max cache size, not a - // maximum buffer size - let (a, b) = UnsafeArc::new2(SingleInner { - queue: spsc::Queue::new(128), - packet: Packet::new(), - }); - (Port { inner: Single(a), marker: marker::NoFreeze }, - Chan { inner: b, marker: marker::NoFreeze }) + let (a, b) = UnsafeArc::new2(oneshot::Packet::new()); + (Port::my_new(Oneshot(a)), Chan::my_new(Oneshot(b))) + } + + fn my_new(inner: Flavor) -> Chan { + Chan { inner: inner, sends: Cell::new(0), marker: marker::NoFreeze } } /// Sends a value along this channel to be received by the corresponding @@ -595,132 +383,105 @@ impl Chan { /// Like `send`, this method will never block. If the failure of send cannot /// be tolerated, then this method should be used instead. pub fn try_send(&self, t: T) -> bool { - unsafe { - let inner = self.inner.get(); - (*inner).queue.push(t); - match (*inner).packet.increment() { - // As described above, -1 == wakeup - -1 => { (*inner).packet.wakeup(); true } - // Also as above, SPSC queues must be >= -2 - -2 => true, - // We succeeded if we sent data - DISCONNECTED => (*inner).queue.is_empty(), - // In order to prevent starvation of other tasks in situations - // where a task sends repeatedly without ever receiving, we - // occassionally yield instead of doing a send immediately. - // Only doing this if we're doing a rescheduling send, otherwise - // the caller is expecting not to context switch. - // - // Note that we don't unconditionally attempt to yield because - // the TLS overhead can be a bit much. - n => { - assert!(n >= 0); - if n > 0 && n % RESCHED_FREQ == 0 { - let task: ~Task = Local::take(); - task.maybe_yield(); + // In order to prevent starvation of other tasks in situations where + // a task sends repeatedly without ever receiving, we occassionally + // yield instead of doing a send immediately. 
Only doing this if + // we're doing a rescheduling send, otherwise the caller is + // expecting not to context switch. + // + // Note that we don't unconditionally attempt to yield because the + // TLS overhead can be a bit much. + let cnt = self.sends.get() + 1; + self.sends.set(cnt); + if cnt % (RESCHED_FREQ as uint) == 0 { + let task: ~Task = Local::take(); + task.maybe_yield(); + } + + let (new_inner, ret) = match self.inner { + Oneshot(ref p) => { + let p = p.get(); + unsafe { + if !(*p).sent() { + return (*p).send(t); + } else { + let (a, b) = UnsafeArc::new2(stream::Packet::new()); + match (*p).upgrade(Port::my_new(Stream(b))) { + oneshot::UpSuccess => { + (*a.get()).send(t); + (a, true) + } + oneshot::UpDisconnected => (a, false), + oneshot::UpWoke(task) => { + (*a.get()).send(t); + task.wake().map(|t| t.reawaken()); + (a, true) + } + } } - true } } + Stream(ref p) => return unsafe { (*p.get()).send(t) }, + Shared(ref p) => return unsafe { (*p.get()).send(t) }, + }; + + unsafe { + let mut tmp = Chan::my_new(Stream(new_inner)); + util::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner); } + return ret; + } +} + +impl Clone for Chan { + fn clone(&self) -> Chan { + let (packet, sleeper) = match self.inner { + Oneshot(ref p) => { + let (a, b) = UnsafeArc::new2(shared::Packet::new()); + match unsafe { (*p.get()).upgrade(Port::my_new(Shared(a))) } { + oneshot::UpSuccess | oneshot::UpDisconnected => (b, None), + oneshot::UpWoke(task) => (b, Some(task)) + } + } + Stream(ref p) => { + let (a, b) = UnsafeArc::new2(shared::Packet::new()); + match unsafe { (*p.get()).upgrade(Port::my_new(Shared(a))) } { + stream::UpSuccess | stream::UpDisconnected => (b, None), + stream::UpWoke(task) => (b, Some(task)), + } + } + Shared(ref p) => { + unsafe { (*p.get()).clone_chan(); } + return Chan::my_new(Shared(p.clone())); + } + }; + + unsafe { + (*packet.get()).inherit_blocker(sleeper); + + let mut tmp = Chan::my_new(Shared(packet.clone())); + util::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner); + } + Chan::my_new(Shared(packet)) } } #[unsafe_destructor] impl Drop for Chan { fn drop(&mut self) { - unsafe { (*self.inner.get()).packet.drop_chan(); } - } -} - -impl SharedChan { - /// Creates a new shared channel and port pair. The purpose of a shared - /// channel is to be cloneable such that many tasks can send data at the - /// same time. All data sent on any channel will become available on the - /// provided port as well. - pub fn new() -> (Port, SharedChan) { - let (a, b) = UnsafeArc::new2(SharedInner { - queue: mpsc::Queue::new(), - packet: Packet::new(), - }); - (Port { inner: Shared(a), marker: marker::NoFreeze }, - SharedChan { inner: b, marker: marker::NoFreeze }) - (Port { inner: Shared(a) }, SharedChan { inner: b }) - } - - /// Equivalent method to `send` on the `Chan` type (using the same - /// semantics) - pub fn send(&self, t: T) { - if !self.try_send(t) { - fail!("sending on a closed channel"); + match self.inner { + Oneshot(ref mut p) => unsafe { (*p.get()).drop_chan(); }, + Stream(ref mut p) => unsafe { (*p.get()).drop_chan(); }, + Shared(ref mut p) => unsafe { (*p.get()).drop_chan(); }, } } - - /// Equivalent method to `try_send` on the `Chan` type (using the same - /// semantics) - pub fn try_send(&self, t: T) -> bool { - unsafe { - // Note that the multiple sender case is a little tricker - // semantically than the single sender case. The logic for - // incrementing is "add and if disconnected store disconnected". 
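The rescheduling note in the new `try_send` above is easiest to see in a scenario like the following sketch (illustrative counts; on a single-threaded green scheduler the periodic `maybe_yield()` every `RESCHED_FREQ` = 256 sends is what lets the receiver run at all):

~~~
let (port, chan) = Chan::<uint>::new();
spawn(proc() {
    // A sender that never receives: without the occasional yield this loop
    // could starve the receiving task on a single-threaded scheduler.
    for i in range(0u, 10000) {
        chan.send(i);
    }
});
for i in range(0u, 10000) {
    assert_eq!(port.recv(), i);
}
~~~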
- // This could end up leading some senders to believe that there - // wasn't a disconnect if in fact there was a disconnect. This means - // that while one thread is attempting to re-store the disconnected - // states, other threads could walk through merrily incrementing - // this very-negative disconnected count. To prevent senders from - // spuriously attempting to send when the channels is actually - // disconnected, the count has a ranged check here. - // - // This is also done for another reason. Remember that the return - // value of this function is: - // - // `true` == the data *may* be received, this essentially has no - // meaning - // `false` == the data will *never* be received, this has a lot of - // meaning - // - // In the SPSC case, we have a check of 'queue.is_empty()' to see - // whether the data was actually received, but this same condition - // means nothing in a multi-producer context. As a result, this - // preflight check serves as the definitive "this will never be - // received". Once we get beyond this check, we have permanently - // entered the realm of "this may be received" - let inner = self.inner.get(); - if (*inner).packet.cnt.load(Relaxed) < DISCONNECTED + 1024 { - return false - } - - (*inner).queue.push(t); - match (*inner).packet.increment() { - DISCONNECTED => {} // oh well, we tried - -1 => { (*inner).packet.wakeup(); } - n => { - if n > 0 && n % RESCHED_FREQ == 0 { - let task: ~Task = Local::take(); - task.maybe_yield(); - } - } - } - true - } - } -} - -impl Clone for SharedChan { - fn clone(&self) -> SharedChan { - unsafe { (*self.inner.get()).packet.channels.fetch_add(1, SeqCst); } - SharedChan { inner: self.inner.clone(), marker: marker::NoFreeze } - } -} - -#[unsafe_destructor] -impl Drop for SharedChan { - fn drop(&mut self) { - unsafe { (*self.inner.get()).packet.drop_chan(); } - } } impl Port { + fn my_new(inner: Flavor) -> Port { + Port { inner: inner, receives: Cell::new(0), marker: marker::NoFreeze } + } + /// Blocks waiting for a value on this port /// /// This function will block if necessary to wait for a corresponding send @@ -758,100 +519,45 @@ impl Port { /// /// This function cannot fail. pub fn try_recv(&self) -> TryRecvResult { - self.try_recv_inc(true) - } - - fn try_recv_inc(&self, increment: bool) -> TryRecvResult { - // This is a "best effort" situation, so if a queue is inconsistent just - // don't worry about it. - let this = unsafe { cast::transmute_mut(self) }; - - // See the comment about yielding on sends, but the same applies here. - // If a thread is spinning in try_recv we should try - { - let packet = this.inner.packet(); - packet.recv_cnt += 1; - if packet.recv_cnt % RESCHED_FREQ == 0 { - let task: ~Task = Local::take(); - task.maybe_yield(); - } + // If a thread is spinning in try_recv, we should take the opportunity + // to reschedule things occasionally. See notes above in scheduling on + // sends for why this doesn't always hit TLS. + let cnt = self.receives.get() + 1; + self.receives.set(cnt); + if cnt % (RESCHED_FREQ as uint) == 0 { + let task: ~Task = Local::take(); + task.maybe_yield(); } - let ret = match this.inner { - Single(ref mut arc) => unsafe { (*arc.get()).queue.pop() }, - Shared(ref mut arc) => match unsafe { (*arc.get()).queue.pop() } { - mpsc::Data(t) => Some(t), - mpsc::Empty => None, - - // This is a bit of an interesting case. The channel is - // reported as having data available, but our pop() has - // failed due to the queue being in an inconsistent state. 
- // This means that there is some pusher somewhere which has - // yet to complete, but we are guaranteed that a pop will - // eventually succeed. In this case, we spin in a yield loop - // because the remote sender should finish their enqueue - // operation "very quickly". - // - // Note that this yield loop does *not* attempt to do a green - // yield (regardless of the context), but *always* performs an - // OS-thread yield. The reasoning for this is that the pusher in - // question which is causing the inconsistent state is - // guaranteed to *not* be a blocked task (green tasks can't get - // pre-empted), so it must be on a different OS thread. Also, - // `try_recv` is normally a "guaranteed no rescheduling" context - // in a green-thread situation. By yielding control of the - // thread, we will hopefully allow time for the remote task on - // the other OS thread to make progress. - // - // Avoiding this yield loop would require a different queue - // abstraction which provides the guarantee that after M - // pushes have succeeded, at least M pops will succeed. The - // current queues guarantee that if there are N active - // pushes, you can pop N times once all N have finished. - mpsc::Inconsistent => { - let data; - loop { - Thread::yield_now(); - match unsafe { (*arc.get()).queue.pop() } { - mpsc::Data(t) => { data = t; break } - mpsc::Empty => fail!("inconsistent => empty"), - mpsc::Inconsistent => {} - } + loop { + let mut new_port = match self.inner { + Oneshot(ref p) => { + match unsafe { (*p.get()).try_recv() } { + Ok(t) => return Data(t), + Err(oneshot::Empty) => return Empty, + Err(oneshot::Disconnected) => return Disconnected, + Err(oneshot::Upgraded(port)) => port, } - Some(data) } - } - }; - if increment && ret.is_some() { - this.inner.packet().steals += 1; - } - match ret { - Some(t) => Data(t), - None => { - // It's possible that between the time that we saw the queue was - // empty and here the other side disconnected. It's also - // possible for us to see the disconnection here while there is - // data in the queue. It's pretty backwards-thinking to return - // Disconnected when there's actually data on the queue, so if - // we see a disconnected state be sure to check again to be 100% - // sure that there's no data in the queue. - let cnt = unsafe { (*this.queue.packet()).cnt.load(Relaxed) }; - if cnt != DISCONNECTED { return Empty } - - let ret = match this.queue { - SPSC(ref mut queue) => queue.pop(), - MPSC(ref mut queue) => match queue.pop() { - mpsc::Data(t) => Some(t), - mpsc::Empty => None, - mpsc::Inconsistent => { - fail!("inconsistent with no senders?!"); - } + Stream(ref p) => { + match unsafe { (*p.get()).try_recv() } { + Ok(t) => return Data(t), + Err(stream::Empty) => return Empty, + Err(stream::Disconnected) => return Disconnected, + Err(stream::Upgraded(port)) => port, } - }; - match ret { - Some(data) => Data(data), - None => Disconnected, } + Shared(ref p) => { + match unsafe { (*p.get()).try_recv() } { + Ok(t) => return Data(t), + Err(shared::Empty) => return Empty, + Err(shared::Disconnected) => return Disconnected, + } + } + }; + unsafe { + util::swap(&mut cast::transmute_mut(self).inner, + &mut new_port.inner); } } } @@ -869,34 +575,36 @@ impl Port { /// If the channel has hung up, then `None` is returned. Otherwise `Some` of /// the value found on the port is returned. 
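A minimal illustration of the `recv_opt` contract just described (hypothetical values, same pre-1.0 API as the rest of this patch):

~~~
let (port, chan) = Chan::<int>::new();
drop(chan);                         // the sending side hangs up
assert_eq!(port.recv_opt(), None);  // recv_opt reports the hangup instead of failing
~~~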
pub fn recv_opt(&self) -> Option { - // optimistic preflight check (scheduling is expensive) - match self.try_recv() { - Empty => {}, - Disconnected => return None, - Data(t) => return Some(t), - } - - let packet; - let this; - unsafe { - this = cast::transmute_mut(self); - packet = this.inner.packet(); - let task: ~Task = Local::take(); - task.deschedule(1, |task| { - assert!((*packet).to_wake.is_none()); - (*packet).to_wake = Some(task); - if (*packet).decrement() { - Ok(()) - } else { - Err((*packet).to_wake.take_unwrap()) + loop { + let mut new_port = match self.inner { + Oneshot(ref p) => { + match unsafe { (*p.get()).recv() } { + Ok(t) => return Some(t), + Err(oneshot::Empty) => return unreachable!(), + Err(oneshot::Disconnected) => return None, + Err(oneshot::Upgraded(port)) => port, + } } - }); - } - - match self.try_recv_inc(false) { - Data(t) => Some(t), - Empty => fail!("bug: woke up too soon"), - Disconnected => None, + Stream(ref p) => { + match unsafe { (*p.get()).recv() } { + Ok(t) => return Some(t), + Err(stream::Empty) => return unreachable!(), + Err(stream::Disconnected) => return None, + Err(stream::Upgraded(port)) => port, + } + } + Shared(ref p) => { + match unsafe { (*p.get()).recv() } { + Ok(t) => return Some(t), + Err(shared::Empty) => return unreachable!(), + Err(shared::Disconnected) => return None, + } + } + }; + unsafe { + util::swap(&mut cast::transmute_mut(self).inner, + &mut new_port.inner); + } } } @@ -907,6 +615,84 @@ impl Port { } } +impl select::Packet for Port { + fn can_recv(&self) -> bool { + loop { + let mut new_port = match self.inner { + Oneshot(ref p) => { + match unsafe { (*p.get()).can_recv() } { + Ok(ret) => return ret, + Err(upgrade) => upgrade, + } + } + Stream(ref p) => { + match unsafe { (*p.get()).can_recv() } { + Ok(ret) => return ret, + Err(upgrade) => upgrade, + } + } + Shared(ref p) => { + return unsafe { (*p.get()).can_recv() }; + } + }; + unsafe { + util::swap(&mut cast::transmute_mut(self).inner, + &mut new_port.inner); + } + } + } + + fn start_selection(&self, mut task: BlockedTask) -> Result<(), BlockedTask>{ + loop { + let (t, mut new_port) = match self.inner { + Oneshot(ref p) => { + match unsafe { (*p.get()).start_selection(task) } { + oneshot::SelSuccess => return Ok(()), + oneshot::SelCanceled(task) => return Err(task), + oneshot::SelUpgraded(t, port) => (t, port), + } + } + Stream(ref p) => { + match unsafe { (*p.get()).start_selection(task) } { + stream::SelSuccess => return Ok(()), + stream::SelCanceled(task) => return Err(task), + stream::SelUpgraded(t, port) => (t, port), + } + } + Shared(ref p) => { + return unsafe { (*p.get()).start_selection(task) }; + } + }; + task = t; + unsafe { + util::swap(&mut cast::transmute_mut(self).inner, + &mut new_port.inner); + } + } + } + + fn abort_selection(&self) -> bool { + let mut was_upgrade = false; + loop { + let result = match self.inner { + Oneshot(ref p) => unsafe { (*p.get()).abort_selection() }, + Stream(ref p) => unsafe { + (*p.get()).abort_selection(was_upgrade) + }, + Shared(ref p) => return unsafe { + (*p.get()).abort_selection(was_upgrade) + }, + }; + let mut new_port = match result { Ok(b) => return b, Err(p) => p }; + was_upgrade = true; + unsafe { + util::swap(&mut cast::transmute_mut(self).inner, + &mut new_port.inner); + } + } + } +} + impl<'a, T: Send> Iterator for Messages<'a, T> { fn next(&mut self) -> Option { self.port.recv_opt() } } @@ -914,10 +700,11 @@ impl<'a, T: Send> Iterator for Messages<'a, T> { #[unsafe_destructor] impl Drop for Port { fn drop(&mut 
self) { - // All we need to do is store that we're disconnected. If the channel - // half has already disconnected, then we'll just deallocate everything - // when the shared packet is deallocated. - self.inner.packet().cnt.store(DISCONNECTED, SeqCst); + match self.inner { + Oneshot(ref mut p) => unsafe { (*p.get()).drop_port(); }, + Stream(ref mut p) => unsafe { (*p.get()).drop_port(); }, + Shared(ref mut p) => unsafe { (*p.get()).drop_port(); }, + } } } @@ -948,12 +735,12 @@ mod test { }) test!(fn drop_full_shared() { - let (_p, c) = SharedChan::new(); + let (_p, c) = Chan::new(); c.send(~1); }) test!(fn smoke_shared() { - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); c.send(1); assert_eq!(p.recv(), 1); let c = c.clone(); @@ -976,13 +763,13 @@ mod test { } #[should_fail]) test!(fn smoke_shared_port_gone() { - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); drop(p); c.send(1); } #[should_fail]) test!(fn smoke_shared_port_gone2() { - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); drop(p); let c2 = c.clone(); drop(c); @@ -998,7 +785,7 @@ mod test { } #[should_fail]) test!(fn port_gone_concurrent_shared() { - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); let c1 = c.clone(); spawn(proc() { p.recv(); @@ -1016,7 +803,7 @@ mod test { } #[should_fail]) test!(fn smoke_chan_gone_shared() { - let (p, c) = SharedChan::<()>::new(); + let (p, c) = Chan::<()>::new(); let c2 = c.clone(); drop(c); drop(c2); @@ -1045,7 +832,7 @@ mod test { test!(fn stress_shared() { static AMT: uint = 10000; static NTHREADS: uint = 8; - let (p, c) = SharedChan::::new(); + let (p, c) = Chan::::new(); let (p1, c1) = Chan::new(); spawn(proc() { @@ -1072,7 +859,7 @@ mod test { fn send_from_outside_runtime() { let (p, c) = Chan::::new(); let (p1, c1) = Chan::new(); - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); let chan2 = chan.clone(); spawn(proc() { c1.send(()); @@ -1112,7 +899,7 @@ mod test { fn no_runtime() { let (p1, c1) = Chan::::new(); let (p2, c2) = Chan::::new(); - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); let chan2 = chan.clone(); native::task::spawn(proc() { assert_eq!(p1.recv(), 1); @@ -1315,7 +1102,7 @@ mod test { }) test!(fn shared_chan_stress() { - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); let total = stress_factor() + 100; for _ in range(0, total) { let chan_clone = chan.clone(); @@ -1394,4 +1181,26 @@ mod test { p2.recv(); assert_eq!(p.try_recv(), Disconnected); }) + + // This bug used to end up in a livelock inside of the Port destructor + // because the internal state of the Shared port was corrupted + test!(fn destroy_upgraded_shared_port_when_sender_still_active() { + let (p, c) = Chan::new(); + let (p1, c2) = Chan::new(); + spawn(proc() { + p.recv(); // wait on a oneshot port + drop(p); // destroy a shared port + c2.send(()); + }); + // make sure the other task has gone to sleep + for _ in range(0, 5000) { task::deschedule(); } + + // upgrade to a shared chan and send a message + let t = c.clone(); + drop(c); + t.send(()); + + // wait for the child task to exit before we exit + p1.recv(); + }) } diff --git a/src/libstd/comm/oneshot.rs b/src/libstd/comm/oneshot.rs new file mode 100644 index 00000000000..e58405ebe2d --- /dev/null +++ b/src/libstd/comm/oneshot.rs @@ -0,0 +1,382 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. 
+// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/// Oneshot channels/ports +/// +/// This is the initial flavor of channels/ports used for comm module. This is +/// an optimization for the one-use case of a channel. The major optimization of +/// this type is to have one and exactly one allocation when the chan/port pair +/// is created. +/// +/// Another possible optimization would be to not use an UnsafeArc box because +/// in theory we know when the shared packet can be deallocated (no real need +/// for the atomic reference counting), but I was having trouble how to destroy +/// the data early in a drop of a Port. +/// +/// # Implementation +/// +/// Oneshots are implemented around one atomic uint variable. This variable +/// indicates both the state of the port/chan but also contains any tasks +/// blocked on the port. All atomic operations happen on this one word. +/// +/// In order to upgrade a oneshot channel, an upgrade is considered a disconnect +/// on behalf of the channel side of things (it can be mentally thought of as +/// consuming the port). This upgrade is then also stored in the shared packet. +/// The one caveat to consider is that when a port sees a disconnected channel +/// it must check for data because there is no "data plus upgrade" state. + +use comm::Port; +use kinds::Send; +use ops::Drop; +use option::{Some, None, Option}; +use result::{Result, Ok, Err}; +use rt::local::Local; +use rt::task::{Task, BlockedTask}; +use sync::atomics; +use util; + +// Various states you can find a port in. +static EMPTY: uint = 0; +static DATA: uint = 1; +static DISCONNECTED: uint = 2; + +pub struct Packet { + // Internal state of the chan/port pair (stores the blocked task as well) + state: atomics::AtomicUint, + // One-shot data slot location + data: Option, + // when used for the second time, a oneshot channel must be upgraded, and + // this contains the slot for the upgrade + upgrade: MyUpgrade, +} + +pub enum Failure { + Empty, + Disconnected, + Upgraded(Port), +} + +pub enum UpgradeResult { + UpSuccess, + UpDisconnected, + UpWoke(BlockedTask), +} + +pub enum SelectionResult { + SelCanceled(BlockedTask), + SelUpgraded(BlockedTask, Port), + SelSuccess, +} + +enum MyUpgrade { + NothingSent, + SendUsed, + GoUp(Port), +} + +impl Packet { + pub fn new() -> Packet { + Packet { + data: None, + upgrade: NothingSent, + state: atomics::AtomicUint::new(EMPTY), + } + } + + pub fn send(&mut self, t: T) -> bool { + // Sanity check + match self.upgrade { + NothingSent => {} + _ => fail!("sending on a oneshot that's already sent on "), + } + assert!(self.data.is_none()); + self.data = Some(t); + self.upgrade = SendUsed; + + // This atomic swap uses a "Release" memory ordering to ensure that all + // our previous memory writes are visible to the other thread (notably + // the write of data/upgrade) + match self.state.swap(DATA, atomics::Release) { + // Sent the data, no one was waiting + EMPTY => true, + + // Couldn't send the data, the port hung up first. We need to be + // sure to deallocate the sent data (to not leave it stuck in the + // queue) + DISCONNECTED => { + self.data.take_unwrap(); + false + } + + // Not possible, these are one-use channels + DATA => unreachable!(), + + // Anything else means that there was a task waiting on the other + // end. We leave the 'DATA' state inside so it'll pick it up on the + // other end. 
+ n => unsafe { + let t = BlockedTask::cast_from_uint(n); + t.wake().map(|t| t.reawaken()); + true + } + } + } + + // Just tests whether this channel has been sent on or not, this is only + // safe to use from the sender. + pub fn sent(&self) -> bool { + match self.upgrade { + NothingSent => false, + _ => true, + } + } + + pub fn recv(&mut self) -> Result> { + // Attempt to not block the task (it's a little expensive). If it looks + // like we're not empty, then immediately go through to `try_recv`. + // + // These atomics use an Acquire memory ordering in order to have all the + // previous writes of the releasing thread visible to us. + if self.state.load(atomics::Acquire) == EMPTY { + let t: ~Task = Local::take(); + t.deschedule(1, |task| { + let n = unsafe { task.cast_to_uint() }; + match self.state.compare_and_swap(EMPTY, n, atomics::Acquire) { + // Nothing on the channel, we legitimately block + EMPTY => Ok(()), + + // If there's data or it's a disconnected channel, then we + // failed the cmpxchg, so we just wake ourselves back up + DATA | DISCONNECTED => { + unsafe { Err(BlockedTask::cast_from_uint(n)) } + } + + // Only one thread is allowed to sleep on this port + _ => unreachable!() + } + }); + } + + self.try_recv() + } + + pub fn try_recv(&mut self) -> Result> { + // see above for why Acquire is used. + match self.state.load(atomics::Acquire) { + EMPTY => Err(Empty), + + // We saw some data on the channel, but the channel can be used + // again to send us an upgrade. As a result, we need to re-insert + // into the channel that there's no data available (otherwise we'll + // just see DATA next time). This is done as a cmpxchg because if + // the state changes under our feet we'd rather just see that state + // change. + DATA => { + self.state.compare_and_swap(DATA, EMPTY, atomics::Acquire); + match self.data.take() { + Some(data) => Ok(data), + None => unreachable!(), + } + } + + // There's no guarantee that we receive before an upgrade happens, + // and an upgrade flags the channel as disconnected, so when we see + // this we first need to check if there's data available and *then* + // we go through and process the upgrade. + DISCONNECTED => { + match self.data.take() { + Some(data) => Ok(data), + None => { + match util::replace(&mut self.upgrade, SendUsed) { + SendUsed | NothingSent => Err(Disconnected), + GoUp(upgrade) => Err(Upgraded(upgrade)) + } + } + } + } + _ => unreachable!() + } + } + + // Returns whether the upgrade was completed. If the upgrade wasn't + // completed, then the port couldn't get sent to the other half (it will + // never receive it). + pub fn upgrade(&mut self, up: Port) -> UpgradeResult { + let prev = match self.upgrade { + NothingSent => NothingSent, + SendUsed => SendUsed, + _ => fail!("upgrading again"), + }; + self.upgrade = GoUp(up); + + // Use a Release memory ordering in order to make sure that our write to + // `upgrade` is visible to the other thread. + match self.state.swap(DISCONNECTED, atomics::Release) { + // If the channel is empty or has data on it, then we're good to go. + // Senders will check the data before the upgrade (in case we + // plastered over the DATA state). + DATA | EMPTY => UpSuccess, + + // If the other end is already disconnected, then we failed the + // upgrade. Be sure to trash the port we were given. 
+ DISCONNECTED => { self.upgrade = prev; UpDisconnected } + + // If someone's waiting, we gotta wake them up + n => UpWoke(unsafe { BlockedTask::cast_from_uint(n) }) + } + } + + pub fn drop_chan(&mut self) { + match self.state.swap(DISCONNECTED, atomics::SeqCst) { + DATA | DISCONNECTED | EMPTY => {} + + // If someone's waiting, we gotta wake them up + n => unsafe { + let t = BlockedTask::cast_from_uint(n); + t.wake().map(|t| t.reawaken()); + } + } + } + + pub fn drop_port(&mut self) { + // Use an Acquire memory ordering in order to see the data that the + // senders are sending. + match self.state.swap(DISCONNECTED, atomics::Acquire) { + // An empty channel has nothing to do, and a remotely disconnected + // channel also has nothing to do b/c we're about to run the drop + // glue + DISCONNECTED | EMPTY => {} + + // There's data on the channel, so make sure we destroy it promptly. + // This is why not using an arc is a little difficult (need the box + // to stay valid while we take the data). + DATA => { self.data.take_unwrap(); } + + // We're the only ones that can block on this port + _ => unreachable!() + } + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // If Ok, the value is whether this port has data, if Err, then the upgraded + // port needs to be checked instead of this one. + pub fn can_recv(&mut self) -> Result> { + // Use Acquire so we can see all previous memory writes + match self.state.load(atomics::Acquire) { + EMPTY => Ok(false), // Welp, we tried + DATA => Ok(true), // we have some un-acquired data + DISCONNECTED if self.data.is_some() => Ok(true), // we have data + DISCONNECTED => { + match util::replace(&mut self.upgrade, SendUsed) { + // The other end sent us an upgrade, so we need to + // propagate upwards whether the upgrade can receive + // data + GoUp(upgrade) => Err(upgrade), + + // If the other end disconnected without sending an + // upgrade, then we have data to receive (the channel is + // disconnected). + up => { self.upgrade = up; Ok(true) } + } + } + _ => unreachable!(), // we're the "one blocker" + } + } + + // Attempts to start selection on this port. This can either succeed, fail + // because there is data, or fail because there is an upgrade pending. + pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult { + let n = unsafe { task.cast_to_uint() }; + match self.state.compare_and_swap(EMPTY, n, atomics::SeqCst) { + EMPTY => SelSuccess, + DATA => SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }), + DISCONNECTED if self.data.is_some() => { + SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }) + } + DISCONNECTED => { + match util::replace(&mut self.upgrade, SendUsed) { + // The other end sent us an upgrade, so we need to + // propagate upwards whether the upgrade can receive + // data + GoUp(upgrade) => { + SelUpgraded(unsafe { BlockedTask::cast_from_uint(n) }, + upgrade) + } + + // If the other end disconnected without sending an + // upgrade, then we have data to receive (the channel is + // disconnected). + up => { + self.upgrade = up; + SelCanceled(unsafe { BlockedTask::cast_from_uint(n) }) + } + } + } + _ => unreachable!(), // we're the "one blocker" + } + } + + // Remove a previous selecting task from this port. This ensures that the + // blocked task will no longer be visible to any other threads. + // + // The return value indicates whether there's data on this port. 
+ pub fn abort_selection(&mut self) -> Result> { + // use Acquire to make sure we see all previous memory writes + let state = match self.state.load(atomics::Acquire) { + // Each of these states means that no further activity will happen + // with regard to abortion selection + s @ EMPTY | + s @ DATA | + s @ DISCONNECTED => s, + + // If we've got a blocked task, then use an atomic to gain ownership + // of it (may fail) + n => self.state.compare_and_swap(n, EMPTY, atomics::SeqCst) + }; + + // Now that we've got ownership of our state, figure out what to do + // about it. + match state { + EMPTY => unreachable!(), + // our task used for select was stolen + DATA => Ok(true), + + // If the other end has hung up, then we have complete ownership + // of the port. We need to check to see if there was an upgrade + // requested, and if so, the other end needs to have its selection + // aborted. + DISCONNECTED => { + assert!(self.data.is_none()); + match util::replace(&mut self.upgrade, SendUsed) { + GoUp(port) => Err(port), + _ => Ok(true), + } + } + + // We woke ourselves up from select. Assert that the task should be + // trashed and returne that we don't have any data. + n => { + let t = unsafe { BlockedTask::cast_from_uint(n) }; + t.trash(); + Ok(false) + } + } + } +} + +#[unsafe_destructor] +impl Drop for Packet { + fn drop(&mut self) { + assert_eq!(self.state.load(atomics::SeqCst), DISCONNECTED); + } +} diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index cf8df863817..b6b35ccc357 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -45,19 +45,17 @@ #[allow(dead_code)]; use cast; -use comm; +use cell::Cell; use iter::Iterator; use kinds::marker; use kinds::Send; use ops::Drop; use option::{Some, None, Option}; use ptr::RawPtr; -use result::{Ok, Err}; +use result::{Ok, Err, Result}; use rt::local::Local; -use rt::task::Task; -use super::{Packet, Port}; -use sync::atomics::{Relaxed, SeqCst}; -use task; +use rt::task::{Task, BlockedTask}; +use super::Port; use uint; macro_rules! select { @@ -67,8 +65,12 @@ macro_rules! select { ) => ({ use std::comm::Select; let sel = Select::new(); - let mut $port1 = sel.add(&mut $port1); - $( let mut $port = sel.add(&mut $port); )* + let mut $port1 = sel.handle(&$port1); + $( let mut $port = sel.handle(&$port); )* + unsafe { + $port1.add(); + $( $port.add(); )* + } let ret = sel.wait(); if ret == $port1.id { let $name1 = $port1.$meth1(); $code1 } $( else if ret == $port.id { let $name = $port.$meth(); $code } )* @@ -79,9 +81,9 @@ macro_rules! select { /// The "port set" of the select interface. This structure is used to manage a /// set of ports which are being selected over. pub struct Select { - priv head: *mut Packet, - priv tail: *mut Packet, - priv next_id: uint, + priv head: *mut Handle<'static, ()>, + priv tail: *mut Handle<'static, ()>, + priv next_id: Cell, priv marker1: marker::NoSend, priv marker2: marker::NoFreeze, } @@ -90,13 +92,28 @@ pub struct Select { /// This handle is used to keep the port in the set as well as interact with the /// underlying port. pub struct Handle<'port, T> { - /// A unique ID for this Handle. + /// The ID of this handle, used to compare against the return value of + /// `Select::wait()` id: uint, priv selector: &'port Select, - priv port: &'port mut Port, + priv next: *mut Handle<'static, ()>, + priv prev: *mut Handle<'static, ()>, + priv added: bool, + priv packet: &'port Packet, + + // due to our fun transmutes, we be sure to place this at the end. 
(nothing + // previous relies on T) + priv port: &'port Port, } -struct Packets { cur: *mut Packet } +struct Packets { cur: *mut Handle<'static, ()> } + +#[doc(hidden)] +pub trait Packet { + fn can_recv(&self) -> bool; + fn start_selection(&self, task: BlockedTask) -> Result<(), BlockedTask>; + fn abort_selection(&self) -> bool; +} impl Select { /// Creates a new selection structure. This set is initially empty and @@ -106,45 +123,29 @@ impl Select { /// rather much easier through the `select!` macro. pub fn new() -> Select { Select { - head: 0 as *mut Packet, - tail: 0 as *mut Packet, - next_id: 1, marker1: marker::NoSend, marker2: marker::NoFreeze, + head: 0 as *mut Handle<'static, ()>, + tail: 0 as *mut Handle<'static, ()>, + next_id: Cell::new(1), } } - /// Adds a new port to this set, returning a handle which is then used to - /// receive on the port. - /// - /// Note that this port parameter takes `&mut Port` instead of `&Port`. None - /// of the methods of receiving on a port require `&mut self`, but `&mut` is - /// used here in order to have the compiler guarantee that the same port is - /// not added to this set more than once. - /// - /// When the returned handle falls out of scope, the port will be removed - /// from this set. While the handle is in this set, usage of the port can be - /// done through the `Handle`'s receiving methods. - pub fn add<'a, T: Send>(&'a self, port: &'a mut Port) -> Handle<'a, T> { - let this = unsafe { cast::transmute_mut(self) }; - let id = this.next_id; - this.next_id += 1; - unsafe { - let packet = port.inner.packet(); - assert!(!(*packet).selecting.load(Relaxed)); - assert_eq!((*packet).selection_id, 0); - (*packet).selection_id = id; - if this.head.is_null() { - this.head = packet as *mut Packet; - this.tail = packet as *mut Packet; - } else { - (*packet).select_prev = this.tail; - assert!((*packet).select_next.is_null()); - (*this.tail).select_next = packet as *mut Packet; - this.tail = packet as *mut Packet; - } + /// Creates a new handle into this port set for a new port. Note that this + /// does *not* add the port to the port set, for that you must call the + /// `add` method on the handle itself. + pub fn handle<'a, T: Send>(&'a self, port: &'a Port) -> Handle<'a, T> { + let id = self.next_id.get(); + self.next_id.set(id + 1); + Handle { + id: id, + selector: self, + next: 0 as *mut Handle<'static, ()>, + prev: 0 as *mut Handle<'static, ()>, + added: false, + port: port, + packet: port, } - Handle { id: id, selector: this, port: port } } /// Waits for an event on this port set. The returned valus is *not* and @@ -177,10 +178,9 @@ impl Select { unsafe { let mut amt = 0; for p in self.iter() { - assert!(!(*p).selecting.load(Relaxed)); amt += 1; - if (*p).can_recv() { - return (*p).selection_id; + if (*p).packet.can_recv() { + return (*p).id; } } assert!(amt > 0); @@ -195,22 +195,14 @@ impl Select { let task: ~Task = Local::take(); task.deschedule(amt, |task| { // Prepare for the block - let (i, packet) = iter.next().unwrap(); - assert!((*packet).to_wake.is_none()); - (*packet).to_wake = Some(task); - (*packet).selecting.store(true, SeqCst); - - if (*packet).decrement() { - Ok(()) - } else { - // Empty to_wake first to avoid tripping an assertion in - // abort_selection in the disconnected case. 
- let task = (*packet).to_wake.take_unwrap(); - (*packet).abort_selection(false); - (*packet).selecting.store(false, SeqCst); - ready_index = i; - ready_id = (*packet).selection_id; - Err(task) + let (i, handle) = iter.next().unwrap(); + match (*handle).packet.start_selection(task) { + Ok(()) => Ok(()), + Err(task) => { + ready_index = i; + ready_id = (*handle).id; + Err(task) + } } }); @@ -235,45 +227,17 @@ impl Select { // A rewrite should focus on avoiding a yield loop, and for now this // implementation is tying us over to a more efficient "don't // iterate over everything every time" implementation. - for packet in self.iter().take(ready_index) { - if (*packet).abort_selection(true) { - ready_id = (*packet).selection_id; - while (*packet).selecting.load(Relaxed) { - task::deschedule(); - } + for handle in self.iter().take(ready_index) { + if (*handle).packet.abort_selection() { + ready_id = (*handle).id; } } - // Sanity check for now to make sure that everyone is turned off. - for packet in self.iter() { - assert!(!(*packet).selecting.load(Relaxed)); - } - assert!(ready_id != uint::MAX); return ready_id; } } - unsafe fn remove(&self, packet: *mut Packet) { - let this = cast::transmute_mut(self); - assert!(!(*packet).selecting.load(Relaxed)); - if (*packet).select_prev.is_null() { - assert_eq!(packet, this.head); - this.head = (*packet).select_next; - } else { - (*(*packet).select_prev).select_next = (*packet).select_next; - } - if (*packet).select_next.is_null() { - assert_eq!(packet, this.tail); - this.tail = (*packet).select_prev; - } else { - (*(*packet).select_next).select_prev = (*packet).select_prev; - } - (*packet).select_next = 0 as *mut Packet; - (*packet).select_prev = 0 as *mut Packet; - (*packet).selection_id = 0; - } - fn iter(&self) -> Packets { Packets { cur: self.head } } } @@ -285,10 +249,56 @@ impl<'port, T: Send> Handle<'port, T> { /// success or `None` if the channel disconnects. This function has the same /// semantics as `Port.recv_opt` pub fn recv_opt(&mut self) -> Option { self.port.recv_opt() } - /// Immediately attempt to receive a value on a port, this function will - /// never block. Has the same semantics as `Port.try_recv`. - pub fn try_recv(&mut self) -> comm::TryRecvResult { - self.port.try_recv() + + /// Adds this handle to the port set that the handle was created from. This + /// method can be called multiple times, but it has no effect if `add` was + /// called previously. + /// + /// This method is unsafe because it requires that the `Handle` is not moved + /// while it is added to the `Select` set. + pub unsafe fn add(&mut self) { + if self.added { return } + let selector: &mut Select = cast::transmute(&*self.selector); + let me: *mut Handle<'static, ()> = cast::transmute(&*self); + + if selector.head.is_null() { + selector.head = me; + selector.tail = me; + } else { + (*me).prev = selector.tail; + assert!((*me).next.is_null()); + (*selector.tail).next = me; + selector.tail = me; + } + self.added = true; + } + + /// Removes this handle from the `Select` set. This method is unsafe because + /// it has no guarantee that the `Handle` was not moved since `add` was + /// called. 
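Putting `handle`, `add`, and `wait` together, the new `Select` API introduced by this patch is used roughly as in the following sketch (mirroring the `cloning3` test further down; the sent value is illustrative):

~~~
let (p1, c1) = Chan::<int>::new();
let (p2, _c2) = Chan::<int>::new();
let sel = Select::new();
let mut h1 = sel.handle(&p1);
let mut h2 = sel.handle(&p2);
// Handles must be added explicitly and must not move afterwards.
unsafe { h1.add(); h2.add(); }
c1.send(7);
assert_eq!(sel.wait(), h1.id);
assert_eq!(h1.recv_opt(), Some(7));
~~~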
+ pub unsafe fn remove(&mut self) { + if !self.added { return } + + let selector: &mut Select = cast::transmute(&*self.selector); + let me: *mut Handle<'static, ()> = cast::transmute(&*self); + + if self.prev.is_null() { + assert_eq!(selector.head, me); + selector.head = self.next; + } else { + (*self.prev).next = self.next; + } + if self.next.is_null() { + assert_eq!(selector.tail, me); + selector.tail = self.prev; + } else { + (*self.next).prev = self.prev; + } + + self.next = 0 as *mut Handle<'static, ()>; + self.prev = 0 as *mut Handle<'static, ()>; + + self.added = false; } } @@ -303,17 +313,17 @@ impl Drop for Select { #[unsafe_destructor] impl<'port, T: Send> Drop for Handle<'port, T> { fn drop(&mut self) { - unsafe { self.selector.remove(self.port.inner.packet()) } + unsafe { self.remove() } } } -impl Iterator<*mut Packet> for Packets { - fn next(&mut self) -> Option<*mut Packet> { +impl Iterator<*mut Handle<'static, ()>> for Packets { + fn next(&mut self) -> Option<*mut Handle<'static, ()>> { if self.cur.is_null() { None } else { let ret = Some(self.cur); - unsafe { self.cur = (*self.cur).select_next; } + unsafe { self.cur = (*self.cur).next; } ret } } @@ -326,8 +336,8 @@ mod test { use prelude::*; test!(fn smoke() { - let (mut p1, c1) = Chan::::new(); - let (mut p2, c2) = Chan::::new(); + let (p1, c1) = Chan::::new(); + let (p2, c2) = Chan::::new(); c1.send(1); select! ( foo = p1.recv() => { assert_eq!(foo, 1); }, @@ -350,11 +360,11 @@ mod test { }) test!(fn smoke2() { - let (mut p1, _c1) = Chan::::new(); - let (mut p2, _c2) = Chan::::new(); - let (mut p3, _c3) = Chan::::new(); - let (mut p4, _c4) = Chan::::new(); - let (mut p5, c5) = Chan::::new(); + let (p1, _c1) = Chan::::new(); + let (p2, _c2) = Chan::::new(); + let (p3, _c3) = Chan::::new(); + let (p4, _c4) = Chan::::new(); + let (p5, c5) = Chan::::new(); c5.send(4); select! ( _foo = p1.recv() => { fail!("1") }, @@ -366,8 +376,8 @@ mod test { }) test!(fn closed() { - let (mut p1, _c1) = Chan::::new(); - let (mut p2, c2) = Chan::::new(); + let (p1, _c1) = Chan::::new(); + let (p2, c2) = Chan::::new(); drop(c2); select! 
( @@ -377,8 +387,8 @@ mod test { }) test!(fn unblocks() { - let (mut p1, c1) = Chan::::new(); - let (mut p2, _c2) = Chan::::new(); + let (p1, c1) = Chan::::new(); + let (p2, _c2) = Chan::::new(); let (p3, c3) = Chan::::new(); spawn(proc() { @@ -400,8 +410,8 @@ mod test { }) test!(fn both_ready() { - let (mut p1, c1) = Chan::::new(); - let (mut p2, c2) = Chan::::new(); + let (p1, c1) = Chan::::new(); + let (p2, c2) = Chan::::new(); let (p3, c3) = Chan::<()>::new(); spawn(proc() { @@ -426,8 +436,8 @@ mod test { test!(fn stress() { static AMT: int = 10000; - let (mut p1, c1) = Chan::::new(); - let (mut p2, c2) = Chan::::new(); + let (p1, c1) = Chan::::new(); + let (p2, c2) = Chan::::new(); let (p3, c3) = Chan::<()>::new(); spawn(proc() { @@ -449,4 +459,66 @@ mod test { c3.send(()); } }) + + test!(fn cloning() { + let (p1, c1) = Chan::::new(); + let (p2, _c2) = Chan::::new(); + let (p3, c3) = Chan::<()>::new(); + + spawn(proc() { + p3.recv(); + c1.clone(); + assert_eq!(p3.try_recv(), Empty); + c1.send(2); + p3.recv(); + }); + + c3.send(()); + select!( + _i1 = p1.recv() => {}, + _i2 = p2.recv() => fail!() + ) + c3.send(()); + }) + + test!(fn cloning2() { + let (p1, c1) = Chan::::new(); + let (p2, _c2) = Chan::::new(); + let (p3, c3) = Chan::<()>::new(); + + spawn(proc() { + p3.recv(); + c1.clone(); + assert_eq!(p3.try_recv(), Empty); + c1.send(2); + p3.recv(); + }); + + c3.send(()); + select!( + _i1 = p1.recv() => {}, + _i2 = p2.recv() => fail!() + ) + c3.send(()); + }) + + test!(fn cloning3() { + let (p1, c1) = Chan::<()>::new(); + let (p2, c2) = Chan::<()>::new(); + let (p, c) = Chan::new(); + spawn(proc() { + let mut s = Select::new(); + let mut h1 = s.handle(&p1); + let mut h2 = s.handle(&p2); + unsafe { h2.add(); } + unsafe { h1.add(); } + assert_eq!(s.wait(), h2.id); + c.send(()); + }); + + for _ in range(0, 1000) { task::deschedule(); } + drop(c1.clone()); + c2.send(()); + p.recv(); + }) } diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs new file mode 100644 index 00000000000..30e061bb7b9 --- /dev/null +++ b/src/libstd/comm/shared.rs @@ -0,0 +1,483 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/// Shared channels +/// +/// This is the flavor of channels which are not necessarily optimized for any +/// particular use case, but are the most general in how they are used. Shared +/// channels are cloneable allowing for multiple senders. +/// +/// High level implementation details can be found in the comment of the parent +/// module. You'll also note that the implementation of the shared and stream +/// channels are quite similar, and this is no coincidence! + +use int; +use iter::Iterator; +use kinds::Send; +use ops::Drop; +use option::{Some, None, Option}; +use result::{Ok, Err, Result}; +use rt::local::Local; +use rt::task::{Task, BlockedTask}; +use rt::thread::Thread; +use sync::atomics; +use unstable::mutex::Mutex; +use vec::OwnedVector; + +use mpsc = sync::mpsc_queue; + +static DISCONNECTED: int = int::MIN; +static FUDGE: int = 1024; +static MAX_STEALS: int = 1 << 20; + +pub struct Packet { + queue: mpsc::Queue, + cnt: atomics::AtomicInt, // How many items are on this channel + steals: int, // How many times has a port received without blocking? 
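+    // (`steals` above is normally only touched by the receiving side;
+    // decrement() below folds it back into `cnt` before the receiver sleeps.)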
+ to_wake: atomics::AtomicUint, // Task to wake up + + // The number of channels which are currently using this packet. + channels: atomics::AtomicInt, + + // See the discussion in Port::drop and the channel send methods for what + // these are used for + port_dropped: atomics::AtomicBool, + sender_drain: atomics::AtomicInt, + + // this lock protects various portions of this implementation during + // select() + select_lock: Mutex, +} + +pub enum Failure { + Empty, + Disconnected, +} + +impl Packet { + // Creation of a packet *must* be followed by a call to inherit_blocker + pub fn new() -> Packet { + let mut p = Packet { + queue: mpsc::Queue::new(), + cnt: atomics::AtomicInt::new(0), + steals: 0, + to_wake: atomics::AtomicUint::new(0), + channels: atomics::AtomicInt::new(2), + port_dropped: atomics::AtomicBool::new(false), + sender_drain: atomics::AtomicInt::new(0), + select_lock: unsafe { Mutex::new() }, + }; + // see comments in inherit_blocker about why we grab this lock + unsafe { p.select_lock.lock() } + return p; + } + + // This function is used at the creation of a shared packet to inherit a + // previously blocked task. This is done to prevent spurious wakeups of + // tasks in select(). + // + // This can only be called at channel-creation time + pub fn inherit_blocker(&mut self, task: Option) { + match task { + Some(task) => { + assert_eq!(self.cnt.load(atomics::SeqCst), 0); + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + self.to_wake.store(unsafe { task.cast_to_uint() }, + atomics::SeqCst); + self.cnt.store(-1, atomics::SeqCst); + + // This store is a little sketchy. What's happening here is + // that we're transferring a blocker from a oneshot or stream + // channel to this shared channel. In doing so, we never + // spuriously wake them up and rather only wake them up at the + // appropriate time. This implementation of shared channels + // assumes that any blocking recv() will undo the increment of + // steals performed in try_recv() once the recv is complete. + // This thread that we're inheriting, however, is not in the + // middle of recv. Hence, the first time we wake them up, + // they're going to wake up from their old port, move on to the + // upgraded port, and then call the block recv() function. + // + // When calling this function, they'll find there's data + // immediately available, counting it as a steal. This in fact + // wasn't a steal because we appropriately blocked them waiting + // for data. + // + // To offset this bad increment, we initially set the steal + // count to -1. You'll find some special code in + // abort_selection() as well to ensure that this -1 steal count + // doesn't escape too far. + self.steals = -1; + } + None => {} + } + + // When the shared packet is constructed, we grabbed this lock. The + // purpose of this lock is to ensure that abort_selection() doesn't + // interfere with this method. After we unlock this lock, we're + // signifying that we're done modifying self.cnt and self.to_wake and + // the port is ready for the world to continue using it. + unsafe { self.select_lock.unlock() } + } + + pub fn send(&mut self, t: T) -> bool { + // See Port::drop for what's going on + if self.port_dropped.load(atomics::SeqCst) { return false } + + // Note that the multiple sender case is a little tricker + // semantically than the single sender case. The logic for + // incrementing is "add and if disconnected store disconnected". 
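+        // (e.g. a sender's fetch_add(1) can briefly leave the count at
+        // DISCONNECTED + 1 before DISCONNECTED is stored back below.)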
+ // This could end up leading some senders to believe that there + // wasn't a disconnect if in fact there was a disconnect. This means + // that while one thread is attempting to re-store the disconnected + // states, other threads could walk through merrily incrementing + // this very-negative disconnected count. To prevent senders from + // spuriously attempting to send when the channels is actually + // disconnected, the count has a ranged check here. + // + // This is also done for another reason. Remember that the return + // value of this function is: + // + // `true` == the data *may* be received, this essentially has no + // meaning + // `false` == the data will *never* be received, this has a lot of + // meaning + // + // In the SPSC case, we have a check of 'queue.is_empty()' to see + // whether the data was actually received, but this same condition + // means nothing in a multi-producer context. As a result, this + // preflight check serves as the definitive "this will never be + // received". Once we get beyond this check, we have permanently + // entered the realm of "this may be received" + if self.cnt.load(atomics::SeqCst) < DISCONNECTED + FUDGE { + return false + } + + self.queue.push(t); + match self.cnt.fetch_add(1, atomics::SeqCst) { + -1 => { + self.take_to_wake().wake().map(|t| t.reawaken()); + } + + // In this case, we have possibly failed to send our data, and + // we need to consider re-popping the data in order to fully + // destroy it. We must arbitrate among the multiple senders, + // however, because the queues that we're using are + // single-consumer queues. In order to do this, all exiting + // pushers will use an atomic count in order to count those + // flowing through. Pushers who see 0 are required to drain as + // much as possible, and then can only exit when they are the + // only pusher (otherwise they must try again). + n if n < DISCONNECTED + FUDGE => { + // see the comment in 'try' for a shared channel for why this + // window of "not disconnected" is ok. + self.cnt.store(DISCONNECTED, atomics::SeqCst); + + if self.sender_drain.fetch_add(1, atomics::SeqCst) == 0 { + loop { + // drain the queue, for info on the thread yield see the + // discussion in try_recv + loop { + match self.queue.pop() { + mpsc::Data(..) => {} + mpsc::Empty => break, + mpsc::Inconsistent => Thread::yield_now(), + } + } + // maybe we're done, if we're not the last ones + // here, then we need to go try again. + if self.sender_drain.fetch_sub(1, atomics::SeqCst) == 1 { + break + } + } + + // At this point, there may still be data on the queue, + // but only if the count hasn't been incremented and + // some other sender hasn't finished pushing data just + // yet. That sender in question will drain its own data. + } + } + + // Can't make any assumptions about this case like in the SPSC case. + _ => {} + } + + true + } + + pub fn recv(&mut self) -> Result { + // This code is essentially the exact same as that found in the stream + // case (see stream.rs) + match self.try_recv() { + Err(Empty) => {} + data => return data, + } + + let task: ~Task = Local::take(); + task.deschedule(1, |task| { + self.decrement(task) + }); + + match self.try_recv() { + data @ Ok(..) => { self.steals -= 1; data } + data => data, + } + } + + // Essentially the exact same thing as the stream decrement function. 
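+    // (e.g. with cnt == 2 and steals == 2, the fetch_sub(3) observes 2 and
+    // 2 - 2 <= 0, so the receiver sleeps leaving cnt at -1; the next sender's
+    // fetch_add(1) then sees -1 and wakes it.)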
+ fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> { + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + let n = unsafe { task.cast_to_uint() }; + self.to_wake.store(n, atomics::SeqCst); + + let steals = self.steals; + self.steals = 0; + + match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) { + DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); } + // If we factor in our steals and notice that the channel has no + // data, we successfully sleep + n => { + assert!(n >= 0); + if n - steals <= 0 { return Ok(()) } + } + } + + self.to_wake.store(0, atomics::SeqCst); + Err(unsafe { BlockedTask::cast_from_uint(n) }) + } + + pub fn try_recv(&mut self) -> Result { + let ret = match self.queue.pop() { + mpsc::Data(t) => Some(t), + mpsc::Empty => None, + + // This is a bit of an interesting case. The channel is + // reported as having data available, but our pop() has + // failed due to the queue being in an inconsistent state. + // This means that there is some pusher somewhere which has + // yet to complete, but we are guaranteed that a pop will + // eventually succeed. In this case, we spin in a yield loop + // because the remote sender should finish their enqueue + // operation "very quickly". + // + // Note that this yield loop does *not* attempt to do a green + // yield (regardless of the context), but *always* performs an + // OS-thread yield. The reasoning for this is that the pusher in + // question which is causing the inconsistent state is + // guaranteed to *not* be a blocked task (green tasks can't get + // pre-empted), so it must be on a different OS thread. Also, + // `try_recv` is normally a "guaranteed no rescheduling" context + // in a green-thread situation. By yielding control of the + // thread, we will hopefully allow time for the remote task on + // the other OS thread to make progress. + // + // Avoiding this yield loop would require a different queue + // abstraction which provides the guarantee that after M + // pushes have succeeded, at least M pops will succeed. The + // current queues guarantee that if there are N active + // pushes, you can pop N times once all N have finished. + mpsc::Inconsistent => { + let data; + loop { + Thread::yield_now(); + match self.queue.pop() { + mpsc::Data(t) => { data = t; break } + mpsc::Empty => fail!("inconsistent => empty"), + mpsc::Inconsistent => {} + } + } + Some(data) + } + }; + match ret { + // See the discussion in the stream implementation for why we we + // might decrement steals. + Some(data) => { + self.steals += 1; + if self.steals > MAX_STEALS { + match self.cnt.swap(0, atomics::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, atomics::SeqCst); + } + n => { self.steals -= n; } + } + assert!(self.steals >= 0); + } + Ok(data) + } + + // See the discussion in the stream implementation for why we try + // again. + None => { + match self.cnt.load(atomics::SeqCst) { + n if n != DISCONNECTED => Err(Empty), + _ => { + match self.queue.pop() { + mpsc::Data(t) => Ok(t), + mpsc::Empty => Err(Disconnected), + // with no senders, an inconsistency is impossible. + mpsc::Inconsistent => unreachable!(), + } + } + } + } + } + } + + // Prepares this shared packet for a channel clone, essentially just bumping + // a refcount. + pub fn clone_chan(&mut self) { + self.channels.fetch_add(1, atomics::SeqCst); + } + + // Decrement the reference count on a channel. This is called whenever a + // Chan is dropped and may end up waking up a receiver. 
It's the receiver's + // responsibility on the other end to figure out that we've disconnected. + pub fn drop_chan(&mut self) { + match self.channels.fetch_sub(1, atomics::SeqCst) { + 1 => {} + n if n > 1 => return, + n => fail!("bad number of channels left {}", n), + } + + match self.cnt.swap(DISCONNECTED, atomics::SeqCst) { + -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); } + DISCONNECTED => {} + n => { assert!(n >= 0); } + } + } + + // See the long discussion inside of stream.rs for why the queue is drained, + // and why it is done in this fashion. + pub fn drop_port(&mut self) { + self.port_dropped.store(true, atomics::SeqCst); + let mut steals = self.steals; + while { + let cnt = self.cnt.compare_and_swap( + steals, DISCONNECTED, atomics::SeqCst); + cnt != DISCONNECTED && cnt != steals + } { + // See the discussion in 'try_recv' for why we yield + // control of this thread. + loop { + match self.queue.pop() { + mpsc::Data(..) => { steals += 1; } + mpsc::Empty | mpsc::Inconsistent => break, + } + } + } + } + + // Consumes ownership of the 'to_wake' field. + fn take_to_wake(&mut self) -> BlockedTask { + let task = self.to_wake.load(atomics::SeqCst); + self.to_wake.store(0, atomics::SeqCst); + assert!(task != 0); + unsafe { BlockedTask::cast_from_uint(task) } + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // Helper function for select, tests whether this port can receive without + // blocking (obviously not an atomic decision). + // + // This is different than the stream version because there's no need to peek + // at the queue, we can just look at the local count. + pub fn can_recv(&mut self) -> bool { + let cnt = self.cnt.load(atomics::SeqCst); + cnt == DISCONNECTED || cnt - self.steals > 0 + } + + // Inserts the blocked task for selection on this port, returning it back if + // the port already has data on it. + // + // The code here is the same as in stream.rs, except that it doesn't need to + // peek at the channel to see if an upgrade is pending. + pub fn start_selection(&mut self, + task: BlockedTask) -> Result<(), BlockedTask> { + match self.decrement(task) { + Ok(()) => Ok(()), + Err(task) => { + let prev = self.cnt.fetch_add(1, atomics::SeqCst); + assert!(prev >= 0); + return Err(task); + } + } + } + + // Cancels a previous task waiting on this port, returning whether there's + // data on the port. + // + // This is similar to the stream implementation (hence fewer comments), but + // uses a different value for the "steals" variable. + pub fn abort_selection(&mut self, _was_upgrade: bool) -> bool { + // Before we do anything else, we bounce on this lock. The reason for + // doing this is to ensure that any upgrade-in-progress is gone and + // done with. Without this bounce, we can race with inherit_blocker + // about looking at and dealing with to_wake. Once we have acquired the + // lock, we are guaranteed that inherit_blocker is done. + unsafe { + self.select_lock.lock(); + self.select_lock.unlock(); + } + + // Like the stream implementation, we want to make sure that the count + // on the channel goes non-negative. We don't know how negative the + // stream currently is, so instead of using a steal value of 1, we load + // the channel count and figure out what we should do to make it + // positive. 
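+        // (e.g. if the loaded count is -3 and not DISCONNECTED, steals becomes
+        // 3, so the fetch_add(steals + 1) below moves the count back up to a
+        // non-negative value, barring concurrent senders.)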
+ let steals = { + let cnt = self.cnt.load(atomics::SeqCst); + if cnt < 0 && cnt != DISCONNECTED {-cnt} else {0} + }; + let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst); + + if prev == DISCONNECTED { + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + self.cnt.store(DISCONNECTED, atomics::SeqCst); + true + } else { + let cur = prev + steals + 1; + assert!(cur >= 0); + if prev < 0 { + self.take_to_wake().trash(); + } else { + while self.to_wake.load(atomics::SeqCst) != 0 { + Thread::yield_now(); + } + } + // if the number of steals is -1, it was the pre-emptive -1 steal + // count from when we inherited a blocker. This is fine because + // we're just going to overwrite it with a real value. + assert!(self.steals == 0 || self.steals == -1); + self.steals = steals; + prev >= 0 + } + } +} + +#[unsafe_destructor] +impl Drop for Packet { + fn drop(&mut self) { + unsafe { + // Note that this load is not only an assert for correctness about + // disconnection, but also a proper fence before the read of + // `to_wake`, so this assert cannot be removed with also removing + // the `to_wake` assert. + assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED); + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + assert_eq!(self.channels.load(atomics::SeqCst), 0); + self.select_lock.destroy(); + } + } +} diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs new file mode 100644 index 00000000000..0e249a55f87 --- /dev/null +++ b/src/libstd/comm/stream.rs @@ -0,0 +1,460 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +/// Stream channels +/// +/// This is the flavor of channels which are optimized for one sender and one +/// receiver. The sender will be upgraded to a shared channel if the channel is +/// cloned. +/// +/// High level implementation details can be found in the comment of the parent +/// module. + +use comm::Port; +use int; +use iter::Iterator; +use kinds::Send; +use ops::Drop; +use option::{Some, None}; +use result::{Ok, Err, Result}; +use rt::local::Local; +use rt::task::{Task, BlockedTask}; +use rt::thread::Thread; +use spsc = sync::spsc_queue; +use sync::atomics; +use vec::OwnedVector; + +static DISCONNECTED: int = int::MIN; +static MAX_STEALS: int = 1 << 20; + +pub struct Packet { + queue: spsc::Queue>, // internal queue for all message + + cnt: atomics::AtomicInt, // How many items are on this channel + steals: int, // How many times has a port received without blocking? + to_wake: atomics::AtomicUint, // Task to wake up + + port_dropped: atomics::AtomicBool, // flag if the channel has been destroyed. 
+} + +pub enum Failure { + Empty, + Disconnected, + Upgraded(Port), +} + +pub enum UpgradeResult { + UpSuccess, + UpDisconnected, + UpWoke(BlockedTask), +} + +pub enum SelectionResult { + SelSuccess, + SelCanceled(BlockedTask), + SelUpgraded(BlockedTask, Port), +} + +// Any message could contain an "upgrade request" to a new shared port, so the +// internal queue it's a queue of T, but rather Message +enum Message { + Data(T), + GoUp(Port), +} + +impl Packet { + pub fn new() -> Packet { + Packet { + queue: spsc::Queue::new(128), + + cnt: atomics::AtomicInt::new(0), + steals: 0, + to_wake: atomics::AtomicUint::new(0), + + port_dropped: atomics::AtomicBool::new(false), + } + } + + + pub fn send(&mut self, t: T) -> bool { + match self.do_send(Data(t)) { + UpSuccess => true, + UpDisconnected => false, + UpWoke(task) => { + task.wake().map(|t| t.reawaken()); + true + } + } + } + pub fn upgrade(&mut self, up: Port) -> UpgradeResult { + self.do_send(GoUp(up)) + } + + fn do_send(&mut self, t: Message) -> UpgradeResult { + // Use an acquire/release ordering to maintain the same position with + // respect to the atomic loads below + if self.port_dropped.load(atomics::SeqCst) { return UpDisconnected } + + self.queue.push(t); + match self.cnt.fetch_add(1, atomics::SeqCst) { + // As described in the mod's doc comment, -1 == wakeup + -1 => UpWoke(self.take_to_wake()), + // As as described before, SPSC queues must be >= -2 + -2 => UpSuccess, + + // Be sure to preserve the disconnected state, and the return value + // in this case is going to be whether our data was received or not. + // This manifests itself on whether we have an empty queue or not. + // + // Primarily, are required to drain the queue here because the port + // will never remove this data. We can only have at most one item to + // drain (the port drains the rest). + DISCONNECTED => { + self.cnt.store(DISCONNECTED, atomics::SeqCst); + let first = self.queue.pop(); + let second = self.queue.pop(); + assert!(second.is_none()); + + match first { + Some(..) => UpSuccess, // we failed to send the data + None => UpDisconnected, // we successfully sent data + } + } + + // Otherwise we just sent some data on a non-waiting queue, so just + // make sure the world is sane and carry on! + n => { assert!(n >= 0); UpSuccess } + } + } + + // Consumes ownership of the 'to_wake' field. + fn take_to_wake(&mut self) -> BlockedTask { + let task = self.to_wake.load(atomics::SeqCst); + self.to_wake.store(0, atomics::SeqCst); + assert!(task != 0); + unsafe { BlockedTask::cast_from_uint(task) } + } + + // Decrements the count on the channel for a sleeper, returning the sleeper + // back if it shouldn't sleep. Note that this is the location where we take + // steals into account. + fn decrement(&mut self, task: BlockedTask) -> Result<(), BlockedTask> { + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + let n = unsafe { task.cast_to_uint() }; + self.to_wake.store(n, atomics::SeqCst); + + let steals = self.steals; + self.steals = 0; + + match self.cnt.fetch_sub(1 + steals, atomics::SeqCst) { + DISCONNECTED => { self.cnt.store(DISCONNECTED, atomics::SeqCst); } + // If we factor in our steals and notice that the channel has no + // data, we successfully sleep + n => { + assert!(n >= 0); + if n - steals <= 0 { return Ok(()) } + } + } + + self.to_wake.store(0, atomics::SeqCst); + Err(unsafe { BlockedTask::cast_from_uint(n) }) + } + + pub fn recv(&mut self) -> Result> { + // Optimistic preflight check (scheduling is expensive). 
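+        // (try_recv returns immediately when data is already queued, so we
+        // only pay for descheduling when nothing appears to be available.)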
+ match self.try_recv() { + Err(Empty) => {} + data => return data, + } + + // Welp, our channel has no data. Deschedule the current task and + // initiate the blocking protocol. + let task: ~Task = Local::take(); + task.deschedule(1, |task| { + self.decrement(task) + }); + + match self.try_recv() { + // Messages which actually popped from the queue shouldn't count as + // a steal, so offset the decrement here (we already have our + // "steal" factored into the channel count above). + data @ Ok(..) | + data @ Err(Upgraded(..)) => { + self.steals -= 1; + data + } + + data => data, + } + } + + pub fn try_recv(&mut self) -> Result> { + match self.queue.pop() { + // If we stole some data, record to that effect (this will be + // factored into cnt later on). Note that we don't allow steals to + // grow without bound in order to prevent eventual overflow of + // either steals or cnt as an overflow would have catastrophic + // results. Also note that we don't unconditionally set steals to 0 + // because it can be true that steals > cnt. + Some(data) => { + self.steals += 1; + if self.steals > MAX_STEALS { + match self.cnt.swap(0, atomics::SeqCst) { + DISCONNECTED => { + self.cnt.store(DISCONNECTED, atomics::SeqCst); + } + n => { self.steals -= n; } + } + assert!(self.steals >= 0); + } + match data { + Data(t) => Ok(t), + GoUp(up) => Err(Upgraded(up)), + } + } + + None => { + match self.cnt.load(atomics::SeqCst) { + n if n != DISCONNECTED => Err(Empty), + + // This is a little bit of a tricky case. We failed to pop + // data above, and then we have viewed that the channel is + // disconnected. In this window more data could have been + // sent on the channel. It doesn't really make sense to + // return that the channel is disconnected when there's + // actually data on it, so be extra sure there's no data by + // popping one more time. + // + // We can ignore steals because the other end is + // disconnected and we'll never need to really factor in our + // steals again. + _ => { + match self.queue.pop() { + Some(Data(t)) => Ok(t), + Some(GoUp(up)) => Err(Upgraded(up)), + None => Err(Disconnected), + } + } + } + } + } + } + + pub fn drop_chan(&mut self) { + // Dropping a channel is pretty simple, we just flag it as disconnected + // and then wakeup a blocker if there is one. + match self.cnt.swap(DISCONNECTED, atomics::SeqCst) { + -1 => { self.take_to_wake().wake().map(|t| t.reawaken()); } + DISCONNECTED => {} + n => { assert!(n >= 0); } + } + } + + pub fn drop_port(&mut self) { + // Dropping a port seems like a fairly trivial thing. In theory all we + // need to do is flag that we're disconnected and then everything else + // can take over (we don't have anyone to wake up). + // + // The catch for Ports is that we want to drop the entire contents of + // the queue. There are multiple reasons for having this property, the + // largest of which is that if another chan is waiting in this channel + // (but not received yet), then waiting on that port will cause a + // deadlock. + // + // So if we accept that we must now destroy the entire contents of the + // queue, this code may make a bit more sense. The tricky part is that + // we can't let any in-flight sends go un-dropped, we have to make sure + // *everything* is dropped and nothing new will come onto the channel. + + // The first thing we do is set a flag saying that we're done for. All + // sends are gated on this flag, so we're immediately guaranteed that + // there are a bounded number of active sends that we'll have to deal + // with. 
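+        // (do_send checks this flag before pushing, so only sends already past
+        // that check can still place data on the queue.)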
+ self.port_dropped.store(true, atomics::SeqCst); + + // Now that we're guaranteed to deal with a bounded number of senders, + // we need to drain the queue. This draining process happens atomically + // with respect to the "count" of the channel. If the count is nonzero + // (with steals taken into account), then there must be data on the + // channel. In this case we drain everything and then try again. We will + // continue to fail while active senders send data while we're dropping + // data, but eventually we're guaranteed to break out of this loop + // (because there is a bounded number of senders). + let mut steals = self.steals; + while { + let cnt = self.cnt.compare_and_swap( + steals, DISCONNECTED, atomics::SeqCst); + cnt != DISCONNECTED && cnt != steals + } { + loop { + match self.queue.pop() { + Some(..) => { steals += 1; } + None => break + } + } + } + + // At this point in time, we have gated all future senders from sending, + // and we have flagged the channel as being disconnected. The senders + // still have some responsibility, however, because some sends may not + // complete until after we flag the disconnection. There are more + // details in the sending methods that see DISCONNECTED + } + + //////////////////////////////////////////////////////////////////////////// + // select implementation + //////////////////////////////////////////////////////////////////////////// + + // Tests to see whether this port can receive without blocking. If Ok is + // returned, then that's the answer. If Err is returned, then the returned + // port needs to be queried instead (an upgrade happened) + pub fn can_recv(&mut self) -> Result> { + // We peek at the queue to see if there's anything on it, and we use + // this return value to determine if we should pop from the queue and + // upgrade this channel immediately. If it looks like we've got an + // upgrade pending, then go through the whole recv rigamarole to update + // the internal state. + match self.queue.peek() { + Some(&GoUp(..)) => { + match self.recv() { + Err(Upgraded(port)) => Err(port), + _ => unreachable!(), + } + } + Some(..) => Ok(true), + None => Ok(false) + } + } + + // Attempts to start selecting on this port. Like a oneshot, this can fail + // immediately because of an upgrade. + pub fn start_selection(&mut self, task: BlockedTask) -> SelectionResult { + match self.decrement(task) { + Ok(()) => SelSuccess, + Err(task) => { + let ret = match self.queue.peek() { + Some(&GoUp(..)) => { + match self.queue.pop() { + Some(GoUp(port)) => SelUpgraded(task, port), + _ => unreachable!(), + } + } + Some(..) => SelCanceled(task), + None => SelCanceled(task), + }; + // Undo our decrement above, and we should be guaranteed that the + // previous value is positive because we're not going to sleep + let prev = self.cnt.fetch_add(1, atomics::SeqCst); + assert!(prev >= 0); + return ret; + } + } + } + + // Removes a previous task from being blocked in this port + pub fn abort_selection(&mut self, + was_upgrade: bool) -> Result> { + // If we're aborting selection after upgrading from a oneshot, then + // we're guarantee that no one is waiting. The only way that we could + // have seen the upgrade is if data was actually sent on the channel + // half again. For us, this means that there is guaranteed to be data on + // this channel. Furthermore, we're guaranteed that there was no + // start_selection previously, so there's no need to modify `self.cnt` + // at all. 
+ // + // Hence, because of these invariants, we immediately return `Ok(true)`. + // Note that the data may not actually be sent on the channel just yet. + // The other end could have flagged the upgrade but not sent data to + // this end. This is fine because we know it's a small bounded windows + // of time until the data is actually sent. + if was_upgrade { + assert_eq!(self.steals, 0); + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + return Ok(true) + } + + // We want to make sure that the count on the channel goes non-negative, + // and in the stream case we can have at most one steal, so just assume + // that we had one steal. + let steals = 1; + let prev = self.cnt.fetch_add(steals + 1, atomics::SeqCst); + + // If we were previously disconnected, then we know for sure that there + // is no task in to_wake, so just keep going + let has_data = if prev == DISCONNECTED { + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + self.cnt.store(DISCONNECTED, atomics::SeqCst); + true // there is data, that data is that we're disconnected + } else { + let cur = prev + steals + 1; + assert!(cur >= 0); + + // If the previous count was negative, then we just made things go + // positive, hence we passed the -1 boundary and we're responsible + // for removing the to_wake() field and trashing it. + // + // If the previous count was positive then we're in a tougher + // situation. A possible race is that a sender just incremented + // through -1 (meaning it's going to try to wake a task up), but it + // hasn't yet read the to_wake. In order to prevent a future recv() + // from waking up too early (this sender picking up the plastered + // over to_wake), we spin loop here waiting for to_wake to be 0. + // Note that this entire select() implementation needs an overhaul, + // and this is *not* the worst part of it, so this is not done as a + // final solution but rather out of necessity for now to get + // something working. + if prev < 0 { + self.take_to_wake().trash(); + } else { + while self.to_wake.load(atomics::SeqCst) != 0 { + Thread::yield_now(); + } + } + assert_eq!(self.steals, 0); + self.steals = steals; + + // if we were previously positive, then there's surely data to + // receive + prev >= 0 + }; + + // Now that we've determined that this queue "has data", we peek at the + // queue to see if the data is an upgrade or not. If it's an upgrade, + // then we need to destroy this port and abort selection on the + // upgraded port. + if has_data { + match self.queue.peek() { + Some(&GoUp(..)) => { + match self.queue.pop() { + Some(GoUp(port)) => Err(port), + _ => unreachable!(), + } + } + _ => Ok(true), + } + } else { + Ok(false) + } + } +} + +#[unsafe_destructor] +impl Drop for Packet { + fn drop(&mut self) { + unsafe { + // Note that this load is not only an assert for correctness about + // disconnection, but also a proper fence before the read of + // `to_wake`, so this assert cannot be removed with also removing + // the `to_wake` assert. + assert_eq!(self.cnt.load(atomics::SeqCst), DISCONNECTED); + assert_eq!(self.to_wake.load(atomics::SeqCst), 0); + } + } +} diff --git a/src/libstd/io/signal.rs b/src/libstd/io/signal.rs index 75804c40c58..46c106234db 100644 --- a/src/libstd/io/signal.rs +++ b/src/libstd/io/signal.rs @@ -21,7 +21,7 @@ definitions for a number of signals. 
use clone::Clone; use result::{Ok, Err}; -use comm::{Port, SharedChan}; +use comm::{Port, Chan}; use container::{Map, MutableMap}; use hashmap; use io; @@ -81,7 +81,7 @@ pub struct Listener { priv handles: hashmap::HashMap, /// chan is where all the handles send signums, which are received by /// the clients from port. - priv chan: SharedChan, + priv chan: Chan, /// Clients of Listener can `recv()` from this port. This is exposed to /// allow selection over this port as well as manipulation of the port @@ -93,7 +93,7 @@ impl Listener { /// Creates a new listener for signals. Once created, signals are bound via /// the `register` method (otherwise nothing will ever be received) pub fn new() -> Listener { - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); Listener { chan: chan, port: port, diff --git a/src/libstd/prelude.rs b/src/libstd/prelude.rs index 471ec050192..4849b83037f 100644 --- a/src/libstd/prelude.rs +++ b/src/libstd/prelude.rs @@ -80,7 +80,7 @@ pub use vec::{MutableVector, MutableTotalOrdVector}; pub use vec::{Vector, VectorVector, CloneableVector, ImmutableVector}; // Reexported runtime types -pub use comm::{Port, Chan, SharedChan}; +pub use comm::{Port, Chan}; pub use task::spawn; // Reexported statics diff --git a/src/libstd/rt/mod.rs b/src/libstd/rt/mod.rs index 376d685c8ac..b751c57c0fa 100644 --- a/src/libstd/rt/mod.rs +++ b/src/libstd/rt/mod.rs @@ -73,6 +73,7 @@ pub use self::unwind::{begin_unwind, begin_unwind_raw, begin_unwind_fmt}; // FIXME: these probably shouldn't be public... #[doc(hidden)] pub mod shouldnt_be_public { + #[cfg(not(test))] pub use super::local_ptr::native::maybe_tls_key; #[cfg(not(windows), not(target_os = "android"))] pub use super::local_ptr::compiled::RT_TLS_PTR; diff --git a/src/libstd/rt/rtio.rs b/src/libstd/rt/rtio.rs index 8d02048d55c..39623e329ea 100644 --- a/src/libstd/rt/rtio.rs +++ b/src/libstd/rt/rtio.rs @@ -10,7 +10,7 @@ use c_str::CString; use cast; -use comm::{SharedChan, Port}; +use comm::{Chan, Port}; use libc::c_int; use libc; use ops::Drop; @@ -181,7 +181,7 @@ pub trait IoFactory { fn pipe_open(&mut self, fd: c_int) -> Result<~RtioPipe, IoError>; fn tty_open(&mut self, fd: c_int, readable: bool) -> Result<~RtioTTY, IoError>; - fn signal(&mut self, signal: Signum, channel: SharedChan) + fn signal(&mut self, signal: Signum, channel: Chan) -> Result<~RtioSignal, IoError>; } diff --git a/src/libstd/rt/task.rs b/src/libstd/rt/task.rs index a7648dd2d19..e2b94e655e8 100644 --- a/src/libstd/rt/task.rs +++ b/src/libstd/rt/task.rs @@ -449,7 +449,7 @@ mod test { #[test] fn comm_shared_chan() { - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); chan.send(10); assert!(port.recv() == 10); } diff --git a/src/libstd/rt/unwind.rs b/src/libstd/rt/unwind.rs index 2f4e705735e..9f89becaef9 100644 --- a/src/libstd/rt/unwind.rs +++ b/src/libstd/rt/unwind.rs @@ -73,6 +73,7 @@ use unstable::intrinsics; use uw = self::libunwind; +#[allow(dead_code)] mod libunwind { //! Unwind library interface diff --git a/src/libstd/run.rs b/src/libstd/run.rs index fdd26c6c383..6f684f23d47 100644 --- a/src/libstd/run.rs +++ b/src/libstd/run.rs @@ -13,7 +13,7 @@ #[allow(missing_doc)]; #[deny(unused_must_use)]; -use comm::SharedChan; +use comm::Chan; use io::Reader; use io::process::ProcessExit; use io::process; @@ -225,7 +225,7 @@ impl Process { // in parallel so we don't deadlock while blocking on one // or the other. FIXME (#2625): Surely there's a much more // clever way to do this. 
- let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); let ch_clone = ch.clone(); spawn(proc() { diff --git a/src/libstd/sync/mpmc_bounded_queue.rs b/src/libstd/sync/mpmc_bounded_queue.rs index 74f3a6f6918..44825a1ef94 100644 --- a/src/libstd/sync/mpmc_bounded_queue.rs +++ b/src/libstd/sync/mpmc_bounded_queue.rs @@ -172,7 +172,7 @@ mod tests { let nmsgs = 1000u; let mut q = Queue::with_capacity(nthreads*nmsgs); assert_eq!(None, q.pop()); - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); for _ in range(0, nthreads) { let q = q.clone(); diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index 258162069d9..b5a55f3f8c9 100644 --- a/src/libstd/sync/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -156,14 +156,15 @@ impl Drop for Queue { mod tests { use prelude::*; - use super::{Queue, Data, Empty, Inconsistent}; use native; + use super::{Queue, Data, Empty, Inconsistent}; + use sync::arc::UnsafeArc; #[test] fn test_full() { let mut q = Queue::new(); - p.push(~1); - p.push(~2); + q.push(~1); + q.push(~2); } #[test] @@ -171,11 +172,11 @@ mod tests { let nthreads = 8u; let nmsgs = 1000u; let mut q = Queue::new(); - match c.pop() { + match q.pop() { Empty => {} Inconsistent | Data(..) => fail!() } - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); let q = UnsafeArc::new(q); for _ in range(0, nthreads) { diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index d1fde759cc1..a2c61a2b135 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -194,14 +194,16 @@ impl Queue { } } - /// Tests whether this queue is empty or not. Remember that there can only - /// be one tester/popper, and also keep in mind that the answer returned - /// from this is likely to change if it is `false`. - pub fn is_empty(&self) -> bool { + /// Attempts to peek at the head of the queue, returning `None` if the queue + /// has no data currently + pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> { + // This is essentially the same as above with all the popping bits + // stripped out. 
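+        // (the returned reference borrows the element in place; the value
+        // stays on the queue until a subsequent pop().)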
unsafe { let tail = self.tail; let next = (*tail).next.load(Acquire); - return next.is_null(); + if next.is_null() { return None } + return (*next).value.as_mut(); } } } @@ -223,8 +225,9 @@ impl Drop for Queue { #[cfg(test)] mod test { use prelude::*; - use super::Queue; use native; + use super::Queue; + use sync::arc::UnsafeArc; #[test] fn smoke() { @@ -272,7 +275,6 @@ mod test { let (a, b) = UnsafeArc::new2(Queue::new(bound)); let (port, chan) = Chan::new(); native::task::spawn(proc() { - let mut c = c; for _ in range(0, 100000) { loop { match unsafe { (*b.get()).pop() } { diff --git a/src/libstd/task.rs b/src/libstd/task.rs index 921d0feaa8b..5d8c4a87b39 100644 --- a/src/libstd/task.rs +++ b/src/libstd/task.rs @@ -65,7 +65,6 @@ use rt::task::Task; use str::{Str, SendStr, IntoMaybeOwned}; #[cfg(test)] use any::{AnyOwnExt, AnyRefExt}; -#[cfg(test)] use comm::SharedChan; #[cfg(test)] use ptr; #[cfg(test)] use result; @@ -474,9 +473,9 @@ fn test_try_fail() { fn test_spawn_sched() { use clone::Clone; - let (po, ch) = SharedChan::new(); + let (po, ch) = Chan::new(); - fn f(i: int, ch: SharedChan<()>) { + fn f(i: int, ch: Chan<()>) { let ch = ch.clone(); spawn(proc() { if i == 0 { diff --git a/src/libsync/sync/mod.rs b/src/libsync/sync/mod.rs index 0ac385ea1d1..7078f01945e 100644 --- a/src/libsync/sync/mod.rs +++ b/src/libsync/sync/mod.rs @@ -764,7 +764,7 @@ mod tests { use std::cast; use std::result; use std::task; - use std::comm::{SharedChan, Empty}; + use std::comm::Empty; /************************************************************************ * Semaphore tests @@ -1393,7 +1393,7 @@ mod tests { #[test] fn test_barrier() { let barrier = Barrier::new(10); - let (port, chan) = SharedChan::new(); + let (port, chan) = Chan::new(); for _ in range(0, 9) { let c = barrier.clone(); diff --git a/src/libsync/sync/mutex.rs b/src/libsync/sync/mutex.rs index f1a81d65c1d..3726528a5e9 100644 --- a/src/libsync/sync/mutex.rs +++ b/src/libsync/sync/mutex.rs @@ -531,7 +531,7 @@ mod test { } } - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); for _ in range(0, N) { let c2 = c.clone(); native::task::spawn(proc() { inc(); c2.send(()); }); diff --git a/src/libsync/sync/one.rs b/src/libsync/sync/one.rs index 93d818b704d..a651f3b9d4c 100644 --- a/src/libsync/sync/one.rs +++ b/src/libsync/sync/one.rs @@ -137,7 +137,7 @@ mod test { static mut o: Once = ONCE_INIT; static mut run: bool = false; - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); for _ in range(0, 10) { let c = c.clone(); spawn(proc() { diff --git a/src/test/bench/msgsend-pipes-shared.rs b/src/test/bench/msgsend-pipes-shared.rs index aa4e0f1ae58..b766be88d23 100644 --- a/src/test/bench/msgsend-pipes-shared.rs +++ b/src/test/bench/msgsend-pipes-shared.rs @@ -53,7 +53,7 @@ fn server(requests: &Port, responses: &Chan) { fn run(args: &[~str]) { let (from_child, to_parent) = Chan::new(); - let (from_parent, to_child) = SharedChan::new(); + let (from_parent, to_child) = Chan::new(); let size = from_str::(args[1]).unwrap(); let workers = from_str::(args[2]).unwrap(); diff --git a/src/test/bench/msgsend-pipes.rs b/src/test/bench/msgsend-pipes.rs index 6ce0f9de8d0..89e0bcf3326 100644 --- a/src/test/bench/msgsend-pipes.rs +++ b/src/test/bench/msgsend-pipes.rs @@ -67,7 +67,7 @@ fn run(args: &[~str]) { }); from_parent } else { - let (from_parent, to_child) = SharedChan::new(); + let (from_parent, to_child) = Chan::new(); for _ in range(0u, workers) { let to_child = to_child.clone(); let mut builder = task::task(); diff --git 
a/src/test/bench/shootout-chameneos-redux.rs b/src/test/bench/shootout-chameneos-redux.rs index 7281667e676..5c237b306fb 100644 --- a/src/test/bench/shootout-chameneos-redux.rs +++ b/src/test/bench/shootout-chameneos-redux.rs @@ -100,8 +100,8 @@ fn creature( name: uint, color: color, from_rendezvous: Port>, - to_rendezvous: SharedChan, - to_rendezvous_log: SharedChan<~str> + to_rendezvous: Chan, + to_rendezvous_log: Chan<~str> ) { let mut color = color; let mut creatures_met = 0; @@ -137,8 +137,8 @@ fn creature( fn rendezvous(nn: uint, set: ~[color]) { // these ports will allow us to hear from the creatures - let (from_creatures, to_rendezvous) = SharedChan::::new(); - let (from_creatures_log, to_rendezvous_log) = SharedChan::<~str>::new(); + let (from_creatures, to_rendezvous) = Chan::::new(); + let (from_creatures_log, to_rendezvous_log) = Chan::<~str>::new(); // these channels will be passed to the creatures so they can talk to us diff --git a/src/test/bench/shootout-pfib.rs b/src/test/bench/shootout-pfib.rs index 86a2043527e..7f4fd3cf94c 100644 --- a/src/test/bench/shootout-pfib.rs +++ b/src/test/bench/shootout-pfib.rs @@ -28,13 +28,13 @@ use std::task; use std::uint; fn fib(n: int) -> int { - fn pfib(c: &SharedChan, n: int) { + fn pfib(c: &Chan, n: int) { if n == 0 { c.send(0); } else if n <= 2 { c.send(1); } else { - let (pp, cc) = SharedChan::new(); + let (pp, cc) = Chan::new(); let ch = cc.clone(); task::spawn(proc() pfib(&ch, n - 1)); let ch = cc.clone(); @@ -43,7 +43,7 @@ fn fib(n: int) -> int { } } - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); let _t = task::spawn(proc() pfib(&ch, n) ); p.recv() } diff --git a/src/test/bench/task-perf-linked-failure.rs b/src/test/bench/task-perf-linked-failure.rs index 2a012ef19fa..189a3ac7448 100644 --- a/src/test/bench/task-perf-linked-failure.rs +++ b/src/test/bench/task-perf-linked-failure.rs @@ -33,7 +33,7 @@ // Creates in the background 'num_tasks' tasks, all blocked forever. // Doesn't return until all such tasks are ready, but doesn't block forever itself. 
-use std::comm::{stream, SharedChan}; +use std::comm::{stream, Chan}; use std::os; use std::result; use std::task; @@ -41,7 +41,7 @@ use std::uint; fn grandchild_group(num_tasks: uint) { let (po, ch) = stream(); - let ch = SharedChan::new(ch); + let ch = Chan::new(ch); for _ in range(0, num_tasks) { let ch = ch.clone(); diff --git a/src/test/compile-fail/comm-not-freeze.rs b/src/test/compile-fail/comm-not-freeze.rs index 2b85068d470..ef5bd21f913 100644 --- a/src/test/compile-fail/comm-not-freeze.rs +++ b/src/test/compile-fail/comm-not-freeze.rs @@ -13,5 +13,5 @@ fn test() {} fn main() { test::>(); //~ ERROR: does not fulfill `Freeze` test::>(); //~ ERROR: does not fulfill `Freeze` - test::>(); //~ ERROR: does not fulfill `Freeze` + test::>(); //~ ERROR: does not fulfill `Freeze` } diff --git a/src/test/run-pass/hashmap-memory.rs b/src/test/run-pass/hashmap-memory.rs index b15c3dca855..9c05dae46bd 100644 --- a/src/test/run-pass/hashmap-memory.rs +++ b/src/test/run-pass/hashmap-memory.rs @@ -31,7 +31,7 @@ mod map_reduce { enum ctrl_proto { find_reducer(~[u8], Chan), mapper_done, } - fn start_mappers(ctrl: SharedChan, inputs: ~[~str]) { + fn start_mappers(ctrl: Chan, inputs: ~[~str]) { for i in inputs.iter() { let ctrl = ctrl.clone(); let i = i.clone(); @@ -39,11 +39,11 @@ mod map_reduce { } } - fn map_task(ctrl: SharedChan, input: ~str) { + fn map_task(ctrl: Chan, input: ~str) { let mut intermediates = HashMap::new(); fn emit(im: &mut HashMap<~str, int>, - ctrl: SharedChan, key: ~str, + ctrl: Chan, key: ~str, _val: ~str) { if im.contains_key(&key) { return; @@ -63,7 +63,7 @@ mod map_reduce { } pub fn map_reduce(inputs: ~[~str]) { - let (ctrl_port, ctrl_chan) = SharedChan::new(); + let (ctrl_port, ctrl_chan) = Chan::new(); // This task becomes the master control task. It spawns others // to do the rest. diff --git a/src/test/run-pass/task-comm-14.rs b/src/test/run-pass/task-comm-14.rs index b51f626f3c2..0403284e55f 100644 --- a/src/test/run-pass/task-comm-14.rs +++ b/src/test/run-pass/task-comm-14.rs @@ -13,7 +13,7 @@ use std::task; pub fn main() { - let (po, ch) = SharedChan::new(); + let (po, ch) = Chan::new(); // Spawn 10 tasks each sending us back one int. 
let mut i = 10; @@ -37,7 +37,7 @@ pub fn main() { info!("main thread exiting"); } -fn child(x: int, ch: &SharedChan) { +fn child(x: int, ch: &Chan) { info!("{}", x); ch.send(x); } diff --git a/src/test/run-pass/task-comm-3.rs b/src/test/run-pass/task-comm-3.rs index 049f2d71946..f5374e7df05 100644 --- a/src/test/run-pass/task-comm-3.rs +++ b/src/test/run-pass/task-comm-3.rs @@ -16,7 +16,7 @@ use std::task; pub fn main() { info!("===== WITHOUT THREADS ====="); test00(); } -fn test00_start(ch: &SharedChan, message: int, count: int) { +fn test00_start(ch: &Chan, message: int, count: int) { info!("Starting test00_start"); let mut i: int = 0; while i < count { @@ -33,7 +33,7 @@ fn test00() { info!("Creating tasks"); - let (po, ch) = SharedChan::new(); + let (po, ch) = Chan::new(); let mut i: int = 0; diff --git a/src/test/run-pass/task-comm-6.rs b/src/test/run-pass/task-comm-6.rs index 45994e78d94..c63bf8bc856 100644 --- a/src/test/run-pass/task-comm-6.rs +++ b/src/test/run-pass/task-comm-6.rs @@ -15,7 +15,7 @@ pub fn main() { test00(); } fn test00() { let mut r: int = 0; let mut sum: int = 0; - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); let mut c0 = ch.clone(); let mut c1 = ch.clone(); let mut c2 = ch.clone(); diff --git a/src/test/run-pass/task-comm-7.rs b/src/test/run-pass/task-comm-7.rs index 159962e1857..ff43a80adac 100644 --- a/src/test/run-pass/task-comm-7.rs +++ b/src/test/run-pass/task-comm-7.rs @@ -18,7 +18,7 @@ use std::task; pub fn main() { test00(); } -fn test00_start(c: &SharedChan, start: int, +fn test00_start(c: &Chan, start: int, number_of_messages: int) { let mut i: int = 0; while i < number_of_messages { c.send(start + i); i += 1; } @@ -27,7 +27,7 @@ fn test00_start(c: &SharedChan, start: int, fn test00() { let mut r: int = 0; let mut sum: int = 0; - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); let number_of_messages: int = 10; let c = ch.clone(); diff --git a/src/test/run-pass/unique-send-2.rs b/src/test/run-pass/unique-send-2.rs index d1c45a336fa..299fed735ab 100644 --- a/src/test/run-pass/unique-send-2.rs +++ b/src/test/run-pass/unique-send-2.rs @@ -10,12 +10,12 @@ use std::task; -fn child(c: &SharedChan<~uint>, i: uint) { +fn child(c: &Chan<~uint>, i: uint) { c.send(~i); } pub fn main() { - let (p, ch) = SharedChan::new(); + let (p, ch) = Chan::new(); let n = 100u; let mut expected = 0u; for i in range(0u, n) { diff --git a/src/test/run-pass/unwind-resource.rs b/src/test/run-pass/unwind-resource.rs index 4679f65c43c..e643a20436e 100644 --- a/src/test/run-pass/unwind-resource.rs +++ b/src/test/run-pass/unwind-resource.rs @@ -15,7 +15,7 @@ extern mod extra; use std::task; struct complainer { - c: SharedChan, + c: Chan, } impl Drop for complainer { @@ -26,20 +26,20 @@ impl Drop for complainer { } } -fn complainer(c: SharedChan) -> complainer { +fn complainer(c: Chan) -> complainer { error!("Hello!"); complainer { c: c } } -fn f(c: SharedChan) { +fn f(c: Chan) { let _c = complainer(c); fail!(); } pub fn main() { - let (p, c) = SharedChan::new(); + let (p, c) = Chan::new(); task::spawn(proc() f(c.clone())); error!("hiiiiiiiii"); assert!(p.recv());