From adb895a34f6d0b925b8ef877289ca6e3c4d854d4 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Sat, 21 Dec 2013 22:15:04 -0800 Subject: [PATCH] Allow more "error" values in try_recv() This should allow callers to know whether the channel was empty or disconnected without having to block. Closes #11087 --- src/libextra/comm.rs | 14 ++--- src/libextra/sync.rs | 9 ++-- src/libgreen/sched.rs | 5 +- src/libstd/comm/mod.rs | 104 ++++++++++++++++++++++++++++++++------ src/libstd/comm/select.rs | 9 ++-- src/libstd/io/signal.rs | 3 +- src/libstd/io/timer.rs | 5 +- 7 files changed, 114 insertions(+), 35 deletions(-) diff --git a/src/libextra/comm.rs b/src/libextra/comm.rs index 52b5bedb7ea..bd1a46ae9bf 100644 --- a/src/libextra/comm.rs +++ b/src/libextra/comm.rs @@ -16,6 +16,8 @@ Higher level communication abstractions. #[allow(missing_doc)]; +use std::comm; + /// An extension of `pipes::stream` that allows both sending and receiving. pub struct DuplexStream { priv chan: Chan, @@ -40,7 +42,7 @@ impl DuplexStream { pub fn recv(&self) -> U { self.port.recv() } - pub fn try_recv(&self) -> Option { + pub fn try_recv(&self) -> comm::TryRecvResult { self.port.try_recv() } pub fn recv_opt(&self) -> Option { @@ -77,11 +79,11 @@ impl SyncPort { }) } - pub fn try_recv(&self) -> Option { - self.duplex_stream.try_recv().map(|val| { - self.duplex_stream.try_send(()); - val - }) + pub fn try_recv(&self) -> comm::TryRecvResult { + match self.duplex_stream.try_recv() { + comm::Data(t) => { self.duplex_stream.try_send(()); comm::Data(t) } + state => state, + } } } diff --git a/src/libextra/sync.rs b/src/libextra/sync.rs index 12566ac8551..044e5e9e509 100644 --- a/src/libextra/sync.rs +++ b/src/libextra/sync.rs @@ -19,6 +19,7 @@ use std::borrow; +use std::comm; use std::unstable::sync::Exclusive; use std::sync::arc::UnsafeArc; use std::sync::atomics; @@ -49,7 +50,7 @@ impl WaitQueue { // Signals one live task from the queue. fn signal(&self) -> bool { match self.head.try_recv() { - Some(ch) => { + comm::Data(ch) => { // Send a wakeup signal. If the waiter was killed, its port will // have closed. Keep trying until we get a live task. if ch.try_send_deferred(()) { @@ -58,7 +59,7 @@ impl WaitQueue { self.signal() } } - None => false + _ => false } } @@ -66,12 +67,12 @@ impl WaitQueue { let mut count = 0; loop { match self.head.try_recv() { - None => break, - Some(ch) => { + comm::Data(ch) => { if ch.try_send_deferred(()) { count += 1; } } + _ => break } } count diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index b0b88e4be79..1ae4d07af18 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -958,6 +958,7 @@ fn new_sched_rng() -> XorShiftRng { #[cfg(test)] mod test { + use std::comm; use std::task::TaskOpts; use std::rt::Runtime; use std::rt::task::Task; @@ -1376,7 +1377,7 @@ mod test { // This task should not be able to starve the sender; // The sender should get stolen to another thread. do spawn { - while port.try_recv().is_none() { } + while port.try_recv() != comm::Data(()) { } } chan.send(()); @@ -1393,7 +1394,7 @@ mod test { // This task should not be able to starve the other task. // The sends should eventually yield. do spawn { - while port.try_recv().is_none() { + while port.try_recv() != comm::Data(()) { chan2.send(()); } } diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index bf37e5fca6a..bf9e28f3e97 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -251,6 +251,7 @@ macro_rules! test ( #[allow(unused_imports)]; use native; + use comm::*; use prelude::*; use super::*; use super::super::*; @@ -323,6 +324,20 @@ pub struct SharedChan { priv queue: mpsc::Producer, } +/// This enumeration is the list of the possible reasons that try_recv could not +/// return data when called. +#[deriving(Eq, Clone)] +pub enum TryRecvResult { + /// This channel is currently empty, but the sender(s) have not yet + /// disconnected, so data may yet become available. + Empty, + /// This channel's sending half has become disconnected, and there will + /// never be any more data received on this channel + Disconnected, + /// The channel had some data and we successfully popped it + Data(T), +} + /////////////////////////////////////////////////////////////////////////////// // Internal struct definitions /////////////////////////////////////////////////////////////////////////////// @@ -739,11 +754,11 @@ impl Port { /// block on a port. /// /// This function cannot fail. - pub fn try_recv(&self) -> Option { + pub fn try_recv(&self) -> TryRecvResult { self.try_recv_inc(true) } - fn try_recv_inc(&self, increment: bool) -> Option { + fn try_recv_inc(&self, increment: bool) -> TryRecvResult { // This is a "best effort" situation, so if a queue is inconsistent just // don't worry about it. let this = unsafe { cast::transmute_mut(self) }; @@ -807,7 +822,35 @@ impl Port { if increment && ret.is_some() { unsafe { (*this.queue.packet()).steals += 1; } } - return ret; + match ret { + Some(t) => Data(t), + None => { + // It's possible that between the time that we saw the queue was + // empty and here the other side disconnected. It's also + // possible for us to see the disconnection here while there is + // data in the queue. It's pretty backwards-thinking to return + // Disconnected when there's actually data on the queue, so if + // we see a disconnected state be sure to check again to be 100% + // sure that there's no data in the queue. + let cnt = unsafe { (*this.queue.packet()).cnt.load(Relaxed) }; + if cnt != DISCONNECTED { return Empty } + + let ret = match this.queue { + SPSC(ref mut queue) => queue.pop(), + MPSC(ref mut queue) => match queue.pop() { + mpsc::Data(t) => Some(t), + mpsc::Empty => None, + mpsc::Inconsistent => { + fail!("inconsistent with no senders?!"); + } + } + }; + match ret { + Some(data) => Data(data), + None => Disconnected, + } + } + } } /// Attempt to wait for a value on this port, but does not fail if the @@ -824,7 +867,11 @@ impl Port { /// the value found on the port is returned. pub fn recv_opt(&self) -> Option { // optimistic preflight check (scheduling is expensive) - match self.try_recv() { None => {}, data => return data } + match self.try_recv() { + Empty => {}, + Disconnected => return None, + Data(t) => return Some(t), + } let packet; let this; @@ -843,12 +890,11 @@ impl Port { }); } - let data = self.try_recv_inc(false); - if data.is_none() && - unsafe { (*packet).cnt.load(SeqCst) } != DISCONNECTED { - fail!("bug: woke up too soon {}", unsafe { (*packet).cnt.load(SeqCst) }); + match self.try_recv_inc(false) { + Data(t) => Some(t), + Empty => fail!("bug: woke up too soon"), + Disconnected => None, } - return data; } /// Returns an iterator which will block waiting for messages, but never @@ -1005,7 +1051,10 @@ mod test { for _ in range(0, AMT * NTHREADS) { assert_eq!(p.recv(), 1); } - assert_eq!(p.try_recv(), None); + match p.try_recv() { + Data(..) => fail!(), + _ => {} + } c1.send(()); } @@ -1129,7 +1178,7 @@ mod test { test!(fn oneshot_single_thread_try_recv_open() { let (port, chan) = Chan::::new(); chan.send(10); - assert!(port.try_recv() == Some(10)); + assert!(port.recv_opt() == Some(10)); }) test!(fn oneshot_single_thread_try_recv_closed() { @@ -1140,21 +1189,21 @@ mod test { test!(fn oneshot_single_thread_peek_data() { let (port, chan) = Chan::::new(); - assert!(port.try_recv().is_none()); + assert_eq!(port.try_recv(), Empty) chan.send(10); - assert!(port.try_recv().is_some()); + assert_eq!(port.try_recv(), Data(10)); }) test!(fn oneshot_single_thread_peek_close() { let (port, chan) = Chan::::new(); { let _c = chan; } - assert!(port.try_recv().is_none()); - assert!(port.try_recv().is_none()); + assert_eq!(port.try_recv(), Disconnected); + assert_eq!(port.try_recv(), Disconnected); }) test!(fn oneshot_single_thread_peek_open() { let (port, _) = Chan::::new(); - assert!(port.try_recv().is_none()); + assert_eq!(port.try_recv(), Empty); }) test!(fn oneshot_multi_task_recv_then_send() { @@ -1321,4 +1370,27 @@ mod test { drop(chan); assert_eq!(count_port.recv(), 4); }) + + test!(fn try_recv_states() { + let (p, c) = Chan::::new(); + let (p1, c1) = Chan::<()>::new(); + let (p2, c2) = Chan::<()>::new(); + do spawn { + p1.recv(); + c.send(1); + c2.send(()); + p1.recv(); + drop(c); + c2.send(()); + } + + assert_eq!(p.try_recv(), Empty); + c1.send(()); + p2.recv(); + assert_eq!(p.try_recv(), Data(1)); + assert_eq!(p.try_recv(), Empty); + c1.send(()); + p2.recv(); + assert_eq!(p.try_recv(), Disconnected); + }) } diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index 302c9d9ea46..fa5ec1d3e30 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -45,6 +45,7 @@ #[allow(dead_code)]; use cast; +use comm; use iter::Iterator; use kinds::Send; use ops::Drop; @@ -279,7 +280,9 @@ impl<'port, T: Send> Handle<'port, T> { pub fn recv_opt(&mut self) -> Option { self.port.recv_opt() } /// Immediately attempt to receive a value on a port, this function will /// never block. Has the same semantics as `Port.try_recv`. - pub fn try_recv(&mut self) -> Option { self.port.try_recv() } + pub fn try_recv(&mut self) -> comm::TryRecvResult { + self.port.try_recv() + } } #[unsafe_destructor] @@ -409,8 +412,8 @@ mod test { a = p1.recv() => { assert_eq!(a, 1); }, a = p2.recv() => { assert_eq!(a, 2); } ) - assert_eq!(p1.try_recv(), None); - assert_eq!(p2.try_recv(), None); + assert_eq!(p1.try_recv(), Empty); + assert_eq!(p2.try_recv(), Empty); c3.send(()); }) diff --git a/src/libstd/io/signal.rs b/src/libstd/io/signal.rs index 34b4ed5e1ef..0f05254b034 100644 --- a/src/libstd/io/signal.rs +++ b/src/libstd/io/signal.rs @@ -144,6 +144,7 @@ impl Listener { #[cfg(test)] mod test { use libc; + use comm::Empty; use io::timer; use super::{Listener, Interrupt}; @@ -194,7 +195,7 @@ mod test { s2.unregister(Interrupt); sigint(); timer::sleep(10); - assert!(s2.port.try_recv().is_none()); + assert_eq!(s2.port.try_recv(), Empty); } #[cfg(windows)] diff --git a/src/libstd/io/timer.rs b/src/libstd/io/timer.rs index 7c9aa28bfe9..d156a7460e1 100644 --- a/src/libstd/io/timer.rs +++ b/src/libstd/io/timer.rs @@ -123,7 +123,7 @@ mod test { let port1 = timer.oneshot(10000); let port = timer.oneshot(1); port.recv(); - assert_eq!(port1.try_recv(), None); + assert!(port1.recv_opt().is_none()); } #[test] @@ -131,8 +131,7 @@ mod test { let mut timer = Timer::new().unwrap(); let port = timer.oneshot(100000000000); timer.sleep(1); // this should invalidate the port - - assert_eq!(port.try_recv(), None); + assert!(port.recv_opt().is_none()); } #[test]