From 47ef20014c32443b12a122c0371a87f513830807 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Mon, 6 Jan 2014 15:23:37 -0800 Subject: [PATCH] Shuffle around ownership in concurrent queues Beforehand, using a concurrent queue always mandated that the "shared state" be stored internally to the queues in order to provide a safe interface. This isn't quite as flexible as one would want in some circumstances, so instead this commit moves the queues to not containing the shared state. The queues no longer have a "default useful safe" interface, but rather a "default safe" interface (minus the useful part). The queues have to be shared manually through an Arc or some other means. This allows them to be a little more flexible at the cost of a usability hindrance. I plan on using this new flexibility to upgrade a channel to a shared channel seamlessly. --- src/libgreen/lib.rs | 1 + src/libgreen/message_queue.rs | 61 +++++++ src/libgreen/sched.rs | 16 +- src/librustuv/queue.rs | 51 +++--- src/libstd/comm/mod.rs | 132 ++++++++------- src/libstd/comm/select.rs | 12 +- src/libstd/sync/mpsc_queue.rs | 166 +++++++------------ src/libstd/sync/spsc_queue.rs | 292 +++++++++++++++------------------- 8 files changed, 351 insertions(+), 380 deletions(-) create mode 100644 src/libgreen/message_queue.rs diff --git a/src/libgreen/lib.rs b/src/libgreen/lib.rs index b6738f25c68..495f6ead1ca 100644 --- a/src/libgreen/lib.rs +++ b/src/libgreen/lib.rs @@ -193,6 +193,7 @@ use task::GreenTask; mod macros; mod simple; +mod message_queue; pub mod basic; pub mod context; diff --git a/src/libgreen/message_queue.rs b/src/libgreen/message_queue.rs new file mode 100644 index 00000000000..3a118476aff --- /dev/null +++ b/src/libgreen/message_queue.rs @@ -0,0 +1,61 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use mpsc = std::sync::mpsc_queue; +use std::sync::arc::UnsafeArc; + +pub enum PopResult { + Inconsistent, + Empty, + Data(T), +} + +pub fn queue() -> (Consumer, Producer) { + let (a, b) = UnsafeArc::new2(mpsc::Queue::new()); + (Consumer { inner: a }, Producer { inner: b }) +} + +pub struct Producer { + priv inner: UnsafeArc>, +} + +pub struct Consumer { + priv inner: UnsafeArc>, +} + +impl Consumer { + pub fn pop(&mut self) -> PopResult { + match unsafe { (*self.inner.get()).pop() } { + mpsc::Inconsistent => Inconsistent, + mpsc::Empty => Empty, + mpsc::Data(t) => Data(t), + } + } + + pub fn casual_pop(&mut self) -> Option { + match unsafe { (*self.inner.get()).pop() } { + mpsc::Inconsistent => None, + mpsc::Empty => None, + mpsc::Data(t) => Some(t), + } + } +} + +impl Producer { + pub fn push(&mut self, t: T) { + unsafe { (*self.inner.get()).push(t); } + } +} + +impl Clone for Producer { + fn clone(&self) -> Producer { + Producer { inner: self.inner.clone() } + } +} diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index 2b090ac94d7..bf6e0c3430e 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -17,7 +17,6 @@ use std::rt::task::Task; use std::sync::deque; use std::unstable::mutex::Mutex; use std::unstable::raw; -use mpsc = std::sync::mpsc_queue; use TaskState; use context::Context; @@ -25,6 +24,7 @@ use coroutine::Coroutine; use sleeper_list::SleeperList; use stack::StackPool; use task::{TypeSched, GreenTask, HomeSched, AnySched}; +use msgq = message_queue; /// A scheduler is responsible for coordinating the execution of Tasks /// on a single thread. The scheduler runs inside a slightly modified @@ -47,9 +47,9 @@ pub struct Scheduler { /// The queue of incoming messages from other schedulers. /// These are enqueued by SchedHandles after which a remote callback /// is triggered to handle the message. - message_queue: mpsc::Consumer, + message_queue: msgq::Consumer, /// Producer used to clone sched handles from - message_producer: mpsc::Producer, + message_producer: msgq::Producer, /// A shared list of sleeping schedulers. We'll use this to wake /// up schedulers when pushing work onto the work queue. sleeper_list: SleeperList, @@ -143,7 +143,7 @@ impl Scheduler { state: TaskState) -> Scheduler { - let (consumer, producer) = mpsc::queue(()); + let (consumer, producer) = msgq::queue(); let mut sched = Scheduler { pool_id: pool_id, sleeper_list: sleeper_list, @@ -215,7 +215,7 @@ impl Scheduler { // Should not have any messages let message = stask.sched.get_mut_ref().message_queue.pop(); - rtassert!(match message { mpsc::Empty => true, _ => false }); + rtassert!(match message { msgq::Empty => true, _ => false }); stask.task.get_mut_ref().destroyed = true; } @@ -340,8 +340,8 @@ impl Scheduler { // // I have chosen to take route #2. match self.message_queue.pop() { - mpsc::Data(t) => Some(t), - mpsc::Empty | mpsc::Inconsistent => None + msgq::Data(t) => Some(t), + msgq::Empty | msgq::Inconsistent => None } }; @@ -849,7 +849,7 @@ pub enum SchedMessage { pub struct SchedHandle { priv remote: ~RemoteCallback, - priv queue: mpsc::Producer, + priv queue: msgq::Producer, sched_id: uint } diff --git a/src/librustuv/queue.rs b/src/librustuv/queue.rs index 358582d436b..5b697e0d73d 100644 --- a/src/librustuv/queue.rs +++ b/src/librustuv/queue.rs @@ -24,6 +24,7 @@ use std::cast; use std::libc::{c_void, c_int}; use std::rt::task::BlockedTask; use std::unstable::sync::LittleLock; +use std::sync::arc::UnsafeArc; use mpsc = std::sync::mpsc_queue; use async::AsyncWatcher; @@ -39,46 +40,46 @@ enum Message { struct State { handle: *uvll::uv_async_t, lock: LittleLock, // see comments in async_cb for why this is needed + queue: mpsc::Queue, } /// This structure is intended to be stored next to the event loop, and it is /// used to create new `Queue` structures. pub struct QueuePool { - priv producer: mpsc::Producer, - priv consumer: mpsc::Consumer, + priv queue: UnsafeArc, priv refcnt: uint, } /// This type is used to send messages back to the original event loop. pub struct Queue { - priv queue: mpsc::Producer, + priv queue: UnsafeArc, } extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { assert_eq!(status, 0); - let state: &mut QueuePool = unsafe { + let pool: &mut QueuePool = unsafe { cast::transmute(uvll::get_data_for_uv_handle(handle)) }; - let packet = unsafe { state.consumer.packet() }; + let state: &mut State = unsafe { cast::transmute(pool.queue.get()) }; // Remember that there is no guarantee about how many times an async // callback is called with relation to the number of sends, so process the // entire queue in a loop. loop { - match state.consumer.pop() { + match state.queue.pop() { mpsc::Data(Task(task)) => { let _ = task.wake().map(|t| t.reawaken()); } mpsc::Data(Increment) => unsafe { - if state.refcnt == 0 { - uvll::uv_ref((*packet).handle); + if pool.refcnt == 0 { + uvll::uv_ref(state.handle); } - state.refcnt += 1; + pool.refcnt += 1; }, mpsc::Data(Decrement) => unsafe { - state.refcnt -= 1; - if state.refcnt == 0 { - uvll::uv_unref((*packet).handle); + pool.refcnt -= 1; + if pool.refcnt == 0 { + uvll::uv_unref(state.handle); } }, mpsc::Empty | mpsc::Inconsistent => break @@ -99,9 +100,9 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { // If we acquire the mutex here, then we are guaranteed that there are no // longer any senders which are holding on to their handles, so we can // safely allow the event loop to exit. - if state.refcnt == 0 { + if pool.refcnt == 0 { unsafe { - let _l = (*packet).lock.lock(); + let _l = state.lock.lock(); } } } @@ -109,14 +110,14 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) { impl QueuePool { pub fn new(loop_: &mut Loop) -> ~QueuePool { let handle = UvHandle::alloc(None::, uvll::UV_ASYNC); - let (c, p) = mpsc::queue(State { + let state = UnsafeArc::new(State { handle: handle, lock: LittleLock::new(), + queue: mpsc::Queue::new(), }); let q = ~QueuePool { - producer: p, - consumer: c, refcnt: 0, + queue: state, }; unsafe { @@ -132,23 +133,23 @@ impl QueuePool { pub fn queue(&mut self) -> Queue { unsafe { if self.refcnt == 0 { - uvll::uv_ref((*self.producer.packet()).handle); + uvll::uv_ref((*self.queue.get()).handle); } self.refcnt += 1; } - Queue { queue: self.producer.clone() } + Queue { queue: self.queue.clone() } } pub fn handle(&self) -> *uvll::uv_async_t { - unsafe { (*self.producer.packet()).handle } + unsafe { (*self.queue.get()).handle } } } impl Queue { pub fn push(&mut self, task: BlockedTask) { - self.queue.push(Task(task)); unsafe { - uvll::uv_async_send((*self.queue.packet()).handle); + (*self.queue.get()).queue.push(Task(task)); + uvll::uv_async_send((*self.queue.get()).handle); } } } @@ -161,7 +162,7 @@ impl Clone for Queue { // and if the queue is dropped later on it'll see the increment for the // decrement anyway. unsafe { - cast::transmute_mut(self).queue.push(Increment); + (*self.queue.get()).queue.push(Increment); } Queue { queue: self.queue.clone() } } @@ -172,9 +173,9 @@ impl Drop for Queue { // See the comments in the async_cb function for why there is a lock // that is acquired only on a drop. unsafe { - let state = self.queue.packet(); + let state = self.queue.get(); let _l = (*state).lock.lock(); - self.queue.push(Decrement); + (*state).queue.push(Decrement); uvll::uv_async_send((*state).handle); } } diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index 34bf83ac491..63d69100671 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -238,6 +238,7 @@ use result::{Ok, Err}; use rt::local::Local; use rt::task::{Task, BlockedTask}; use rt::thread::Thread; +use sync::arc::UnsafeArc; use sync::atomics::{AtomicInt, AtomicBool, SeqCst, Relaxed}; use vec::OwnedVector; @@ -273,24 +274,6 @@ macro_rules! test ( mod select; -/////////////////////////////////////////////////////////////////////////////// -// Helper type to abstract ports for channels and shared channels -/////////////////////////////////////////////////////////////////////////////// - -enum Consumer { - SPSC(spsc::Consumer), - MPSC(mpsc::Consumer), -} - -impl Consumer{ - unsafe fn packet(&self) -> *mut Packet { - match *self { - SPSC(ref c) => c.packet(), - MPSC(ref c) => c.packet(), - } - } -} - /////////////////////////////////////////////////////////////////////////////// // Public structs /////////////////////////////////////////////////////////////////////////////// @@ -298,8 +281,7 @@ impl Consumer{ /// The receiving-half of Rust's channel type. This half can only be owned by /// one task pub struct Port { - priv queue: Consumer, - + priv inner: PortInner, // can't share in an arc priv marker: marker::NoFreeze, } @@ -314,8 +296,7 @@ pub struct Messages<'a, T> { /// The sending-half of Rust's channel type. This half can only be owned by one /// task pub struct Chan { - priv queue: spsc::Producer, - + priv inner: UnsafeArc>, // can't share in an arc priv marker: marker::NoFreeze, } @@ -323,8 +304,6 @@ pub struct Chan { /// The sending-half of Rust's channel type. This half can be shared among many /// tasks by creating copies of itself through the `clone` method. pub struct SharedChan { - priv queue: mpsc::Producer, - // can't share in an arc -- technically this implementation is // shareable, but it shouldn't be required to be shareable in an // arc @@ -349,16 +328,26 @@ pub enum TryRecvResult { // Internal struct definitions /////////////////////////////////////////////////////////////////////////////// +enum PortInner { + Single(UnsafeArc>), + Shared(UnsafeArc>), +} + +struct SingleInner { + queue: spsc::Queue, + packet: Packet, +} + +struct SharedInner { + queue: mpsc::Queue, + packet: Packet, +} + struct Packet { cnt: AtomicInt, // How many items are on this channel steals: int, // How many times has a port received without blocking? to_wake: Option, // Task to wake up - // This lock is used to wake up native threads blocked in select. The - // `lock` field is not used because the thread blocking in select must - // block on only one mutex. - //selection_lock: Option>, - // The number of channels which are currently using this packet. This is // used to reference count shared channels. channels: AtomicInt, @@ -377,6 +366,15 @@ struct Packet { static DISCONNECTED: int = int::MIN; static RESCHED_FREQ: int = 200; +impl PortInner { + fn packet<'a>(&'a mut self) -> &'a mut Packet { + match *self { + Single(ref arc) => unsafe { &mut (*arc.get()).packet }, + Shared(ref arc) => unsafe { &mut (*arc.get()).packet }, + } + } +} + impl Packet { fn new() -> Packet { Packet { @@ -550,10 +548,12 @@ impl Chan { pub fn new() -> (Port, Chan) { // arbitrary 128 size cache -- this is just a max cache size, not a // maximum buffer size - let (c, p) = spsc::queue(128, Packet::new()); - let c = SPSC(c); - (Port { queue: c, marker: marker::NoFreeze }, - Chan { queue: p, marker: marker::NoFreeze }) + let (a, b) = UnsafeArc::new2(SingleInner { + queue: spsc::Queue::new(128), + packet: Packet::new(), + }); + (Port { inner: Single(a), marker: marker::NoFreeze }, + Chan { inner: b, marker: marker::NoFreeze }) } /// Sends a value along this channel to be received by the corresponding @@ -596,16 +596,15 @@ impl Chan { /// be tolerated, then this method should be used instead. pub fn try_send(&self, t: T) -> bool { unsafe { - let this = cast::transmute_mut(self); - this.queue.push(t); - let packet = this.queue.packet(); - match (*packet).increment() { + let inner = self.inner.get(); + (*inner).queue.push(t); + match (*inner).packet.increment() { // As described above, -1 == wakeup - -1 => { (*packet).wakeup(); true } + -1 => { (*inner).packet.wakeup(); true } // Also as above, SPSC queues must be >= -2 -2 => true, // We succeeded if we sent data - DISCONNECTED => this.queue.is_empty(), + DISCONNECTED => (*inner).queue.is_empty(), // In order to prevent starvation of other tasks in situations // where a task sends repeatedly without ever receiving, we // occassionally yield instead of doing a send immediately. @@ -630,7 +629,7 @@ impl Chan { #[unsafe_destructor] impl Drop for Chan { fn drop(&mut self) { - unsafe { (*self.queue.packet()).drop_chan(); } + unsafe { (*self.inner.get()).packet.drop_chan(); } } } @@ -640,10 +639,13 @@ impl SharedChan { /// same time. All data sent on any channel will become available on the /// provided port as well. pub fn new() -> (Port, SharedChan) { - let (c, p) = mpsc::queue(Packet::new()); - let c = MPSC(c); - (Port { queue: c, marker: marker::NoFreeze }, - SharedChan { queue: p, marker: marker::NoFreeze }) + let (a, b) = UnsafeArc::new2(SharedInner { + queue: mpsc::Queue::new(), + packet: Packet::new(), + }); + (Port { inner: Shared(a), marker: marker::NoFreeze }, + SharedChan { inner: b, marker: marker::NoFreeze }) + (Port { inner: Shared(a) }, SharedChan { inner: b }) } /// Equivalent method to `send` on the `Chan` type (using the same @@ -683,17 +685,15 @@ impl SharedChan { // preflight check serves as the definitive "this will never be // received". Once we get beyond this check, we have permanently // entered the realm of "this may be received" - let packet = self.queue.packet(); - if (*packet).cnt.load(Relaxed) < DISCONNECTED + 1024 { + let inner = self.inner.get(); + if (*inner).packet.cnt.load(Relaxed) < DISCONNECTED + 1024 { return false } - let this = cast::transmute_mut(self); - this.queue.push(t); - - match (*packet).increment() { + (*inner).queue.push(t); + match (*inner).packet.increment() { DISCONNECTED => {} // oh well, we tried - -1 => { (*packet).wakeup(); } + -1 => { (*inner).packet.wakeup(); } n => { if n > 0 && n % RESCHED_FREQ == 0 { let task: ~Task = Local::take(); @@ -708,15 +708,15 @@ impl SharedChan { impl Clone for SharedChan { fn clone(&self) -> SharedChan { - unsafe { (*self.queue.packet()).channels.fetch_add(1, SeqCst); } - SharedChan { queue: self.queue.clone(), marker: marker::NoFreeze } + unsafe { (*self.inner.get()).packet.channels.fetch_add(1, SeqCst); } + SharedChan { inner: self.inner.clone(), marker: marker::NoFreeze } } } #[unsafe_destructor] impl Drop for SharedChan { fn drop(&mut self) { - unsafe { (*self.queue.packet()).drop_chan(); } + unsafe { (*self.inner.get()).packet.drop_chan(); } } } @@ -768,18 +768,18 @@ impl Port { // See the comment about yielding on sends, but the same applies here. // If a thread is spinning in try_recv we should try - unsafe { - let packet = this.queue.packet(); - (*packet).recv_cnt += 1; - if (*packet).recv_cnt % RESCHED_FREQ == 0 { + { + let packet = this.inner.packet(); + packet.recv_cnt += 1; + if packet.recv_cnt % RESCHED_FREQ == 0 { let task: ~Task = Local::take(); task.maybe_yield(); } } - let ret = match this.queue { - SPSC(ref mut queue) => queue.pop(), - MPSC(ref mut queue) => match queue.pop() { + let ret = match this.inner { + Single(ref mut arc) => unsafe { (*arc.get()).queue.pop() }, + Shared(ref mut arc) => match unsafe { (*arc.get()).queue.pop() } { mpsc::Data(t) => Some(t), mpsc::Empty => None, @@ -812,7 +812,7 @@ impl Port { let data; loop { Thread::yield_now(); - match queue.pop() { + match unsafe { (*arc.get()).queue.pop() } { mpsc::Data(t) => { data = t; break } mpsc::Empty => fail!("inconsistent => empty"), mpsc::Inconsistent => {} @@ -823,7 +823,7 @@ impl Port { } }; if increment && ret.is_some() { - unsafe { (*this.queue.packet()).steals += 1; } + this.inner.packet().steals += 1; } match ret { Some(t) => Data(t), @@ -880,7 +880,7 @@ impl Port { let this; unsafe { this = cast::transmute_mut(self); - packet = this.queue.packet(); + packet = this.inner.packet(); let task: ~Task = Local::take(); task.deschedule(1, |task| { assert!((*packet).to_wake.is_none()); @@ -917,9 +917,7 @@ impl Drop for Port { // All we need to do is store that we're disconnected. If the channel // half has already disconnected, then we'll just deallocate everything // when the shared packet is deallocated. - unsafe { - (*self.queue.packet()).cnt.store(DISCONNECTED, SeqCst); - } + self.inner.packet().cnt.store(DISCONNECTED, SeqCst); } } diff --git a/src/libstd/comm/select.rs b/src/libstd/comm/select.rs index a369ecba86b..cf8df863817 100644 --- a/src/libstd/comm/select.rs +++ b/src/libstd/comm/select.rs @@ -130,18 +130,18 @@ impl Select { let id = this.next_id; this.next_id += 1; unsafe { - let packet = port.queue.packet(); + let packet = port.inner.packet(); assert!(!(*packet).selecting.load(Relaxed)); assert_eq!((*packet).selection_id, 0); (*packet).selection_id = id; if this.head.is_null() { - this.head = packet; - this.tail = packet; + this.head = packet as *mut Packet; + this.tail = packet as *mut Packet; } else { (*packet).select_prev = this.tail; assert!((*packet).select_next.is_null()); - (*this.tail).select_next = packet; - this.tail = packet; + (*this.tail).select_next = packet as *mut Packet; + this.tail = packet as *mut Packet; } } Handle { id: id, selector: this, port: port } @@ -303,7 +303,7 @@ impl Drop for Select { #[unsafe_destructor] impl<'port, T: Send> Drop for Handle<'port, T> { fn drop(&mut self) { - unsafe { self.selector.remove(self.port.queue.packet()) } + unsafe { self.selector.remove(self.port.inner.packet()) } } } diff --git a/src/libstd/sync/mpsc_queue.rs b/src/libstd/sync/mpsc_queue.rs index 1ec8ac5d83e..258162069d9 100644 --- a/src/libstd/sync/mpsc_queue.rs +++ b/src/libstd/sync/mpsc_queue.rs @@ -39,12 +39,10 @@ // /queues/non-intrusive-mpsc-node-based-queue use cast; -use clone::Clone; use kinds::Send; use ops::Drop; use option::{Option, None, Some}; use ptr::RawPtr; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed}; /// A result of the `pop` function. @@ -65,40 +63,12 @@ struct Node { value: Option, } -struct State { - head: AtomicPtr>, - tail: *mut Node, - packet: P, -} - -/// The consumer half of this concurrent queue. This half is used to receive -/// data from the producers. -pub struct Consumer { - priv state: UnsafeArc>, -} - -/// The production half of the concurrent queue. This handle may be cloned in -/// order to make handles for new producers. -pub struct Producer { - priv state: UnsafeArc>, -} - -impl Clone for Producer { - fn clone(&self) -> Producer { - Producer { state: self.state.clone() } - } -} - -/// Creates a new MPSC queue. The given argument `p` is a user-defined "packet" -/// of information which will be shared by the consumer and the producer which -/// can be re-acquired via the `packet` function. This is helpful when extra -/// state is shared between the producer and consumer, but note that there is no -/// synchronization performed of this data. -pub fn queue(p: P) -> (Consumer, Producer) { - unsafe { - let (a, b) = UnsafeArc::new2(State::new(p)); - (Consumer { state: a }, Producer { state: b }) - } +/// The multi-producer single-consumer structure. This is not cloneable, but it +/// may be safely shared so long as it is guaranteed that there is only one +/// popper at a time (many pushers are allowed). +pub struct Queue { + priv head: AtomicPtr>, + priv tail: *mut Node, } impl Node { @@ -110,41 +80,66 @@ impl Node { } } -impl State { - unsafe fn new(p: P) -> State { - let stub = Node::new(None); - State { +impl Queue { + /// Creates a new queue that is safe to share among multiple producers and + /// one consumer. + pub fn new() -> Queue { + let stub = unsafe { Node::new(None) }; + Queue { head: AtomicPtr::new(stub), tail: stub, - packet: p, } } - unsafe fn push(&mut self, t: T) { - let n = Node::new(Some(t)); - let prev = self.head.swap(n, AcqRel); - (*prev).next.store(n, Release); + /// Pushes a new value onto this queue. + pub fn push(&mut self, t: T) { + unsafe { + let n = Node::new(Some(t)); + let prev = self.head.swap(n, AcqRel); + (*prev).next.store(n, Release); + } } - unsafe fn pop(&mut self) -> PopResult { - let tail = self.tail; - let next = (*tail).next.load(Acquire); + /// Pops some data from this queue. + /// + /// Note that the current implementation means that this function cannot + /// return `Option`. It is possible for this queue to be in an + /// inconsistent state where many pushes have suceeded and completely + /// finished, but pops cannot return `Some(t)`. This inconsistent state + /// happens when a pusher is pre-empted at an inopportune moment. + /// + /// This inconsistent state means that this queue does indeed have data, but + /// it does not currently have access to it at this time. + pub fn pop(&mut self) -> PopResult { + unsafe { + let tail = self.tail; + let next = (*tail).next.load(Acquire); - if !next.is_null() { - self.tail = next; - assert!((*tail).value.is_none()); - assert!((*next).value.is_some()); - let ret = (*next).value.take_unwrap(); - let _: ~Node = cast::transmute(tail); - return Data(ret); + if !next.is_null() { + self.tail = next; + assert!((*tail).value.is_none()); + assert!((*next).value.is_some()); + let ret = (*next).value.take_unwrap(); + let _: ~Node = cast::transmute(tail); + return Data(ret); + } + + if self.head.load(Acquire) == tail {Empty} else {Inconsistent} } + } - if self.head.load(Acquire) == tail {Empty} else {Inconsistent} + /// Attempts to pop data from this queue, but doesn't attempt too hard. This + /// will canonicalize inconsistent states to a `None` value. + pub fn casual_pop(&mut self) -> Option { + match self.pop() { + Data(t) => Some(t), + Empty | Inconsistent => None, + } } } #[unsafe_destructor] -impl Drop for State { +impl Drop for Queue { fn drop(&mut self) { unsafe { let mut cur = self.tail; @@ -157,58 +152,16 @@ impl Drop for State { } } -impl Producer { - /// Pushes a new value onto this queue. - pub fn push(&mut self, value: T) { - unsafe { (*self.state.get()).push(value) } - } - /// Gets an unsafe pointer to the user-defined packet shared by the - /// producers and the consumer. Note that care must be taken to ensure that - /// the lifetime of the queue outlives the usage of the returned pointer. - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} - -impl Consumer { - /// Pops some data from this queue. - /// - /// Note that the current implementation means that this function cannot - /// return `Option`. It is possible for this queue to be in an - /// inconsistent state where many pushes have suceeded and completely - /// finished, but pops cannot return `Some(t)`. This inconsistent state - /// happens when a pusher is pre-empted at an inopportune moment. - /// - /// This inconsistent state means that this queue does indeed have data, but - /// it does not currently have access to it at this time. - pub fn pop(&mut self) -> PopResult { - unsafe { (*self.state.get()).pop() } - } - /// Attempts to pop data from this queue, but doesn't attempt too hard. This - /// will canonicalize inconsistent states to a `None` value. - pub fn casual_pop(&mut self) -> Option { - match self.pop() { - Data(t) => Some(t), - Empty | Inconsistent => None, - } - } - /// Gets an unsafe pointer to the underlying user-defined packet. See - /// `Producer.packet` for more information. - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} - #[cfg(test)] mod tests { use prelude::*; - use super::{queue, Data, Empty, Inconsistent}; + use super::{Queue, Data, Empty, Inconsistent}; use native; #[test] fn test_full() { - let (_, mut p) = queue(()); + let mut q = Queue::new(); p.push(~1); p.push(~2); } @@ -217,20 +170,20 @@ mod tests { fn test() { let nthreads = 8u; let nmsgs = 1000u; - let (mut c, p) = queue(()); + let mut q = Queue::new(); match c.pop() { Empty => {} Inconsistent | Data(..) => fail!() } let (port, chan) = SharedChan::new(); + let q = UnsafeArc::new(q); for _ in range(0, nthreads) { - let q = p.clone(); let chan = chan.clone(); + let q = q.clone(); native::task::spawn(proc() { - let mut q = q; for i in range(0, nmsgs) { - q.push(i); + unsafe { (*q.get()).push(i); } } chan.send(()); }); @@ -238,11 +191,12 @@ mod tests { let mut i = 0u; while i < nthreads * nmsgs { - match c.pop() { + match unsafe { (*q.get()).pop() } { Empty | Inconsistent => {}, Data(_) => { i += 1 } } } + drop(chan); for _ in range(0, nthreads) { port.recv(); } diff --git a/src/libstd/sync/spsc_queue.rs b/src/libstd/sync/spsc_queue.rs index 35a5846f11a..d1fde759cc1 100644 --- a/src/libstd/sync/spsc_queue.rs +++ b/src/libstd/sync/spsc_queue.rs @@ -38,7 +38,6 @@ use kinds::Send; use ops::Drop; use option::{Some, None, Option}; use ptr::RawPtr; -use sync::arc::UnsafeArc; use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release}; // Node within the linked list queue of messages to send @@ -50,75 +49,25 @@ struct Node { next: AtomicPtr>, // next node in the queue } -// The producer/consumer halves both need access to the `tail` field, and if -// they both have access to that we may as well just give them both access -// to this whole structure. -struct State { +/// The single-producer single-consumer queue. This structure is not cloneable, +/// but it can be safely shared in an UnsafeArc if it is guaranteed that there +/// is only one popper and one pusher touching the queue at any one point in +/// time. +pub struct Queue { // consumer fields - tail: *mut Node, // where to pop from - tail_prev: AtomicPtr>, // where to pop from + priv tail: *mut Node, // where to pop from + priv tail_prev: AtomicPtr>, // where to pop from // producer fields - head: *mut Node, // where to push to - first: *mut Node, // where to get new nodes from - tail_copy: *mut Node, // between first/tail + priv head: *mut Node, // where to push to + priv first: *mut Node, // where to get new nodes from + priv tail_copy: *mut Node, // between first/tail // Cache maintenance fields. Additions and subtractions are stored // separately in order to allow them to use nonatomic addition/subtraction. - cache_bound: uint, - cache_additions: AtomicUint, - cache_subtractions: AtomicUint, - - packet: P, -} - -/// Producer half of this queue. This handle is used to push data to the -/// consumer. -pub struct Producer { - priv state: UnsafeArc>, -} - -/// Consumer half of this queue. This handle is used to receive data from the -/// producer. -pub struct Consumer { - priv state: UnsafeArc>, -} - -/// Creates a new queue. The producer returned is connected to the consumer to -/// push all data to the consumer. -/// -/// # Arguments -/// -/// * `bound` - This queue implementation is implemented with a linked list, -/// and this means that a push is always a malloc. In order to -/// amortize this cost, an internal cache of nodes is maintained -/// to prevent a malloc from always being necessary. This bound is -/// the limit on the size of the cache (if desired). If the value -/// is 0, then the cache has no bound. Otherwise, the cache will -/// never grow larger than `bound` (although the queue itself -/// could be much larger. -/// -/// * `p` - This is the user-defined packet of data which will also be shared -/// between the producer and consumer. -pub fn queue(bound: uint, - p: P) -> (Consumer, Producer) -{ - let n1 = Node::new(); - let n2 = Node::new(); - unsafe { (*n1).next.store(n2, Relaxed) } - let state = State { - tail: n2, - tail_prev: AtomicPtr::new(n1), - head: n2, - first: n1, - tail_copy: n1, - cache_bound: bound, - cache_additions: AtomicUint::new(0), - cache_subtractions: AtomicUint::new(0), - packet: p, - }; - let (arc1, arc2) = UnsafeArc::new2(state); - (Consumer { state: arc1 }, Producer { state: arc2 }) + priv cache_bound: uint, + priv cache_additions: AtomicUint, + priv cache_subtractions: AtomicUint, } impl Node { @@ -132,49 +81,49 @@ impl Node { } } -impl Producer { - /// Pushes data onto the queue +impl Queue { + /// Creates a new queue. The producer returned is connected to the consumer + /// to push all data to the consumer. + /// + /// # Arguments + /// + /// * `bound` - This queue implementation is implemented with a linked + /// list, and this means that a push is always a malloc. In + /// order to amortize this cost, an internal cache of nodes is + /// maintained to prevent a malloc from always being + /// necessary. This bound is the limit on the size of the + /// cache (if desired). If the value is 0, then the cache has + /// no bound. Otherwise, the cache will never grow larger than + /// `bound` (although the queue itself could be much larger. + pub fn new(bound: uint) -> Queue { + let n1 = Node::new(); + let n2 = Node::new(); + unsafe { (*n1).next.store(n2, Relaxed) } + Queue { + tail: n2, + tail_prev: AtomicPtr::new(n1), + head: n2, + first: n1, + tail_copy: n1, + cache_bound: bound, + cache_additions: AtomicUint::new(0), + cache_subtractions: AtomicUint::new(0), + } + } + + /// Pushes a new value onto this queue. Note that to use this function + /// safely, it must be externally guaranteed that there is only one pusher. pub fn push(&mut self, t: T) { - unsafe { (*self.state.get()).push(t) } - } - /// Tests whether the queue is empty. Note that if this function returns - /// `false`, the return value is significant, but if the return value is - /// `true` then almost no meaning can be attached to the return value. - pub fn is_empty(&self) -> bool { - unsafe { (*self.state.get()).is_empty() } - } - /// Acquires an unsafe pointer to the underlying user-defined packet. Note - /// that care must be taken to ensure that the queue outlives the usage of - /// the packet (because it is an unsafe pointer). - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} - -impl Consumer { - /// Pops some data from this queue, returning `None` when the queue is - /// empty. - pub fn pop(&mut self) -> Option { - unsafe { (*self.state.get()).pop() } - } - /// Same function as the producer's `packet` method. - pub unsafe fn packet(&self) -> *mut P { - &mut (*self.state.get()).packet as *mut P - } -} - -impl State { - // remember that there is only one thread executing `push` (and only one - // thread executing `pop`) - unsafe fn push(&mut self, t: T) { - // Acquire a node (which either uses a cached one or allocates a new - // one), and then append this to the 'head' node. - let n = self.alloc(); - assert!((*n).value.is_none()); - (*n).value = Some(t); - (*n).next.store(0 as *mut Node, Relaxed); - (*self.head).next.store(n, Release); - self.head = n; + unsafe { + // Acquire a node (which either uses a cached one or allocates a new + // one), and then append this to the 'head' node. + let n = self.alloc(); + assert!((*n).value.is_none()); + (*n).value = Some(t); + (*n).next.store(0 as *mut Node, Relaxed); + (*self.head).next.store(n, Release); + self.head = n; + } } unsafe fn alloc(&mut self) -> *mut Node { @@ -208,50 +157,57 @@ impl State { Node::new() } - // remember that there is only one thread executing `pop` (and only one - // thread executing `push`) - unsafe fn pop(&mut self) -> Option { - // The `tail` node is not actually a used node, but rather a - // sentinel from where we should start popping from. Hence, look at - // tail's next field and see if we can use it. If we do a pop, then - // the current tail node is a candidate for going into the cache. - let tail = self.tail; - let next = (*tail).next.load(Acquire); - if next.is_null() { return None } - assert!((*next).value.is_some()); - let ret = (*next).value.take(); + /// Attempts to pop a value from this queue. Remember that to use this type + /// safely you must ensure that there is only one popper at a time. + pub fn pop(&mut self) -> Option { + unsafe { + // The `tail` node is not actually a used node, but rather a + // sentinel from where we should start popping from. Hence, look at + // tail's next field and see if we can use it. If we do a pop, then + // the current tail node is a candidate for going into the cache. + let tail = self.tail; + let next = (*tail).next.load(Acquire); + if next.is_null() { return None } + assert!((*next).value.is_some()); + let ret = (*next).value.take(); - self.tail = next; - if self.cache_bound == 0 { - self.tail_prev.store(tail, Release); - } else { - // FIXME: this is dubious with overflow. - let additions = self.cache_additions.load(Relaxed); - let subtractions = self.cache_subtractions.load(Relaxed); - let size = additions - subtractions; - - if size < self.cache_bound { + self.tail = next; + if self.cache_bound == 0 { self.tail_prev.store(tail, Release); - self.cache_additions.store(additions + 1, Relaxed); } else { - (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); - // We have successfully erased all references to 'tail', so - // now we can safely drop it. - let _: ~Node = cast::transmute(tail); + // FIXME: this is dubious with overflow. + let additions = self.cache_additions.load(Relaxed); + let subtractions = self.cache_subtractions.load(Relaxed); + let size = additions - subtractions; + + if size < self.cache_bound { + self.tail_prev.store(tail, Release); + self.cache_additions.store(additions + 1, Relaxed); + } else { + (*self.tail_prev.load(Relaxed)).next.store(next, Relaxed); + // We have successfully erased all references to 'tail', so + // now we can safely drop it. + let _: ~Node = cast::transmute(tail); + } } + return ret; } - return ret; } - unsafe fn is_empty(&self) -> bool { - let tail = self.tail; - let next = (*tail).next.load(Acquire); - return next.is_null(); + /// Tests whether this queue is empty or not. Remember that there can only + /// be one tester/popper, and also keep in mind that the answer returned + /// from this is likely to change if it is `false`. + pub fn is_empty(&self) -> bool { + unsafe { + let tail = self.tail; + let next = (*tail).next.load(Acquire); + return next.is_null(); + } } } #[unsafe_destructor] -impl Drop for State { +impl Drop for Queue { fn drop(&mut self) { unsafe { let mut cur = self.first; @@ -267,44 +223,44 @@ impl Drop for State { #[cfg(test)] mod test { use prelude::*; - use super::queue; + use super::Queue; use native; #[test] fn smoke() { - let (mut c, mut p) = queue(0, ()); - p.push(1); - p.push(2); - assert_eq!(c.pop(), Some(1)); - assert_eq!(c.pop(), Some(2)); - assert_eq!(c.pop(), None); - p.push(3); - p.push(4); - assert_eq!(c.pop(), Some(3)); - assert_eq!(c.pop(), Some(4)); - assert_eq!(c.pop(), None); + let mut q = Queue::new(0); + q.push(1); + q.push(2); + assert_eq!(q.pop(), Some(1)); + assert_eq!(q.pop(), Some(2)); + assert_eq!(q.pop(), None); + q.push(3); + q.push(4); + assert_eq!(q.pop(), Some(3)); + assert_eq!(q.pop(), Some(4)); + assert_eq!(q.pop(), None); } #[test] fn drop_full() { - let (_, mut p) = queue(0, ()); - p.push(~1); - p.push(~2); + let mut q = Queue::new(0); + q.push(~1); + q.push(~2); } #[test] fn smoke_bound() { - let (mut c, mut p) = queue(1, ()); - p.push(1); - p.push(2); - assert_eq!(c.pop(), Some(1)); - assert_eq!(c.pop(), Some(2)); - assert_eq!(c.pop(), None); - p.push(3); - p.push(4); - assert_eq!(c.pop(), Some(3)); - assert_eq!(c.pop(), Some(4)); - assert_eq!(c.pop(), None); + let mut q = Queue::new(1); + q.push(1); + q.push(2); + assert_eq!(q.pop(), Some(1)); + assert_eq!(q.pop(), Some(2)); + assert_eq!(q.pop(), None); + q.push(3); + q.push(4); + assert_eq!(q.pop(), Some(3)); + assert_eq!(q.pop(), Some(4)); + assert_eq!(q.pop(), None); } #[test] @@ -313,13 +269,13 @@ mod test { stress_bound(1); fn stress_bound(bound: uint) { - let (c, mut p) = queue(bound, ()); + let (a, b) = UnsafeArc::new2(Queue::new(bound)); let (port, chan) = Chan::new(); native::task::spawn(proc() { let mut c = c; for _ in range(0, 100000) { loop { - match c.pop() { + match unsafe { (*b.get()).pop() } { Some(1) => break, Some(_) => fail!(), None => {} @@ -329,7 +285,7 @@ mod test { chan.send(()); }); for _ in range(0, 100000) { - p.push(1); + unsafe { (*a.get()).push(1); } } port.recv(); }