From 24631c8bcd294c07f9c9779628ac2da761d52d6f Mon Sep 17 00:00:00 2001
From: Alex Crichton
Date: Thu, 16 Jan 2014 19:53:42 -0800
Subject: [PATCH] extra: Add an intrusive MPSC to be used soon

---
 src/etc/licenseck.py                |   1 +
 src/libextra/sync/mpsc_intrusive.rs | 139 ++++++++++++++++++++++++++++
 2 files changed, 140 insertions(+)
 create mode 100644 src/libextra/sync/mpsc_intrusive.rs

diff --git a/src/etc/licenseck.py b/src/etc/licenseck.py
index b5a721c03ff..afbf34d0753 100644
--- a/src/etc/licenseck.py
+++ b/src/etc/licenseck.py
@@ -41,6 +41,7 @@ exceptions = [
     "libstd/sync/mpsc_queue.rs", # BSD
     "libstd/sync/spsc_queue.rs", # BSD
     "libstd/sync/mpmc_bounded_queue.rs", # BSD
+    "libextra/sync/mpsc_intrusive.rs", # BSD
 ]
 
 def check_license(name, contents):
diff --git a/src/libextra/sync/mpsc_intrusive.rs b/src/libextra/sync/mpsc_intrusive.rs
new file mode 100644
index 00000000000..0f13a4980d9
--- /dev/null
+++ b/src/libextra/sync/mpsc_intrusive.rs
@@ -0,0 +1,139 @@
+/* Copyright (c) 2010-2011 Dmitry Vyukov. All rights reserved.
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice,
+ *    this list of conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY DMITRY VYUKOV "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
+ * EVENT SHALL DMITRY VYUKOV OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA,
+ * OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
+ * EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ * The views and conclusions contained in the software and documentation are
+ * those of the authors and should not be interpreted as representing official
+ * policies, either expressed or implied, of Dmitry Vyukov.
+ */
+
+//! A mostly lock-free multi-producer, single consumer queue.
+//!
+//! This module implements an intrusive MPSC queue. This queue is incredibly
+//! unsafe (due to use of unsafe pointers for nodes), and hence is not public.
+
+// http://www.1024cores.net/home/lock-free-algorithms
+//                         /queues/intrusive-mpsc-node-based-queue
+
+use std::cast;
+use std::sync::atomics;
+
+// NB: all links are done as AtomicUint instead of AtomicPtr to allow for static
+// initialization.
+
+pub struct Node<T> {
+    next: atomics::AtomicUint,
+    data: T,
+}
+
+pub struct DummyNode {
+    next: atomics::AtomicUint,
+}
+
+pub struct Queue<T> {
+    head: atomics::AtomicUint,
+    tail: *mut Node<T>,
+    stub: DummyNode,
+}
+
+impl<T: Send> Queue<T> {
+    pub fn new() -> Queue<T> {
+        Queue {
+            head: atomics::AtomicUint::new(0),
+            tail: 0 as *mut Node<T>,
+            stub: DummyNode {
+                next: atomics::AtomicUint::new(0),
+            },
+        }
+    }
+
+    pub unsafe fn push(&mut self, node: *mut Node<T>) {
+        (*node).next.store(0, atomics::Release);
+        let prev = self.head.swap(node as uint, atomics::AcqRel);
+
+        // Note that this code is slightly modified to allow static
+        // initialization of these queues with rust's flavor of static
+        // initialization.
+        if prev == 0 {
+            self.stub.next.store(node as uint, atomics::Release);
+        } else {
+            let prev = prev as *mut Node<T>;
+            (*prev).next.store(node as uint, atomics::Release);
+        }
+    }
+
+    /// You'll note that the other MPSC queue in std::sync is non-intrusive and
+    /// returns a `PopResult` here to indicate when the queue is inconsistent.
+    /// An "inconsistent state" in the other queue means that a pusher has
+    /// pushed, but it hasn't finished linking the rest of the chain.
+    ///
+    /// This queue also suffers from this problem, but I currently haven't been
+    /// able to detangle when this actually happens. This code is translated
+    /// verbatim from the website above, and is more complicated than the
+    /// non-intrusive version.
+    ///
+    /// Right now consumers of this queue must be ready for this fact. Just
+    /// because `pop` returns `None` does not mean that there is not data
+    /// on the queue.
+    pub unsafe fn pop(&mut self) -> Option<*mut Node<T>> {
+        let tail = self.tail;
+        let mut tail = if !tail.is_null() {tail} else {
+            cast::transmute(&self.stub)
+        };
+        let mut next = (*tail).next(atomics::Relaxed);
+        if tail as uint == &self.stub as *DummyNode as uint {
+            if next.is_null() {
+                return None;
+            }
+            self.tail = next;
+            tail = next;
+            next = (*next).next(atomics::Relaxed);
+        }
+        if !next.is_null() {
+            self.tail = next;
+            return Some(tail);
+        }
+        let head = self.head.load(atomics::Acquire) as *mut Node<T>;
+        if tail != head {
+            return None;
+        }
+        let stub = cast::transmute(&self.stub);
+        self.push(stub);
+        next = (*tail).next(atomics::Relaxed);
+        if !next.is_null() {
+            self.tail = next;
+            return Some(tail);
+        }
+        return None
+    }
+}
+
+impl<T: Send> Node<T> {
+    pub fn new(t: T) -> Node<T> {
+        Node {
+            data: t,
+            next: atomics::AtomicUint::new(0),
+        }
+    }
+    pub unsafe fn next(&mut self, ord: atomics::Ordering) -> *mut Node<T> {
+        cast::transmute::<uint, *mut Node<T>>(self.next.load(ord))
+    }
+}
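The NB comment about storing links as AtomicUint rather than AtomicPtr, and the `prev == 0` branch in `push`, exist so that a `Queue` can live in a static: the stub's own address cannot be written into `head` or the links at static-initialization time, so 0 stands in for "points at the stub". A minimal sketch of such a static, not part of the patch, assuming the era's `INIT_ATOMIC_UINT` constant initializer and an arbitrary `uint` payload type:

// Not part of the patch: illustrative static initialization of a Queue.
// INIT_ATOMIC_UINT is assumed to be the constant initializer for AtomicUint
// in this era's std::sync::atomics; the payload type `uint` is arbitrary.
static mut INIT_QUEUE: Queue<uint> = Queue {
    head: atomics::INIT_ATOMIC_UINT,     // 0 == "still points at the stub"
    tail: 0 as *mut Node<uint>,          // null until the first pop
    stub: DummyNode {
        next: atomics::INIT_ATOMIC_UINT,
    },
};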
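The caveat documented on `pop` is the easiest part of this queue to misuse: `None` does not mean the queue is empty, it can also mean the consumer caught a producer between its swap of `head` and the store that links `prev.next`. Below is a hypothetical usage sketch, not part of the patch and written loosely against the same pre-1.0 APIs the file uses; the function names and the owned-box allocation scheme are assumptions for illustration, and the consumer simply spins, so it only makes sense when the caller knows a push has already happened.

// Hypothetical usage sketch, not part of the patch. Nodes are allocated as
// owned boxes by the producer and handed to the queue as raw pointers; the
// single consumer reclaims the box when it pops the node.
unsafe fn produce<T: Send>(q: &mut Queue<T>, value: T) {
    let node: *mut Node<T> = cast::transmute(~Node::new(value));
    q.push(node);
}

unsafe fn consume<T: Send>(q: &mut Queue<T>) -> ~Node<T> {
    loop {
        match q.pop() {
            // Take ownership back of the allocation made in `produce`.
            Some(node) => return cast::transmute(node),
            // `None` may only mean a pusher has not yet linked its node
            // (the "inconsistent state" described above), so a consumer
            // that knows data was pushed has to retry.
            None => (),
        }
    }
}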