diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs
index b6738f25c68..495f6ead1ca 100644
--- a/src/libgreen/lib.rs
+++ b/src/libgreen/lib.rs
@@ -193,6 +193,7 @@ use task::GreenTask;
 
 mod macros;
 mod simple;
+mod message_queue;
 pub mod basic;
 pub mod context;
diff --git a/src/libgreen/message_queue.rs b/src/libgreen/message_queue.rs
new file mode 100644
index 00000000000..3a118476aff
--- /dev/null
+++ b/src/libgreen/message_queue.rs
@@ -0,0 +1,61 @@
+// Copyright 2014 The Rust Project Developers. See the COPYRIGHT
+// file at the top-level directory of this distribution and at
+// http://rust-lang.org/COPYRIGHT.
+//
+// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
+// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
+// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
+// option. This file may not be copied, modified, or distributed
+// except according to those terms.
+
+use mpsc = std::sync::mpsc_queue;
+use std::sync::arc::UnsafeArc;
+
+pub enum PopResult<T> {
+    Inconsistent,
+    Empty,
+    Data(T),
+}
+
+pub fn queue<T: Send>() -> (Consumer<T>, Producer<T>) {
+    let (a, b) = UnsafeArc::new2(mpsc::Queue::new());
+    (Consumer { inner: a }, Producer { inner: b })
+}
+
+pub struct Producer<T> {
+    priv inner: UnsafeArc<mpsc::Queue<T>>,
+}
+
+pub struct Consumer<T> {
+    priv inner: UnsafeArc<mpsc::Queue<T>>,
+}
+
+impl<T: Send> Consumer<T> {
+    pub fn pop(&mut self) -> PopResult<T> {
+        match unsafe { (*self.inner.get()).pop() } {
+            mpsc::Inconsistent => Inconsistent,
+            mpsc::Empty => Empty,
+            mpsc::Data(t) => Data(t),
+        }
+    }
+
+    pub fn casual_pop(&mut self) -> Option<T> {
+        match unsafe { (*self.inner.get()).pop() } {
+            mpsc::Inconsistent => None,
+            mpsc::Empty => None,
+            mpsc::Data(t) => Some(t),
+        }
+    }
+}
+
+impl<T: Send> Producer<T> {
+    pub fn push(&mut self, t: T) {
+        unsafe { (*self.inner.get()).push(t); }
+    }
+}
+
+impl<T: Send> Clone for Producer<T> {
+    fn clone(&self) -> Producer<T> {
+        Producer { inner: self.inner.clone() }
+    }
+}
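The new libgreen-local module restores the consumer/producer split that `sched.rs` relied on, as a thin layer over the now-plain `mpsc::Queue` shared through an `UnsafeArc`. A minimal sketch of how the pair is meant to be used; the `example` function and `int` payload are illustrative only, all method and variant names come from the file above:

```rust
// Illustrative only: split one shared queue into a single Consumer and a
// cloneable Producer, then hand Producer clones to other threads.
use message_queue::{queue, Data, Empty, Inconsistent};

fn example() {
    let (mut consumer, producer) = queue::<int>();
    let mut handle = producer.clone(); // e.g. one Producer per SchedHandle
    handle.push(1);
    match consumer.pop() {
        Data(n) => assert_eq!(n, 1),
        Empty => {}        // nothing has been pushed yet
        Inconsistent => {} // a pusher was pre-empted mid-push; retry later
    }
}
```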
diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs
index 2b090ac94d7..bf6e0c3430e 100644
--- a/src/libgreen/sched.rs
+++ b/src/libgreen/sched.rs
@@ -17,7 +17,6 @@ use std::rt::task::Task;
 use std::sync::deque;
 use std::unstable::mutex::Mutex;
 use std::unstable::raw;
-use mpsc = std::sync::mpsc_queue;
 
 use TaskState;
 use context::Context;
@@ -25,6 +24,7 @@ use coroutine::Coroutine;
 use sleeper_list::SleeperList;
 use stack::StackPool;
 use task::{TypeSched, GreenTask, HomeSched, AnySched};
+use msgq = message_queue;
 
 /// A scheduler is responsible for coordinating the execution of Tasks
 /// on a single thread. The scheduler runs inside a slightly modified
@@ -47,9 +47,9 @@ pub struct Scheduler {
     /// The queue of incoming messages from other schedulers.
     /// These are enqueued by SchedHandles after which a remote callback
     /// is triggered to handle the message.
-    message_queue: mpsc::Consumer<SchedMessage, ()>,
+    message_queue: msgq::Consumer<SchedMessage>,
     /// Producer used to clone sched handles from
-    message_producer: mpsc::Producer<SchedMessage, ()>,
+    message_producer: msgq::Producer<SchedMessage>,
     /// A shared list of sleeping schedulers. We'll use this to wake
     /// up schedulers when pushing work onto the work queue.
     sleeper_list: SleeperList,
@@ -143,7 +143,7 @@ impl Scheduler {
                state: TaskState)
                -> Scheduler {
-        let (consumer, producer) = mpsc::queue(());
+        let (consumer, producer) = msgq::queue();
         let mut sched = Scheduler {
             pool_id: pool_id,
             sleeper_list: sleeper_list,
@@ -215,7 +215,7 @@ impl Scheduler {
 
         // Should not have any messages
         let message = stask.sched.get_mut_ref().message_queue.pop();
-        rtassert!(match message { mpsc::Empty => true, _ => false });
+        rtassert!(match message { msgq::Empty => true, _ => false });
 
         stask.task.get_mut_ref().destroyed = true;
     }
@@ -340,8 +340,8 @@ impl Scheduler {
             //
             // I have chosen to take route #2.
             match self.message_queue.pop() {
-                mpsc::Data(t) => Some(t),
-                mpsc::Empty | mpsc::Inconsistent => None
+                msgq::Data(t) => Some(t),
+                msgq::Empty | msgq::Inconsistent => None
             }
         };
@@ -849,7 +849,7 @@ pub enum SchedMessage {
 
 pub struct SchedHandle {
     priv remote: ~RemoteCallback,
-    priv queue: mpsc::Producer<SchedMessage, ()>,
+    priv queue: msgq::Producer<SchedMessage>,
     sched_id: uint
 }
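A `SchedHandle` pairs its `Producer<SchedMessage>` with the remote callback. Its `send` path is not touched by this diff; the sketch below assumes it keeps its existing shape, the point being the ordering: the message is enqueued before the wakeup fires, so the receiving scheduler is guaranteed to observe it when it drains the queue.

```rust
// Sketch, assuming SchedHandle::send keeps its pre-diff shape.
impl SchedHandle {
    pub fn send(&mut self, msg: SchedMessage) {
        self.queue.push(msg);  // 1. make the message visible
        self.remote.fire();    // 2. then poke the target scheduler
    }
}
```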
diff --git a/src/librustuv/queue.rs b/src/librustuv/queue.rs
index 358582d436b..5b697e0d73d 100644
--- a/src/librustuv/queue.rs
+++ b/src/librustuv/queue.rs
@@ -24,6 +24,7 @@ use std::cast;
 use std::libc::{c_void, c_int};
 use std::rt::task::BlockedTask;
 use std::unstable::sync::LittleLock;
+use std::sync::arc::UnsafeArc;
 use mpsc = std::sync::mpsc_queue;
 
 use async::AsyncWatcher;
@@ -39,46 +40,46 @@ enum Message {
 struct State {
     handle: *uvll::uv_async_t,
     lock: LittleLock, // see comments in async_cb for why this is needed
+    queue: mpsc::Queue<Message>,
 }
 
 /// This structure is intended to be stored next to the event loop, and it is
 /// used to create new `Queue` structures.
 pub struct QueuePool {
-    priv producer: mpsc::Producer<Message, State>,
-    priv consumer: mpsc::Consumer<Message, State>,
+    priv queue: UnsafeArc<State>,
     priv refcnt: uint,
 }
 
 /// This type is used to send messages back to the original event loop.
 pub struct Queue {
-    priv queue: mpsc::Producer<Message, State>,
+    priv queue: UnsafeArc<State>,
 }
 
 extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
     assert_eq!(status, 0);
-    let state: &mut QueuePool = unsafe {
+    let pool: &mut QueuePool = unsafe {
         cast::transmute(uvll::get_data_for_uv_handle(handle))
     };
-    let packet = unsafe { state.consumer.packet() };
+    let state: &mut State = unsafe { cast::transmute(pool.queue.get()) };
 
     // Remember that there is no guarantee about how many times an async
    // callback is called with relation to the number of sends, so process the
     // entire queue in a loop.
     loop {
-        match state.consumer.pop() {
+        match state.queue.pop() {
             mpsc::Data(Task(task)) => {
                 let _ = task.wake().map(|t| t.reawaken());
             }
             mpsc::Data(Increment) => unsafe {
-                if state.refcnt == 0 {
-                    uvll::uv_ref((*packet).handle);
+                if pool.refcnt == 0 {
+                    uvll::uv_ref(state.handle);
                 }
-                state.refcnt += 1;
+                pool.refcnt += 1;
             },
             mpsc::Data(Decrement) => unsafe {
-                state.refcnt -= 1;
-                if state.refcnt == 0 {
-                    uvll::uv_unref((*packet).handle);
+                pool.refcnt -= 1;
+                if pool.refcnt == 0 {
+                    uvll::uv_unref(state.handle);
                 }
             },
             mpsc::Empty | mpsc::Inconsistent => break
@@ -99,9 +100,9 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
     // If we acquire the mutex here, then we are guaranteed that there are no
     // longer any senders which are holding on to their handles, so we can
     // safely allow the event loop to exit.
-    if state.refcnt == 0 {
+    if pool.refcnt == 0 {
         unsafe {
-            let _l = (*packet).lock.lock();
+            let _l = state.lock.lock();
         }
     }
 }
@@ -109,14 +110,14 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
 impl QueuePool {
     pub fn new(loop_: &mut Loop) -> ~QueuePool {
         let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
-        let (c, p) = mpsc::queue(State {
+        let state = UnsafeArc::new(State {
             handle: handle,
             lock: LittleLock::new(),
+            queue: mpsc::Queue::new(),
         });
         let q = ~QueuePool {
-            producer: p,
-            consumer: c,
             refcnt: 0,
+            queue: state,
         };
 
         unsafe {
@@ -132,23 +133,23 @@ impl QueuePool {
     pub fn queue(&mut self) -> Queue {
         unsafe {
             if self.refcnt == 0 {
-                uvll::uv_ref((*self.producer.packet()).handle);
+                uvll::uv_ref((*self.queue.get()).handle);
             }
             self.refcnt += 1;
         }
-        Queue { queue: self.producer.clone() }
+        Queue { queue: self.queue.clone() }
     }
 
     pub fn handle(&self) -> *uvll::uv_async_t {
-        unsafe { (*self.producer.packet()).handle }
+        unsafe { (*self.queue.get()).handle }
     }
 }
 
 impl Queue {
     pub fn push(&mut self, task: BlockedTask) {
-        self.queue.push(Task(task));
         unsafe {
-            uvll::uv_async_send((*self.queue.packet()).handle);
+            (*self.queue.get()).queue.push(Task(task));
+            uvll::uv_async_send((*self.queue.get()).handle);
         }
     }
 }
@@ -161,7 +162,7 @@ impl Clone for Queue {
         // and if the queue is dropped later on it'll see the increment for the
         // decrement anyway.
         unsafe {
-            cast::transmute_mut(self).queue.push(Increment);
+            (*self.queue.get()).queue.push(Increment);
         }
         Queue { queue: self.queue.clone() }
     }
@@ -172,9 +173,9 @@ impl Drop for Queue {
         // See the comments in the async_cb function for why there is a lock
         // that is acquired only on a drop.
         unsafe {
-            let state = self.queue.packet();
+            let state = self.queue.get();
             let _l = (*state).lock.lock();
-            self.queue.push(Decrement);
+            (*state).queue.push(Decrement);
             uvll::uv_async_send((*state).handle);
         }
     }
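With the producer/consumer handles gone, `State` owns the queue directly and a single `UnsafeArc<State>` is shared by the pool and every `Queue` clone. A hypothetical caller, using only names that appear in the diff (`example` itself is illustrative):

```rust
// Illustrative only: a QueuePool lives next to the event loop; each Queue it
// hands out can wake that loop from another thread.
fn example(pool: &mut QueuePool, task: BlockedTask) {
    let mut q = pool.queue(); // bumps refcnt, uv_ref's the async handle
    q.push(task);             // enqueue, then uv_async_send wakes async_cb
}
```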
diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs
index 34bf83ac491..63d69100671 100644
--- a/src/libstd/comm/mod.rs
+++ b/src/libstd/comm/mod.rs
@@ -238,6 +238,7 @@ use result::{Ok, Err};
 use rt::local::Local;
 use rt::task::{Task, BlockedTask};
 use rt::thread::Thread;
+use sync::arc::UnsafeArc;
 use sync::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed};
 use vec::OwnedVector;
 
@@ -273,24 +274,6 @@ macro_rules! test (
 
 mod select;
 
-///////////////////////////////////////////////////////////////////////////////
-// Helper type to abstract ports for channels and shared channels
-///////////////////////////////////////////////////////////////////////////////
-
-enum Consumer<T> {
-    SPSC(spsc::Consumer<T, Packet>),
-    MPSC(mpsc::Consumer<T, Packet>),
-}
-
-impl<T: Send> Consumer<T> {
-    unsafe fn packet(&self) -> *mut Packet {
-        match *self {
-            SPSC(ref c) => c.packet(),
-            MPSC(ref c) => c.packet(),
-        }
-    }
-}
-
 ///////////////////////////////////////////////////////////////////////////////
 // Public structs
 ///////////////////////////////////////////////////////////////////////////////
@@ -298,8 +281,7 @@ impl<T: Send> Consumer<T> {
 /// The receiving-half of Rust's channel type. This half can only be owned by
 /// one task
 pub struct Port<T> {
-    priv queue: Consumer<T>,
-
+    priv inner: PortInner<T>,
     // can't share in an arc
     priv marker: marker::NoFreeze,
 }
@@ -314,8 +296,7 @@ pub struct Messages<'a, T> {
 /// The sending-half of Rust's channel type. This half can only be owned by one
 /// task
 pub struct Chan<T> {
-    priv queue: spsc::Producer<T, Packet>,
-
+    priv inner: UnsafeArc<SingleInner<T>>,
     // can't share in an arc
     priv marker: marker::NoFreeze,
 }
@@ -323,8 +304,6 @@ pub struct Chan<T> {
 /// The sending-half of Rust's channel type. This half can be shared among many
 /// tasks by creating copies of itself through the `clone` method.
 pub struct SharedChan<T> {
-    priv queue: mpsc::Producer<T, Packet>,
-
+    priv inner: UnsafeArc<SharedInner<T>>,
     // can't share in an arc -- technically this implementation is
     // shareable, but it shouldn't be required to be shareable in an
     // arc
@@ -349,16 +328,26 @@ pub enum TryRecvResult<T> {
 // Internal struct definitions
 ///////////////////////////////////////////////////////////////////////////////
 
+enum PortInner<T> {
+    Single(UnsafeArc<SingleInner<T>>),
+    Shared(UnsafeArc<SharedInner<T>>),
+}
+
+struct SingleInner<T> {
+    queue: spsc::Queue<T>,
+    packet: Packet,
+}
+
+struct SharedInner<T> {
+    queue: mpsc::Queue<T>,
+    packet: Packet,
+}
+
 struct Packet {
     cnt: AtomicInt, // How many items are on this channel
     steals: int,    // How many times has a port received without blocking?
     to_wake: Option<BlockedTask>, // Task to wake up
 
-    // This lock is used to wake up native threads blocked in select. The
-    // `lock` field is not used because the thread blocking in select must
-    // block on only one mutex.
-    //selection_lock: Option<UnsafeArc<Mutex>>,
-
     // The number of channels which are currently using this packet. This is
     // used to reference count shared channels.
     channels: AtomicInt,
@@ -377,6 +366,15 @@ struct Packet {
 static DISCONNECTED: int = int::MIN;
 static RESCHED_FREQ: int = 200;
 
+impl<T> PortInner<T> {
+    fn packet<'a>(&'a mut self) -> &'a mut Packet {
+        match *self {
+            Single(ref arc) => unsafe { &mut (*arc.get()).packet },
+            Shared(ref arc) => unsafe { &mut (*arc.get()).packet },
+        }
+    }
+}
+
 impl Packet {
     fn new() -> Packet {
         Packet {
@@ -550,10 +548,12 @@ impl<T: Send> Chan<T> {
     pub fn new() -> (Port<T>, Chan<T>) {
         // arbitrary 128 size cache -- this is just a max cache size, not a
        // maximum buffer size
-        let (c, p) = spsc::queue(128, Packet::new());
-        let c = SPSC(c);
-        (Port { queue: c, marker: marker::NoFreeze },
-         Chan { queue: p, marker: marker::NoFreeze })
+        let (a, b) = UnsafeArc::new2(SingleInner {
+            queue: spsc::Queue::new(128),
+            packet: Packet::new(),
+        });
+        (Port { inner: Single(a), marker: marker::NoFreeze },
+         Chan { inner: b, marker: marker::NoFreeze })
     }
 
     /// Sends a value along this channel to be received by the corresponding
@@ -596,16 +596,15 @@ impl<T: Send> Chan<T> {
     /// be tolerated, then this method should be used instead.
     pub fn try_send(&self, t: T) -> bool {
         unsafe {
-            let this = cast::transmute_mut(self);
-            this.queue.push(t);
-            let packet = this.queue.packet();
-            match (*packet).increment() {
+            let inner = self.inner.get();
+            (*inner).queue.push(t);
+            match (*inner).packet.increment() {
                 // As described above, -1 == wakeup
-                -1 => { (*packet).wakeup(); true }
+                -1 => { (*inner).packet.wakeup(); true }
                 // Also as above, SPSC queues must be >= -2
                 -2 => true,
                 // We succeeded if we sent data
-                DISCONNECTED => this.queue.is_empty(),
+                DISCONNECTED => (*inner).queue.is_empty(),
                 // In order to prevent starvation of other tasks in situations
                 // where a task sends repeatedly without ever receiving, we
                 // occasionally yield instead of doing a send immediately.
@@ -630,7 +629,7 @@ impl<T: Send> Chan<T> {
 #[unsafe_destructor]
 impl<T: Send> Drop for Chan<T> {
     fn drop(&mut self) {
-        unsafe { (*self.queue.packet()).drop_chan(); }
+        unsafe { (*self.inner.get()).packet.drop_chan(); }
     }
 }
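The public `Chan`/`Port` interface is untouched by this refactor; the spsc queue and its `Packet` simply live in one `UnsafeArc<SingleInner<T>>` allocation now. For instance (an illustrative snippet, not part of the patch):

```rust
fn example() {
    let (port, chan) = Chan::new();
    // try_send reports false only once the Port has disconnected
    assert!(chan.try_send(10));
    assert_eq!(port.recv(), 10);
}
```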
@@ -640,10 +639,13 @@ impl<T: Send> SharedChan<T> {
     /// same time. All data sent on any channel will become available on the
     /// provided port as well.
     pub fn new() -> (Port<T>, SharedChan<T>) {
-        let (c, p) = mpsc::queue(Packet::new());
-        let c = MPSC(c);
-        (Port { queue: c, marker: marker::NoFreeze },
-         SharedChan { queue: p, marker: marker::NoFreeze })
+        let (a, b) = UnsafeArc::new2(SharedInner {
+            queue: mpsc::Queue::new(),
+            packet: Packet::new(),
+        });
+        (Port { inner: Shared(a), marker: marker::NoFreeze },
+         SharedChan { inner: b, marker: marker::NoFreeze })
     }
 
     /// Equivalent method to `send` on the `Chan` type (using the same
@@ -683,17 +685,15 @@ impl<T: Send> SharedChan<T> {
             // preflight check serves as the definitive "this will never be
             // received". Once we get beyond this check, we have permanently
             // entered the realm of "this may be received"
-            let packet = self.queue.packet();
-            if (*packet).cnt.load(Relaxed) < DISCONNECTED + 1024 {
+            let inner = self.inner.get();
+            if (*inner).packet.cnt.load(Relaxed) < DISCONNECTED + 1024 {
                 return false
             }
 
-            let this = cast::transmute_mut(self);
-            this.queue.push(t);
-
-            match (*packet).increment() {
+            (*inner).queue.push(t);
+            match (*inner).packet.increment() {
                 DISCONNECTED => {} // oh well, we tried
-                -1 => { (*packet).wakeup(); }
+                -1 => { (*inner).packet.wakeup(); }
                 n => {
                     if n > 0 && n % RESCHED_FREQ == 0 {
                         let task: ~Task = Local::take();
@@ -708,15 +708,15 @@ impl<T: Send> SharedChan<T> {
 
 impl<T: Send> Clone for SharedChan<T> {
     fn clone(&self) -> SharedChan<T> {
-        unsafe { (*self.queue.packet()).channels.fetch_add(1, SeqCst); }
-        SharedChan { queue: self.queue.clone(), marker: marker::NoFreeze }
+        unsafe { (*self.inner.get()).packet.channels.fetch_add(1, SeqCst); }
+        SharedChan { inner: self.inner.clone(), marker: marker::NoFreeze }
     }
 }
 
 #[unsafe_destructor]
 impl<T: Send> Drop for SharedChan<T> {
     fn drop(&mut self) {
-        unsafe { (*self.queue.packet()).drop_chan(); }
+        unsafe { (*self.inner.get()).packet.drop_chan(); }
     }
 }
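As with `Chan`, the `SharedChan` surface is unchanged; clones now share one `UnsafeArc<SharedInner<T>>` and reference-count through `packet.channels`. An illustrative use (the arithmetic and task count are arbitrary):

```rust
fn example() {
    let (port, chan) = SharedChan::new();
    for i in range(0, 10) {
        let chan = chan.clone(); // fetch_add on packet.channels, per the diff
        spawn(proc() { chan.send(i); });
    }
    let mut sum = 0;
    for _ in range(0, 10) { sum += port.recv(); }
    assert_eq!(sum, 45);
}
```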
@@ -768,18 +768,18 @@ impl<T: Send> Port<T> {
         // See the comment about yielding on sends, but the same applies here.
         // If a thread is spinning in try_recv we should try
-        unsafe {
-            let packet = this.queue.packet();
-            (*packet).recv_cnt += 1;
-            if (*packet).recv_cnt % RESCHED_FREQ == 0 {
+        {
+            let packet = this.inner.packet();
+            packet.recv_cnt += 1;
+            if packet.recv_cnt % RESCHED_FREQ == 0 {
                 let task: ~Task = Local::take();
                 task.maybe_yield();
             }
         }
 
-        let ret = match this.queue {
-            SPSC(ref mut queue) => queue.pop(),
-            MPSC(ref mut queue) => match queue.pop() {
+        let ret = match this.inner {
+            Single(ref mut arc) => unsafe { (*arc.get()).queue.pop() },
+            Shared(ref mut arc) => match unsafe { (*arc.get()).queue.pop() } {
                 mpsc::Data(t) => Some(t),
                 mpsc::Empty => None,
@@ -812,7 +812,7 @@ impl<T: Send> Port<T> {
                     let data;
                     loop {
                         Thread::yield_now();
-                        match queue.pop() {
+                        match unsafe { (*arc.get()).queue.pop() } {
                             mpsc::Data(t) => { data = t; break }
                             mpsc::Empty => fail!("inconsistent => empty"),
                             mpsc::Inconsistent => {}
@@ -823,7 +823,7 @@ impl<T: Send> Port<T> {
             }
         };
         if increment && ret.is_some() {
-            unsafe { (*this.queue.packet()).steals += 1; }
+            this.inner.packet().steals += 1;
         }
         match ret {
             Some(t) => Data(t),
@@ -880,7 +880,7 @@ impl<T: Send> Port<T> {
         let this;
         unsafe {
             this = cast::transmute_mut(self);
-            packet = this.queue.packet();
+            packet = this.inner.packet();
             let task: ~Task = Local::take();
             task.deschedule(1, |task| {
                 assert!((*packet).to_wake.is_none());
@@ -917,9 +917,7 @@ impl<T: Send> Drop for Port<T> {
         // All we need to do is store that we're disconnected. If the channel
         // half has already disconnected, then we'll just deallocate everything
         // when the shared packet is deallocated.
-        unsafe {
-            (*self.queue.packet()).cnt.store(DISCONNECTED, SeqCst);
-        }
+        self.inner.packet().cnt.store(DISCONNECTED, SeqCst);
     }
 }
diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs
index a369ecba86b..cf8df863817 100644
--- a/src/libstd/comm/select.rs
+++ b/src/libstd/comm/select.rs
@@ -130,18 +130,18 @@ impl Select {
         let id = this.next_id;
         this.next_id += 1;
         unsafe {
-            let packet = port.queue.packet();
+            let packet = port.inner.packet();
             assert!(!(*packet).selecting.load(Relaxed));
             assert_eq!((*packet).selection_id, 0);
             (*packet).selection_id = id;
             if this.head.is_null() {
-                this.head = packet;
-                this.tail = packet;
+                this.head = packet as *mut Packet;
+                this.tail = packet as *mut Packet;
             } else {
                 (*packet).select_prev = this.tail;
                 assert!((*packet).select_next.is_null());
-                (*this.tail).select_next = packet;
-                this.tail = packet;
+                (*this.tail).select_next = packet as *mut Packet;
+                this.tail = packet as *mut Packet;
             }
         }
         Handle { id: id, selector: this, port: port }
@@ -303,7 +303,7 @@ impl Drop for Select {
 #[unsafe_destructor]
 impl<'port, T: Send> Drop for Handle<'port, T> {
     fn drop(&mut self) {
-        unsafe { self.selector.remove(self.port.queue.packet()) }
+        unsafe { self.selector.remove(self.port.inner.packet()) }
     }
 }
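`Port::try_recv`, whose body is the receive path patched above, canonicalizes the queue states into `TryRecvResult`. A consumer loop written against it; the `Disconnected` variant name is an assumption about this era's `TryRecvResult`, everything else comes from the diff:

```rust
// Sketch; `Disconnected` is assumed, `Data` and `Empty` appear above.
fn drain(port: &Port<int>) -> uint {
    let mut n = 0;
    loop {
        match port.try_recv() {
            Data(..) => { n += 1; }
            Empty | Disconnected => return n,
        }
    }
}
```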
diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs
index 1ec8ac5d83e..258162069d9 100644
--- a/src/libstd/sync/mpsc_queue.rs
+++ b/src/libstd/sync/mpsc_queue.rs
@@ -39,12 +39,10 @@
 //     /queues/non-intrusive-mpsc-node-based-queue
 
 use cast;
-use clone::Clone;
 use kinds::Send;
 use ops::Drop;
 use option::{Option, None, Some};
 use ptr::RawPtr;
-use sync::arc::UnsafeArc;
 use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed};
 
 /// A result of the `pop` function.
@@ -65,40 +63,12 @@ struct Node<T> {
     value: Option<T>,
 }
 
-struct State<T, P> {
-    head: AtomicPtr<Node<T>>,
-    tail: *mut Node<T>,
-    packet: P,
-}
-
-/// The consumer half of this concurrent queue. This half is used to receive
-/// data from the producers.
-pub struct Consumer<T, P> {
-    priv state: UnsafeArc<State<T, P>>,
-}
-
-/// The production half of the concurrent queue. This handle may be cloned in
-/// order to make handles for new producers.
-pub struct Producer<T, P> {
-    priv state: UnsafeArc<State<T, P>>,
-}
-
-impl<T: Send, P: Send> Clone for Producer<T, P> {
-    fn clone(&self) -> Producer<T, P> {
-        Producer { state: self.state.clone() }
-    }
-}
-
-/// Creates a new MPSC queue. The given argument `p` is a user-defined "packet"
-/// of information which will be shared by the consumer and the producer which
-/// can be re-acquired via the `packet` function. This is helpful when extra
-/// state is shared between the producer and consumer, but note that there is
-/// no synchronization performed of this data.
-pub fn queue<T: Send, P: Send>(p: P) -> (Consumer<T, P>, Producer<T, P>) {
-    unsafe {
-        let (a, b) = UnsafeArc::new2(State::new(p));
-        (Consumer { state: a }, Producer { state: b })
-    }
+/// The multi-producer single-consumer structure. This is not cloneable, but it
+/// may be safely shared so long as it is guaranteed that there is only one
+/// popper at a time (many pushers are allowed).
+pub struct Queue<T> {
+    priv head: AtomicPtr<Node<T>>,
+    priv tail: *mut Node<T>,
 }
 
 impl<T> Node<T> {
@@ -110,41 +80,66 @@ impl<T> Node<T> {
     }
 }
 
-impl<T: Send, P: Send> State<T, P> {
-    unsafe fn new(p: P) -> State<T, P> {
-        let stub = Node::new(None);
-        State {
+impl<T: Send> Queue<T> {
+    /// Creates a new queue that is safe to share among multiple producers and
+    /// one consumer.
+    pub fn new() -> Queue<T> {
+        let stub = unsafe { Node::new(None) };
+        Queue {
             head: AtomicPtr::new(stub),
             tail: stub,
-            packet: p,
         }
     }
 
-    unsafe fn push(&mut self, t: T) {
-        let n = Node::new(Some(t));
-        let prev = self.head.swap(n, AcqRel);
-        (*prev).next.store(n, Release);
+    /// Pushes a new value onto this queue.
+    pub fn push(&mut self, t: T) {
+        unsafe {
+            let n = Node::new(Some(t));
+            let prev = self.head.swap(n, AcqRel);
+            (*prev).next.store(n, Release);
+        }
     }
 
-    unsafe fn pop(&mut self) -> PopResult<T> {
-        let tail = self.tail;
-        let next = (*tail).next.load(Acquire);
+    /// Pops some data from this queue.
+    ///
+    /// Note that the current implementation means that this function cannot
+    /// return `Option<T>`. It is possible for this queue to be in an
+    /// inconsistent state where many pushes have succeeded and completely
+    /// finished, but pops cannot return `Some(t)`. This inconsistent state
+    /// happens when a pusher is pre-empted at an inopportune moment.
+    ///
+    /// This inconsistent state means that this queue does indeed have data, but
+    /// it does not currently have access to it at this time.
+    pub fn pop(&mut self) -> PopResult<T> {
+        unsafe {
+            let tail = self.tail;
+            let next = (*tail).next.load(Acquire);
 
-        if !next.is_null() {
-            self.tail = next;
-            assert!((*tail).value.is_none());
-            assert!((*next).value.is_some());
-            let ret = (*next).value.take_unwrap();
-            let _: ~Node<T> = cast::transmute(tail);
-            return Data(ret);
+            if !next.is_null() {
+                self.tail = next;
+                assert!((*tail).value.is_none());
+                assert!((*next).value.is_some());
+                let ret = (*next).value.take_unwrap();
+                let _: ~Node<T> = cast::transmute(tail);
+                return Data(ret);
+            }
+
+            if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
         }
+    }
 
-        if self.head.load(Acquire) == tail {Empty} else {Inconsistent}
+    /// Attempts to pop data from this queue, but doesn't attempt too hard. This
+    /// will canonicalize inconsistent states to a `None` value.
+    pub fn casual_pop(&mut self) -> Option<T> {
+        match self.pop() {
+            Data(t) => Some(t),
+            Empty | Inconsistent => None,
+        }
     }
 }
 
 #[unsafe_destructor]
-impl<T: Send, P: Send> Drop for State<T, P> {
+impl<T: Send> Drop for Queue<T> {
     fn drop(&mut self) {
         unsafe {
             let mut cur = self.tail;
@@ -157,58 +152,16 @@
         }
     }
 }
 
-impl<T: Send, P: Send> Producer<T, P> {
-    /// Pushes a new value onto this queue.
-    pub fn push(&mut self, value: T) {
-        unsafe { (*self.state.get()).push(value) }
-    }
-    /// Gets an unsafe pointer to the user-defined packet shared by the
-    /// producers and the consumer. Note that care must be taken to ensure that
-    /// the lifetime of the queue outlives the usage of the returned pointer.
-    pub unsafe fn packet(&self) -> *mut P {
-        &mut (*self.state.get()).packet as *mut P
-    }
-}
-
-impl<T: Send, P: Send> Consumer<T, P> {
-    /// Pops some data from this queue.
-    ///
-    /// Note that the current implementation means that this function cannot
-    /// return `Option<T>`. It is possible for this queue to be in an
-    /// inconsistent state where many pushes have succeeded and completely
-    /// finished, but pops cannot return `Some(t)`. This inconsistent state
-    /// happens when a pusher is pre-empted at an inopportune moment.
-    ///
-    /// This inconsistent state means that this queue does indeed have data, but
-    /// it does not currently have access to it at this time.
-    pub fn pop(&mut self) -> PopResult<T> {
-        unsafe { (*self.state.get()).pop() }
-    }
-    /// Attempts to pop data from this queue, but doesn't attempt too hard. This
-    /// will canonicalize inconsistent states to a `None` value.
-    pub fn casual_pop(&mut self) -> Option<T> {
-        match self.pop() {
-            Data(t) => Some(t),
-            Empty | Inconsistent => None,
-        }
-    }
-    /// Gets an unsafe pointer to the underlying user-defined packet. See
-    /// `Producer.packet` for more information.
-    pub unsafe fn packet(&self) -> *mut P {
-        &mut (*self.state.get()).packet as *mut P
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use prelude::*;
-    use super::{queue, Data, Empty, Inconsistent};
+    use super::{Queue, Data, Empty, Inconsistent};
+    use sync::arc::UnsafeArc;
     use native;
 
     #[test]
     fn test_full() {
-        let (_, mut p) = queue(());
-        p.push(~1);
-        p.push(~2);
+        let mut q = Queue::new();
+        q.push(~1);
+        q.push(~2);
     }
@@ -217,20 +170,20 @@
     #[test]
     fn test() {
         let nthreads = 8u;
         let nmsgs = 1000u;
-        let (mut c, p) = queue(());
-        match c.pop() {
+        let mut q = Queue::new();
+        match q.pop() {
             Empty => {}
             Inconsistent | Data(..) => fail!()
         }
         let (port, chan) = SharedChan::new();
+        let q = UnsafeArc::new(q);
 
         for _ in range(0, nthreads) {
-            let q = p.clone();
             let chan = chan.clone();
+            let q = q.clone();
             native::task::spawn(proc() {
-                let mut q = q;
                 for i in range(0, nmsgs) {
-                    q.push(i);
+                    unsafe { (*q.get()).push(i); }
                 }
                 chan.send(());
             });
        }
 
         let mut i = 0u;
         while i < nthreads * nmsgs {
-            match c.pop() {
+            match unsafe { (*q.get()).pop() } {
                 Empty | Inconsistent => {},
                 Data(_) => { i += 1 }
             }
         }
+        drop(chan);
         for _ in range(0, nthreads) {
             port.recv();
         }
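The `Inconsistent` result is the price of the wait-free push above: between `head.swap` and `(*prev).next.store` the list is momentarily broken. A single consumer that knows a message is coming can therefore spin, exactly as the channel code does; `pop_blocking` is an illustrative helper, not part of the patch:

```rust
use std::sync::mpsc_queue::{Queue, Data, Empty, Inconsistent};

// Illustrative: one popper spinning through the transient Inconsistent state.
fn pop_blocking<T: Send>(q: &mut Queue<T>) -> T {
    loop {
        match q.pop() {
            Data(t) => return t,
            // Empty: nothing is queued. Inconsistent: a pusher was pre-empted
            // between its head.swap and next.store; it will finish shortly.
            Empty | Inconsistent => {}
        }
    }
}
```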
diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs
index 35a5846f11a..d1fde759cc1 100644
--- a/src/libstd/sync/spsc_queue.rs
+++ b/src/libstd/sync/spsc_queue.rs
@@ -38,7 +38,6 @@ use kinds::Send;
 use ops::Drop;
 use option::{Some, None, Option};
 use ptr::RawPtr;
-use sync::arc::UnsafeArc;
 use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
 
 // Node within the linked list queue of messages to send
@@ -50,75 +49,25 @@ struct Node<T> {
     next: AtomicPtr<Node<T>>, // next node in the queue
 }
 
-// The producer/consumer halves both need access to the `tail` field, and if
-// they both have access to that we may as well just give them both access
-// to this whole structure.
-struct State<T, P> {
+/// The single-producer single-consumer queue. This structure is not cloneable,
+/// but it can be safely shared in an UnsafeArc if it is guaranteed that there
+/// is only one popper and one pusher touching the queue at any one point in
+/// time.
+pub struct Queue<T> {
     // consumer fields
-    tail: *mut Node<T>, // where to pop from
-    tail_prev: AtomicPtr<Node<T>>, // where to pop from
+    priv tail: *mut Node<T>, // where to pop from
+    priv tail_prev: AtomicPtr<Node<T>>, // where to pop from
 
     // producer fields
-    head: *mut Node<T>,      // where to push to
-    first: *mut Node<T>,     // where to get new nodes from
-    tail_copy: *mut Node<T>, // between first/tail
+    priv head: *mut Node<T>,      // where to push to
+    priv first: *mut Node<T>,     // where to get new nodes from
+    priv tail_copy: *mut Node<T>, // between first/tail
 
     // Cache maintenance fields. Additions and subtractions are stored
     // separately in order to allow them to use nonatomic addition/subtraction.
-    cache_bound: uint,
-    cache_additions: AtomicUint,
-    cache_subtractions: AtomicUint,
-
-    packet: P,
-}
-
-/// Producer half of this queue. This handle is used to push data to the
-/// consumer.
-pub struct Producer<T, P> {
-    priv state: UnsafeArc<State<T, P>>,
-}
-
-/// Consumer half of this queue. This handle is used to receive data from the
-/// producer.
-pub struct Consumer<T, P> {
-    priv state: UnsafeArc<State<T, P>>,
-}
-
-/// Creates a new queue. The producer returned is connected to the consumer to
-/// push all data to the consumer.
-///
-/// # Arguments
-///
-///   * `bound` - This queue implementation is implemented with a linked list,
-///               and this means that a push is always a malloc. In order to
-///               amortize this cost, an internal cache of nodes is maintained
-///               to prevent a malloc from always being necessary. This bound is
-///               the limit on the size of the cache (if desired). If the value
-///               is 0, then the cache has no bound. Otherwise, the cache will
-///               never grow larger than `bound` (although the queue itself
-///               could be much larger).
-///
-///   * `p` - This is the user-defined packet of data which will also be shared
-///           between the producer and consumer.
-pub fn queue<T: Send, P: Send>(bound: uint,
-                               p: P) -> (Consumer<T, P>, Producer<T, P>)
-{
-    let n1 = Node::new();
-    let n2 = Node::new();
-    unsafe { (*n1).next.store(n2, Relaxed) }
-    let state = State {
-        tail: n2,
-        tail_prev: AtomicPtr::new(n1),
-        head: n2,
-        first: n1,
-        tail_copy: n1,
-        cache_bound: bound,
-        cache_additions: AtomicUint::new(0),
-        cache_subtractions: AtomicUint::new(0),
-        packet: p,
-    };
-    let (arc1, arc2) = UnsafeArc::new2(state);
-    (Consumer { state: arc1 }, Producer { state: arc2 })
+    priv cache_bound: uint,
+    priv cache_additions: AtomicUint,
+    priv cache_subtractions: AtomicUint,
 }
 
 impl<T: Send> Node<T> {
@@ -132,49 +81,49 @@ impl<T: Send> Node<T> {
     }
 }
 
-impl<T: Send, P: Send> Producer<T, P> {
-    /// Pushes data onto the queue
+impl<T: Send> Queue<T> {
+    /// Creates a new queue. The producer returned is connected to the consumer
+    /// to push all data to the consumer.
+    ///
+    /// # Arguments
+    ///
+    ///   * `bound` - This queue implementation is implemented with a linked
+    ///               list, and this means that a push is always a malloc. In
+    ///               order to amortize this cost, an internal cache of nodes is
+    ///               maintained to prevent a malloc from always being
+    ///               necessary. This bound is the limit on the size of the
+    ///               cache (if desired). If the value is 0, then the cache has
+    ///               no bound. Otherwise, the cache will never grow larger than
+    ///               `bound` (although the queue itself could be much larger).
+    pub fn new(bound: uint) -> Queue<T> {
+        let n1 = Node::new();
+        let n2 = Node::new();
+        unsafe { (*n1).next.store(n2, Relaxed) }
+        Queue {
+            tail: n2,
+            tail_prev: AtomicPtr::new(n1),
+            head: n2,
+            first: n1,
+            tail_copy: n1,
+            cache_bound: bound,
+            cache_additions: AtomicUint::new(0),
+            cache_subtractions: AtomicUint::new(0),
+        }
+    }
+
+    /// Pushes a new value onto this queue. Note that to use this function
+    /// safely, it must be externally guaranteed that there is only one pusher.
     pub fn push(&mut self, t: T) {
-        unsafe { (*self.state.get()).push(t) }
-    }
-    /// Tests whether the queue is empty. Note that if this function returns
-    /// `false`, the return value is significant, but if the return value is
-    /// `true` then almost no meaning can be attached to the return value.
-    pub fn is_empty(&self) -> bool {
-        unsafe { (*self.state.get()).is_empty() }
-    }
-    /// Acquires an unsafe pointer to the underlying user-defined packet. Note
-    /// that care must be taken to ensure that the queue outlives the usage of
-    /// the packet (because it is an unsafe pointer).
-    pub unsafe fn packet(&self) -> *mut P {
-        &mut (*self.state.get()).packet as *mut P
-    }
-}
-
-impl<T: Send, P: Send> Consumer<T, P> {
-    /// Pops some data from this queue, returning `None` when the queue is
-    /// empty.
-    pub fn pop(&mut self) -> Option<T> {
-        unsafe { (*self.state.get()).pop() }
-    }
-    /// Same function as the producer's `packet` method.
-    pub unsafe fn packet(&self) -> *mut P {
-        &mut (*self.state.get()).packet as *mut P
-    }
-}
-
-impl<T: Send, P: Send> State<T, P> {
-    // remember that there is only one thread executing `push` (and only one
-    // thread executing `pop`)
-    unsafe fn push(&mut self, t: T) {
-        // Acquire a node (which either uses a cached one or allocates a new
-        // one), and then append this to the 'head' node.
-        let n = self.alloc();
-        assert!((*n).value.is_none());
-        (*n).value = Some(t);
-        (*n).next.store(0 as *mut Node<T>, Relaxed);
-        (*self.head).next.store(n, Release);
-        self.head = n;
+        unsafe {
+            // Acquire a node (which either uses a cached one or allocates a new
+            // one), and then append this to the 'head' node.
+            let n = self.alloc();
+            assert!((*n).value.is_none());
+            (*n).value = Some(t);
+            (*n).next.store(0 as *mut Node<T>, Relaxed);
+            (*self.head).next.store(n, Release);
+            self.head = n;
+        }
     }
 
     unsafe fn alloc(&mut self) -> *mut Node<T> {
@@ -208,50 +157,57 @@ impl<T: Send> Queue<T> {
         Node::new()
     }
 
-    // remember that there is only one thread executing `pop` (and only one
-    // thread executing `push`)
-    unsafe fn pop(&mut self) -> Option<T> {
-        // The `tail` node is not actually a used node, but rather a
-        // sentinel from where we should start popping from. Hence, look at
-        // tail's next field and see if we can use it. If we do a pop, then
-        // the current tail node is a candidate for going into the cache.
-        let tail = self.tail;
-        let next = (*tail).next.load(Acquire);
-        if next.is_null() { return None }
-        assert!((*next).value.is_some());
-        let ret = (*next).value.take();
+    /// Attempts to pop a value from this queue. Remember that to use this type
+    /// safely you must ensure that there is only one popper at a time.
+    pub fn pop(&mut self) -> Option<T> {
+        unsafe {
+            // The `tail` node is not actually a used node, but rather a
+            // sentinel from where we should start popping from. Hence, look at
+            // tail's next field and see if we can use it. If we do a pop, then
+            // the current tail node is a candidate for going into the cache.
+            let tail = self.tail;
+            let next = (*tail).next.load(Acquire);
+            if next.is_null() { return None }
+            assert!((*next).value.is_some());
+            let ret = (*next).value.take();
 
-        self.tail = next;
-        if self.cache_bound == 0 {
-            self.tail_prev.store(tail, Release);
-        } else {
-            // FIXME: this is dubious with overflow.
-            let additions = self.cache_additions.load(Relaxed);
-            let subtractions = self.cache_subtractions.load(Relaxed);
-            let size = additions - subtractions;
-
-            if size < self.cache_bound {
+            self.tail = next;
+            if self.cache_bound == 0 {
                 self.tail_prev.store(tail, Release);
-                self.cache_additions.store(additions + 1, Relaxed);
             } else {
-                (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed);
-                // We have successfully erased all references to 'tail', so
-                // now we can safely drop it.
-                let _: ~Node<T> = cast::transmute(tail);
+                // FIXME: this is dubious with overflow.
+                let additions = self.cache_additions.load(Relaxed);
+                let subtractions = self.cache_subtractions.load(Relaxed);
+                let size = additions - subtractions;
+
+                if size < self.cache_bound {
+                    self.tail_prev.store(tail, Release);
+                    self.cache_additions.store(additions + 1, Relaxed);
+                } else {
+                    (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed);
+                    // We have successfully erased all references to 'tail', so
+                    // now we can safely drop it.
+                    let _: ~Node<T> = cast::transmute(tail);
+                }
             }
+            return ret;
         }
-        return ret;
     }
 
-    unsafe fn is_empty(&self) -> bool {
-        let tail = self.tail;
-        let next = (*tail).next.load(Acquire);
-        return next.is_null();
+    /// Tests whether this queue is empty or not. Remember that there can only
+    /// be one tester/popper, and also keep in mind that the answer returned
+    /// from this is likely to change if it is `false`.
+    pub fn is_empty(&self) -> bool {
+        unsafe {
+            let tail = self.tail;
+            let next = (*tail).next.load(Acquire);
+            return next.is_null();
+        }
     }
 }
 
 #[unsafe_destructor]
-impl<T: Send, P: Send> Drop for State<T, P> {
+impl<T: Send> Drop for Queue<T> {
     fn drop(&mut self) {
         unsafe {
             let mut cur = self.first;
@@ -267,44 +223,44 @@
 #[cfg(test)]
 mod test {
     use prelude::*;
-    use super::queue;
+    use super::Queue;
+    use sync::arc::UnsafeArc;
     use native;
 
     #[test]
     fn smoke() {
-        let (mut c, mut p) = queue(0, ());
-        p.push(1);
-        p.push(2);
-        assert_eq!(c.pop(), Some(1));
-        assert_eq!(c.pop(), Some(2));
-        assert_eq!(c.pop(), None);
-        p.push(3);
-        p.push(4);
-        assert_eq!(c.pop(), Some(3));
-        assert_eq!(c.pop(), Some(4));
-        assert_eq!(c.pop(), None);
+        let mut q = Queue::new(0);
+        q.push(1);
+        q.push(2);
+        assert_eq!(q.pop(), Some(1));
+        assert_eq!(q.pop(), Some(2));
+        assert_eq!(q.pop(), None);
+        q.push(3);
+        q.push(4);
+        assert_eq!(q.pop(), Some(3));
+        assert_eq!(q.pop(), Some(4));
+        assert_eq!(q.pop(), None);
     }
 
     #[test]
     fn drop_full() {
-        let (_, mut p) = queue(0, ());
-        p.push(~1);
-        p.push(~2);
+        let mut q = Queue::new(0);
+        q.push(~1);
+        q.push(~2);
     }
 
     #[test]
     fn smoke_bound() {
-        let (mut c, mut p) = queue(1, ());
-        p.push(1);
-        p.push(2);
-        assert_eq!(c.pop(), Some(1));
-        assert_eq!(c.pop(), Some(2));
-        assert_eq!(c.pop(), None);
-        p.push(3);
-        p.push(4);
-        assert_eq!(c.pop(), Some(3));
-        assert_eq!(c.pop(), Some(4));
-        assert_eq!(c.pop(), None);
+        let mut q = Queue::new(1);
+        q.push(1);
+        q.push(2);
+        assert_eq!(q.pop(), Some(1));
+        assert_eq!(q.pop(), Some(2));
+        assert_eq!(q.pop(), None);
+        q.push(3);
+        q.push(4);
+        assert_eq!(q.pop(), Some(3));
+        assert_eq!(q.pop(), Some(4));
+        assert_eq!(q.pop(), None);
    }
 
     #[test]
@@ -313,13 +269,13 @@ fn stress() {
         stress_bound(0);
         stress_bound(1);
 
         fn stress_bound(bound: uint) {
-            let (c, mut p) = queue(bound, ());
+            let (a, b) = UnsafeArc::new2(Queue::new(bound));
             let (port, chan) = Chan::new();
             native::task::spawn(proc() {
-                let mut c = c;
                 for _ in range(0, 100000) {
                     loop {
-                        match c.pop() {
+                        match unsafe { (*b.get()).pop() } {
                             Some(1) => break,
                             Some(_) => fail!(),
                             None => {}
                         }
                    }
                 }
                 chan.send(());
             });
             for _ in range(0, 100000) {
-                p.push(1);
+                unsafe { (*a.get()).push(1); }
             }
             port.recv();
         }
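Outside the tests, the same sharing pattern applies wherever the old `queue(bound, ())` pair was used: one `UnsafeArc` half per side, with the single-pusher/single-popper guarantee enforced by the caller. A standalone sketch (the module path mirrors the diff's own `std::sync` imports; `example` is illustrative):

```rust
use std::sync::spsc_queue::Queue;
use std::sync::arc::UnsafeArc;

// Illustrative: the two UnsafeArc halves split pusher from popper.
fn example() {
    let (a, b) = UnsafeArc::new2(Queue::new(0));
    spawn(proc() {
        unsafe { (*a.get()).push(42); } // the only pusher
    });
    loop {
        match unsafe { (*b.get()).pop() } { // the only popper
            Some(n) => { assert_eq!(n, 42); break }
            None => {} // producer hasn't run yet
        }
    }
}
```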