auto merge of #14301 : alexcrichton/rust/remove-unsafe-arc, r=brson

This type can be built with `Arc<Unsafe<T>>` now that liballoc exists.
This commit is contained in:
bors 2014-05-21 17:31:29 -07:00
commit 257a73ce82
24 changed files with 280 additions and 477 deletions

View File

@ -15,6 +15,8 @@
//! This implementation is also used as the fallback implementation of an event
//! loop if no other one is provided (and M:N scheduling is desired).
use alloc::arc::Arc;
use std::sync::atomics;
use std::mem;
use std::rt::rtio::{EventLoop, IoFactory, RemoteCallback};
use std::rt::rtio::{PausableIdleCallback, Callback};
@ -27,10 +29,11 @@ pub fn event_loop() -> Box<EventLoop:Send> {
struct BasicLoop {
work: Vec<proc():Send>, // pending work
idle: Option<*mut BasicPausable>, // only one is allowed
remotes: Vec<(uint, Box<Callback:Send>)>,
next_remote: uint,
messages: Exclusive<Vec<Message>>,
idle: Option<Box<Callback:Send>>,
idle_active: Option<Arc<atomics::AtomicBool>>,
}
enum Message { RunRemote(uint), RemoveRemote(uint) }
@ -40,6 +43,7 @@ impl BasicLoop {
BasicLoop {
work: vec![],
idle: None,
idle_active: None,
next_remote: 0,
remotes: vec![],
messages: Exclusive::new(vec![]),
@ -92,20 +96,18 @@ impl BasicLoop {
/// Run the idle callback if one is registered
fn idle(&mut self) {
unsafe {
match self.idle {
Some(idle) => {
if (*idle).active {
(*idle).work.call();
}
match self.idle {
Some(ref mut idle) => {
if self.idle_active.get_ref().load(atomics::SeqCst) {
idle.call();
}
None => {}
}
None => {}
}
}
fn has_idle(&self) -> bool {
unsafe { self.idle.is_some() && (**self.idle.get_ref()).active }
self.idle.is_some() && self.idle_active.get_ref().load(atomics::SeqCst)
}
}
@ -141,13 +143,11 @@ impl EventLoop for BasicLoop {
// FIXME: Seems like a really weird requirement to have an event loop provide.
fn pausable_idle_callback(&mut self, cb: Box<Callback:Send>)
-> Box<PausableIdleCallback:Send> {
let callback = box BasicPausable::new(self, cb);
rtassert!(self.idle.is_none());
unsafe {
let cb_ptr: &*mut BasicPausable = mem::transmute(&callback);
self.idle = Some(*cb_ptr);
}
callback as Box<PausableIdleCallback:Send>
self.idle = Some(cb);
let a = Arc::new(atomics::AtomicBool::new(true));
self.idle_active = Some(a.clone());
box BasicPausable { active: a } as Box<PausableIdleCallback:Send>
}
fn remote_callback(&mut self, f: Box<Callback:Send>)
@ -196,35 +196,21 @@ impl Drop for BasicRemote {
}
struct BasicPausable {
eloop: *mut BasicLoop,
work: Box<Callback:Send>,
active: bool,
}
impl BasicPausable {
fn new(eloop: &mut BasicLoop, cb: Box<Callback:Send>) -> BasicPausable {
BasicPausable {
active: false,
work: cb,
eloop: eloop,
}
}
active: Arc<atomics::AtomicBool>,
}
impl PausableIdleCallback for BasicPausable {
fn pause(&mut self) {
self.active = false;
self.active.store(false, atomics::SeqCst);
}
fn resume(&mut self) {
self.active = true;
self.active.store(true, atomics::SeqCst);
}
}
impl Drop for BasicPausable {
fn drop(&mut self) {
unsafe {
(*self.eloop).idle = None;
}
self.active.store(false, atomics::SeqCst);
}
}

View File

@ -214,7 +214,9 @@
#[cfg(test)] extern crate rustuv;
extern crate rand;
extern crate libc;
extern crate alloc;
use alloc::arc::Arc;
use std::mem::replace;
use std::os;
use std::rt::rtio;
@ -223,7 +225,6 @@ use std::rt;
use std::sync::atomics::{SeqCst, AtomicUint, INIT_ATOMIC_UINT};
use std::sync::deque;
use std::task::TaskOpts;
use std::sync::arc::UnsafeArc;
use sched::{Shutdown, Scheduler, SchedHandle, TaskFromFriend, NewNeighbor};
use sleeper_list::SleeperList;
@ -375,7 +376,7 @@ pub struct SchedPool {
/// sending on a channel once the entire pool has been drained of all tasks.
#[deriving(Clone)]
struct TaskState {
cnt: UnsafeArc<AtomicUint>,
cnt: Arc<AtomicUint>,
done: Sender<()>,
}
@ -434,7 +435,6 @@ impl SchedPool {
pool.sleepers.clone(),
pool.task_state.clone());
pool.handles.push(sched.make_handle());
let sched = sched;
pool.threads.push(Thread::start(proc() { sched.bootstrap(); }));
}
@ -496,7 +496,6 @@ impl SchedPool {
self.task_state.clone());
let ret = sched.make_handle();
self.handles.push(sched.make_handle());
let sched = sched;
self.threads.push(Thread::start(proc() { sched.bootstrap() }));
return ret;
@ -537,21 +536,21 @@ impl TaskState {
fn new() -> (Receiver<()>, TaskState) {
let (tx, rx) = channel();
(rx, TaskState {
cnt: UnsafeArc::new(AtomicUint::new(0)),
cnt: Arc::new(AtomicUint::new(0)),
done: tx,
})
}
fn increment(&mut self) {
unsafe { (*self.cnt.get()).fetch_add(1, SeqCst); }
self.cnt.fetch_add(1, SeqCst);
}
fn active(&self) -> bool {
unsafe { (*self.cnt.get()).load(SeqCst) != 0 }
self.cnt.load(SeqCst) != 0
}
fn decrement(&mut self) {
let prev = unsafe { (*self.cnt.get()).fetch_sub(1, SeqCst) };
let prev = self.cnt.fetch_sub(1, SeqCst);
if prev == 1 {
self.done.send(());
}

View File

@ -8,8 +8,9 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use alloc::arc::Arc;
use mpsc = std::sync::mpsc_queue;
use std::sync::arc::UnsafeArc;
use std::kinds::marker;
pub enum PopResult<T> {
Inconsistent,
@ -18,29 +19,32 @@ pub enum PopResult<T> {
}
pub fn queue<T: Send>() -> (Consumer<T>, Producer<T>) {
let (a, b) = UnsafeArc::new2(mpsc::Queue::new());
(Consumer { inner: a }, Producer { inner: b })
let a = Arc::new(mpsc::Queue::new());
(Consumer { inner: a.clone(), noshare: marker::NoShare },
Producer { inner: a, noshare: marker::NoShare })
}
pub struct Producer<T> {
inner: UnsafeArc<mpsc::Queue<T>>,
inner: Arc<mpsc::Queue<T>>,
noshare: marker::NoShare,
}
pub struct Consumer<T> {
inner: UnsafeArc<mpsc::Queue<T>>,
inner: Arc<mpsc::Queue<T>>,
noshare: marker::NoShare,
}
impl<T: Send> Consumer<T> {
pub fn pop(&mut self) -> PopResult<T> {
match unsafe { (*self.inner.get()).pop() } {
pub fn pop(&self) -> PopResult<T> {
match self.inner.pop() {
mpsc::Inconsistent => Inconsistent,
mpsc::Empty => Empty,
mpsc::Data(t) => Data(t),
}
}
pub fn casual_pop(&mut self) -> Option<T> {
match unsafe { (*self.inner.get()).pop() } {
pub fn casual_pop(&self) -> Option<T> {
match self.inner.pop() {
mpsc::Inconsistent => None,
mpsc::Empty => None,
mpsc::Data(t) => Some(t),
@ -49,13 +53,13 @@ impl<T: Send> Consumer<T> {
}
impl<T: Send> Producer<T> {
pub fn push(&mut self, t: T) {
unsafe { (*self.inner.get()).push(t); }
pub fn push(&self, t: T) {
self.inner.push(t);
}
}
impl<T: Send> Clone for Producer<T> {
fn clone(&self) -> Producer<T> {
Producer { inner: self.inner.clone() }
Producer { inner: self.inner.clone(), noshare: marker::NoShare }
}
}

View File

@ -1142,7 +1142,7 @@ mod test {
Thread::start(proc() {
let sleepers = SleeperList::new();
let mut pool = BufferPool::new();
let pool = BufferPool::new();
let (normal_worker, normal_stealer) = pool.deque();
let (special_worker, special_stealer) = pool.deque();
let queues = vec![normal_stealer, special_stealer];

View File

@ -10,6 +10,7 @@
//! Blocking posix-based file I/O
use alloc::arc::Arc;
use libc::{c_int, c_void};
use libc;
use std::c_str::CString;
@ -17,7 +18,6 @@ use std::io::IoError;
use std::io;
use std::mem;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use io::{IoResult, retry, keep_going};
@ -29,7 +29,7 @@ struct Inner {
}
pub struct FileDesc {
inner: UnsafeArc<Inner>
inner: Arc<Inner>
}
impl FileDesc {
@ -42,7 +42,7 @@ impl FileDesc {
/// Note that all I/O operations done on this object will be *blocking*, but
/// they do not require the runtime to be active.
pub fn new(fd: fd_t, close_on_drop: bool) -> FileDesc {
FileDesc { inner: UnsafeArc::new(Inner {
FileDesc { inner: Arc::new(Inner {
fd: fd,
close_on_drop: close_on_drop
}) }
@ -79,11 +79,7 @@ impl FileDesc {
}
}
pub fn fd(&self) -> fd_t {
// This unsafety is fine because we're just reading off the file
// descriptor, no one is modifying this.
unsafe { (*self.inner.get()).fd }
}
pub fn fd(&self) -> fd_t { self.inner.fd }
}
impl io::Reader for FileDesc {

View File

@ -10,17 +10,17 @@
//! Blocking win32-based file I/O
use alloc::arc::Arc;
use libc::{c_int, c_void};
use libc;
use std::c_str::CString;
use std::io::IoError;
use std::io;
use libc::{c_int, c_void};
use libc;
use std::mem;
use std::os::win32::{as_utf16_p, fill_utf16_buf_and_decode};
use std::ptr;
use std::rt::rtio;
use std::str;
use std::sync::arc::UnsafeArc;
use std::vec;
use io::IoResult;
@ -33,7 +33,7 @@ struct Inner {
}
pub struct FileDesc {
inner: UnsafeArc<Inner>
inner: Arc<Inner>
}
impl FileDesc {
@ -46,7 +46,7 @@ impl FileDesc {
/// Note that all I/O operations done on this object will be *blocking*, but
/// they do not require the runtime to be active.
pub fn new(fd: fd_t, close_on_drop: bool) -> FileDesc {
FileDesc { inner: UnsafeArc::new(Inner {
FileDesc { inner: Arc::new(Inner {
fd: fd,
close_on_drop: close_on_drop
}) }
@ -85,11 +85,7 @@ impl FileDesc {
Ok(())
}
pub fn fd(&self) -> fd_t {
// This unsafety is fine because we're just reading off the file
// descriptor, no one is modifying this.
unsafe { (*self.inner.get()).fd }
}
pub fn fd(&self) -> fd_t { self.inner.fd }
pub fn handle(&self) -> libc::HANDLE {
unsafe { libc::get_osfhandle(self.fd()) as libc::HANDLE }

View File

@ -8,12 +8,12 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use alloc::arc::Arc;
use libc;
use std::io::net::ip;
use std::io;
use std::mem;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use std::unstable::mutex;
use super::{IoResult, retry, keep_going};
@ -235,7 +235,7 @@ pub fn init() {
////////////////////////////////////////////////////////////////////////////////
pub struct TcpStream {
inner: UnsafeArc<Inner>,
inner: Arc<Inner>,
read_deadline: u64,
write_deadline: u64,
}
@ -282,16 +282,13 @@ impl TcpStream {
fn new(inner: Inner) -> TcpStream {
TcpStream {
inner: UnsafeArc::new(inner),
inner: Arc::new(inner),
read_deadline: 0,
write_deadline: 0,
}
}
pub fn fd(&self) -> sock_t {
// This unsafety is fine because it's just a read-only arc
unsafe { (*self.inner.get()).fd }
}
pub fn fd(&self) -> sock_t { self.inner.fd }
fn set_nodelay(&mut self, nodelay: bool) -> IoResult<()> {
setsockopt(self.fd(), libc::IPPROTO_TCP, libc::TCP_NODELAY,
@ -329,7 +326,7 @@ impl TcpStream {
fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
let ret = Guard {
fd: self.fd(),
guard: unsafe { (*self.inner.get()).lock.lock() },
guard: unsafe { self.inner.lock.lock() },
};
assert!(util::set_nonblocking(self.fd(), true).is_ok());
ret
@ -536,7 +533,7 @@ impl rtio::RtioTcpAcceptor for TcpAcceptor {
////////////////////////////////////////////////////////////////////////////////
pub struct UdpSocket {
inner: UnsafeArc<Inner>,
inner: Arc<Inner>,
read_deadline: u64,
write_deadline: u64,
}
@ -545,7 +542,7 @@ impl UdpSocket {
pub fn bind(addr: ip::SocketAddr) -> IoResult<UdpSocket> {
let fd = try!(socket(addr, libc::SOCK_DGRAM));
let ret = UdpSocket {
inner: UnsafeArc::new(Inner::new(fd)),
inner: Arc::new(Inner::new(fd)),
read_deadline: 0,
write_deadline: 0,
};
@ -560,10 +557,7 @@ impl UdpSocket {
}
}
pub fn fd(&self) -> sock_t {
// unsafety is fine because it's just a read-only arc
unsafe { (*self.inner.get()).fd }
}
pub fn fd(&self) -> sock_t { self.inner.fd }
pub fn set_broadcast(&mut self, on: bool) -> IoResult<()> {
setsockopt(self.fd(), libc::SOL_SOCKET, libc::SO_BROADCAST,
@ -603,7 +597,7 @@ impl UdpSocket {
fn lock_nonblocking<'a>(&'a self) -> Guard<'a> {
let ret = Guard {
fd: self.fd(),
guard: unsafe { (*self.inner.get()).lock.lock() },
guard: unsafe { self.inner.lock.lock() },
};
assert!(util::set_nonblocking(self.fd(), true).is_ok());
ret

View File

@ -8,13 +8,13 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use alloc::arc::Arc;
use libc;
use std::c_str::CString;
use std::intrinsics;
use std::io;
use std::mem;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use std::unstable::mutex;
use super::{IoResult, retry};
@ -108,7 +108,7 @@ fn bind(addr: &CString, ty: libc::c_int) -> IoResult<Inner> {
////////////////////////////////////////////////////////////////////////////////
pub struct UnixStream {
inner: UnsafeArc<Inner>,
inner: Arc<Inner>,
read_deadline: u64,
write_deadline: u64,
}
@ -117,11 +117,11 @@ impl UnixStream {
pub fn connect(addr: &CString,
timeout: Option<u64>) -> IoResult<UnixStream> {
connect(addr, libc::SOCK_STREAM, timeout).map(|inner| {
UnixStream::new(UnsafeArc::new(inner))
UnixStream::new(Arc::new(inner))
})
}
fn new(inner: UnsafeArc<Inner>) -> UnixStream {
fn new(inner: Arc<Inner>) -> UnixStream {
UnixStream {
inner: inner,
read_deadline: 0,
@ -129,7 +129,7 @@ impl UnixStream {
}
}
fn fd(&self) -> fd_t { unsafe { (*self.inner.get()).fd } }
fn fd(&self) -> fd_t { self.inner.fd }
#[cfg(target_os = "linux")]
fn lock_nonblocking(&self) {}
@ -138,7 +138,7 @@ impl UnixStream {
fn lock_nonblocking<'a>(&'a self) -> net::Guard<'a> {
let ret = net::Guard {
fd: self.fd(),
guard: unsafe { (*self.inner.get()).lock.lock() },
guard: unsafe { self.inner.lock.lock() },
};
assert!(util::set_nonblocking(self.fd(), true).is_ok());
ret
@ -254,7 +254,7 @@ impl UnixAcceptor {
&mut size as *mut libc::socklen_t) as libc::c_int
}) {
-1 => Err(super::last_error()),
fd => Ok(UnixStream::new(UnsafeArc::new(Inner::new(fd))))
fd => Ok(UnixStream::new(Arc::new(Inner::new(fd))))
}
}
}

View File

@ -84,6 +84,7 @@
//! the test suite passing (the suite is in libstd), and that's good enough for
//! me!
use alloc::arc::Arc;
use libc;
use std::c_str::CString;
use std::io;
@ -92,7 +93,6 @@ use std::os::win32::as_utf16_p;
use std::os;
use std::ptr;
use std::rt::rtio;
use std::sync::arc::UnsafeArc;
use std::sync::atomics;
use std::unstable::mutex;
@ -195,7 +195,7 @@ pub fn await(handle: libc::HANDLE, deadline: u64,
////////////////////////////////////////////////////////////////////////////////
pub struct UnixStream {
inner: UnsafeArc<Inner>,
inner: Arc<Inner>,
write: Option<Event>,
read: Option<Event>,
read_deadline: u64,
@ -273,7 +273,7 @@ impl UnixStream {
Err(super::last_error())
} else {
Ok(UnixStream {
inner: UnsafeArc::new(inner),
inner: Arc::new(inner),
read: None,
write: None,
read_deadline: 0,
@ -317,14 +317,14 @@ impl UnixStream {
})
}
fn handle(&self) -> libc::HANDLE { unsafe { (*self.inner.get()).handle } }
fn handle(&self) -> libc::HANDLE { self.inner.handle }
fn read_closed(&self) -> bool {
unsafe { (*self.inner.get()).read_closed.load(atomics::SeqCst) }
self.inner.read_closed.load(atomics::SeqCst)
}
fn write_closed(&self) -> bool {
unsafe { (*self.inner.get()).write_closed.load(atomics::SeqCst) }
self.inner.write_closed.load(atomics::SeqCst)
}
fn cancel_io(&self) -> IoResult<()> {
@ -353,7 +353,7 @@ impl rtio::RtioPipe for UnixStream {
// acquire the lock.
//
// See comments in close_read() about why this lock is necessary.
let guard = unsafe { (*self.inner.get()).lock.lock() };
let guard = unsafe { self.inner.lock.lock() };
if self.read_closed() {
return Err(io::standard_error(io::EndOfFile))
}
@ -429,7 +429,7 @@ impl rtio::RtioPipe for UnixStream {
// going after we woke up.
//
// See comments in close_read() about why this lock is necessary.
let guard = unsafe { (*self.inner.get()).lock.lock() };
let guard = unsafe { self.inner.lock.lock() };
if self.write_closed() {
return Err(io::standard_error(io::BrokenPipe))
}
@ -514,15 +514,15 @@ impl rtio::RtioPipe for UnixStream {
// close_read() between steps 1 and 2. By atomically executing steps 1
// and 2 with a lock with respect to close_read(), we're guaranteed that
// no thread will erroneously sit in a read forever.
let _guard = unsafe { (*self.inner.get()).lock.lock() };
unsafe { (*self.inner.get()).read_closed.store(true, atomics::SeqCst) }
let _guard = unsafe { self.inner.lock.lock() };
self.inner.read_closed.store(true, atomics::SeqCst);
self.cancel_io()
}
fn close_write(&mut self) -> IoResult<()> {
// see comments in close_read() for why this lock is necessary
let _guard = unsafe { (*self.inner.get()).lock.lock() };
unsafe { (*self.inner.get()).write_closed.store(true, atomics::SeqCst) }
let _guard = unsafe { self.inner.lock.lock() };
self.inner.write_closed.store(true, atomics::SeqCst);
self.cancel_io()
}
@ -683,7 +683,7 @@ impl UnixAcceptor {
// Transfer ownership of our handle into this stream
Ok(UnixStream {
inner: UnsafeArc::new(Inner::new(handle)),
inner: Arc::new(Inner::new(handle)),
read: None,
write: None,
read_deadline: 0,

View File

@ -57,6 +57,7 @@
// answer is that you don't need them)
#![feature(macro_rules)]
extern crate alloc;
extern crate libc;
use std::os;

View File

@ -14,15 +14,16 @@
/// It is assumed that all invocations of this struct happen on the same thread
/// (the uv event loop).
use alloc::arc::Arc;
use std::mem;
use std::rt::local::Local;
use std::rt::task::{BlockedTask, Task};
use std::sync::arc::UnsafeArc;
use std::ty::Unsafe;
use homing::HomingMissile;
pub struct Access {
inner: UnsafeArc<Inner>,
inner: Arc<Unsafe<Inner>>,
}
pub struct Guard<'a> {
@ -39,11 +40,11 @@ struct Inner {
impl Access {
pub fn new() -> Access {
Access {
inner: UnsafeArc::new(Inner {
inner: Arc::new(Unsafe::new(Inner {
queue: vec![],
held: false,
closed: false,
})
}))
}
}

View File

@ -46,6 +46,7 @@ via `close` and `delete` methods.
#[cfg(test)] extern crate green;
#[cfg(test)] extern crate realrustuv = "rustuv";
extern crate libc;
extern crate alloc;
use libc::{c_int, c_void};
use std::fmt;

View File

@ -20,10 +20,10 @@
#![allow(dead_code)]
use alloc::arc::Arc;
use libc::c_void;
use std::mem;
use std::rt::task::BlockedTask;
use std::sync::arc::UnsafeArc;
use std::unstable::mutex::NativeMutex;
use mpsc = std::sync::mpsc_queue;
@ -46,20 +46,20 @@ struct State {
/// This structure is intended to be stored next to the event loop, and it is
/// used to create new `Queue` structures.
pub struct QueuePool {
queue: UnsafeArc<State>,
queue: Arc<State>,
refcnt: uint,
}
/// This type is used to send messages back to the original event loop.
pub struct Queue {
queue: UnsafeArc<State>,
queue: Arc<State>,
}
extern fn async_cb(handle: *uvll::uv_async_t) {
let pool: &mut QueuePool = unsafe {
mem::transmute(uvll::get_data_for_uv_handle(handle))
};
let state: &mut State = unsafe { mem::transmute(pool.queue.get()) };
let state: &State = &*pool.queue;
// Remember that there is no guarantee about how many times an async
// callback is called with relation to the number of sends, so process the
@ -109,7 +109,7 @@ extern fn async_cb(handle: *uvll::uv_async_t) {
impl QueuePool {
pub fn new(loop_: &mut Loop) -> Box<QueuePool> {
let handle = UvHandle::alloc(None::<AsyncWatcher>, uvll::UV_ASYNC);
let state = UnsafeArc::new(State {
let state = Arc::new(State {
handle: handle,
lock: unsafe {NativeMutex::new()},
queue: mpsc::Queue::new(),
@ -132,24 +132,20 @@ impl QueuePool {
pub fn queue(&mut self) -> Queue {
unsafe {
if self.refcnt == 0 {
uvll::uv_ref((*self.queue.get()).handle);
uvll::uv_ref(self.queue.handle);
}
self.refcnt += 1;
}
Queue { queue: self.queue.clone() }
}
pub fn handle(&self) -> *uvll::uv_async_t {
unsafe { (*self.queue.get()).handle }
}
pub fn handle(&self) -> *uvll::uv_async_t { self.queue.handle }
}
impl Queue {
pub fn push(&mut self, task: BlockedTask) {
unsafe {
(*self.queue.get()).queue.push(Task(task));
uvll::uv_async_send((*self.queue.get()).handle);
}
self.queue.queue.push(Task(task));
unsafe { uvll::uv_async_send(self.queue.handle); }
}
}
@ -160,9 +156,7 @@ impl Clone for Queue {
// that the count is at least one (because we have a queue right here),
// and if the queue is dropped later on it'll see the increment for the
// decrement anyway.
unsafe {
(*self.queue.get()).queue.push(Increment);
}
self.queue.queue.push(Increment);
Queue { queue: self.queue.clone() }
}
}
@ -172,10 +166,9 @@ impl Drop for Queue {
// See the comments in the async_cb function for why there is a lock
// that is acquired only on a drop.
unsafe {
let state = self.queue.get();
let _l = (*state).lock.lock();
(*state).queue.push(Decrement);
uvll::uv_async_send((*state).handle);
let _l = self.queue.lock.lock();
self.queue.queue.push(Decrement);
uvll::uv_async_send(self.queue.handle);
}
}
}

View File

@ -16,16 +16,17 @@
/// the same underlying uv object, hence Rc is not used and this simple counter
/// should suffice.
use std::sync::arc::UnsafeArc;
use alloc::arc::Arc;
use std::ty::Unsafe;
pub struct Refcount {
rc: UnsafeArc<uint>,
rc: Arc<Unsafe<uint>>,
}
impl Refcount {
/// Creates a new refcount of 1
pub fn new() -> Refcount {
Refcount { rc: UnsafeArc::new(1) }
Refcount { rc: Arc::new(Unsafe::new(1)) }
}
fn increment(&self) {

View File

@ -271,6 +271,8 @@
// And now that you've seen all the races that I found and attempted to fix,
// here's the code for you to find some more!
use alloc::arc::Arc;
use cell::Cell;
use clone::Clone;
use iter::Iterator;
@ -283,7 +285,6 @@ use owned::Box;
use result::{Ok, Err, Result};
use rt::local::Local;
use rt::task::{Task, BlockedTask};
use sync::arc::UnsafeArc;
use ty::Unsafe;
pub use comm::select::{Select, Handle};
@ -352,7 +353,7 @@ pub struct Sender<T> {
/// The sending-half of Rust's synchronous channel type. This half can only be
/// owned by one task, but it can be cloned to send to other tasks.
pub struct SyncSender<T> {
inner: UnsafeArc<sync::Packet<T>>,
inner: Arc<Unsafe<sync::Packet<T>>>,
// can't share in an arc
marker: marker::NoShare,
}
@ -386,10 +387,10 @@ pub enum TrySendError<T> {
}
enum Flavor<T> {
Oneshot(UnsafeArc<oneshot::Packet<T>>),
Stream(UnsafeArc<stream::Packet<T>>),
Shared(UnsafeArc<shared::Packet<T>>),
Sync(UnsafeArc<sync::Packet<T>>),
Oneshot(Arc<Unsafe<oneshot::Packet<T>>>),
Stream(Arc<Unsafe<stream::Packet<T>>>),
Shared(Arc<Unsafe<shared::Packet<T>>>),
Sync(Arc<Unsafe<sync::Packet<T>>>),
}
#[doc(hidden)]
@ -435,8 +436,8 @@ impl<T> UnsafeFlavor<T> for Receiver<T> {
/// println!("{}", rx.recv());
/// ```
pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
let (a, b) = UnsafeArc::new2(oneshot::Packet::new());
(Sender::new(Oneshot(b)), Receiver::new(Oneshot(a)))
let a = Arc::new(Unsafe::new(oneshot::Packet::new()));
(Sender::new(Oneshot(a.clone())), Receiver::new(Oneshot(a)))
}
/// Creates a new synchronous, bounded channel.
@ -471,8 +472,8 @@ pub fn channel<T: Send>() -> (Sender<T>, Receiver<T>) {
/// assert_eq!(rx.recv(), 2);
/// ```
pub fn sync_channel<T: Send>(bound: uint) -> (SyncSender<T>, Receiver<T>) {
let (a, b) = UnsafeArc::new2(sync::Packet::new(bound));
(SyncSender::new(a), Receiver::new(Sync(b)))
let a = Arc::new(Unsafe::new(sync::Packet::new(bound)));
(SyncSender::new(a.clone()), Receiver::new(Sync(a)))
}
////////////////////////////////////////////////////////////////////////////////
@ -557,13 +558,13 @@ impl<T: Send> Sender<T> {
let (new_inner, ret) = match *unsafe { self.inner() } {
Oneshot(ref p) => {
let p = p.get();
unsafe {
let p = p.get();
if !(*p).sent() {
return (*p).send(t);
} else {
let (a, b) = UnsafeArc::new2(stream::Packet::new());
match (*p).upgrade(Receiver::new(Stream(b))) {
let a = Arc::new(Unsafe::new(stream::Packet::new()));
match (*p).upgrade(Receiver::new(Stream(a.clone()))) {
oneshot::UpSuccess => {
let ret = (*a.get()).send(t);
(a, ret)
@ -598,17 +599,21 @@ impl<T: Send> Clone for Sender<T> {
fn clone(&self) -> Sender<T> {
let (packet, sleeper) = match *unsafe { self.inner() } {
Oneshot(ref p) => {
let (a, b) = UnsafeArc::new2(shared::Packet::new());
match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
oneshot::UpSuccess | oneshot::UpDisconnected => (b, None),
oneshot::UpWoke(task) => (b, Some(task))
let a = Arc::new(Unsafe::new(shared::Packet::new()));
match unsafe {
(*p.get()).upgrade(Receiver::new(Shared(a.clone())))
} {
oneshot::UpSuccess | oneshot::UpDisconnected => (a, None),
oneshot::UpWoke(task) => (a, Some(task))
}
}
Stream(ref p) => {
let (a, b) = UnsafeArc::new2(shared::Packet::new());
match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } {
stream::UpSuccess | stream::UpDisconnected => (b, None),
stream::UpWoke(task) => (b, Some(task)),
let a = Arc::new(Unsafe::new(shared::Packet::new()));
match unsafe {
(*p.get()).upgrade(Receiver::new(Shared(a.clone())))
} {
stream::UpSuccess | stream::UpDisconnected => (a, None),
stream::UpWoke(task) => (a, Some(task)),
}
}
Shared(ref p) => {
@ -645,7 +650,7 @@ impl<T: Send> Drop for Sender<T> {
////////////////////////////////////////////////////////////////////////////////
impl<T: Send> SyncSender<T> {
fn new(inner: UnsafeArc<sync::Packet<T>>) -> SyncSender<T> {
fn new(inner: Arc<Unsafe<sync::Packet<T>>>) -> SyncSender<T> {
SyncSender { inner: inner, marker: marker::NoShare }
}

View File

@ -15,7 +15,7 @@
/// this type is to have one and exactly one allocation when the chan/port pair
/// is created.
///
/// Another possible optimization would be to not use an UnsafeArc box because
/// Another possible optimization would be to not use an Arc box because
/// in theory we know when the shared packet can be deallocated (no real need
/// for the atomic reference counting), but I was having trouble how to destroy
/// the data early in a drop of a Port.

View File

@ -13,6 +13,8 @@
//! local storage, and logging. Even a 'freestanding' Rust would likely want
//! to implement this.
use alloc::arc::Arc;
use cleanup;
use clone::Clone;
use comm::Sender;
@ -32,7 +34,6 @@ use rt::local_heap::LocalHeap;
use rt::rtio::LocalIo;
use rt::unwind::Unwinder;
use str::SendStr;
use sync::arc::UnsafeArc;
use sync::atomics::{AtomicUint, SeqCst};
use task::{TaskResult, TaskOpts};
use unstable::finally::Finally;
@ -65,7 +66,7 @@ pub struct LocalStorage(pub Option<local_data::Map>);
/// at any time.
pub enum BlockedTask {
Owned(Box<Task>),
Shared(UnsafeArc<AtomicUint>),
Shared(Arc<AtomicUint>),
}
pub enum DeathAction {
@ -82,7 +83,7 @@ pub struct Death {
}
pub struct BlockedTasks {
inner: UnsafeArc<AtomicUint>,
inner: Arc<AtomicUint>,
}
impl Task {
@ -313,10 +314,10 @@ impl BlockedTask {
pub fn wake(self) -> Option<Box<Task>> {
match self {
Owned(task) => Some(task),
Shared(arc) => unsafe {
match (*arc.get()).swap(0, SeqCst) {
Shared(arc) => {
match arc.swap(0, SeqCst) {
0 => None,
n => Some(mem::transmute(n)),
n => Some(unsafe { mem::transmute(n) }),
}
}
}
@ -343,7 +344,7 @@ impl BlockedTask {
let arc = match self {
Owned(task) => {
let flag = unsafe { AtomicUint::new(mem::transmute(task)) };
UnsafeArc::new(flag)
Arc::new(flag)
}
Shared(arc) => arc.clone(),
};
@ -375,7 +376,7 @@ impl BlockedTask {
if blocked_task_ptr & 0x1 == 0 {
Owned(mem::transmute(blocked_task_ptr))
} else {
let ptr: Box<UnsafeArc<AtomicUint>> =
let ptr: Box<Arc<AtomicUint>> =
mem::transmute(blocked_task_ptr & !1);
Shared(*ptr)
}

View File

@ -1,189 +0,0 @@
// Copyright 2013-2014 The Rust Project Developers. See the COPYRIGHT
// file at the top-level directory of this distribution and at
// http://rust-lang.org/COPYRIGHT.
//
// Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
// http://www.apache.org/licenses/LICENSE-2.0> or the MIT license
// <LICENSE-MIT or http://opensource.org/licenses/MIT>, at your
// option. This file may not be copied, modified, or distributed
// except according to those terms.
//! Atomically reference counted data
//!
//! This modules contains the implementation of an atomically reference counted
//! pointer for the purpose of sharing data between tasks. This is obviously a
//! very unsafe primitive to use, but it has its use cases when implementing
//! concurrent data structures and similar tasks.
//!
//! Great care must be taken to ensure that data races do not arise through the
//! usage of `UnsafeArc`, and this often requires some form of external
//! synchronization. The only guarantee provided to you by this class is that
//! the underlying data will remain valid (not free'd) so long as the reference
//! count is greater than one.
use clone::Clone;
use iter::Iterator;
use kinds::Send;
use mem;
use ops::Drop;
use owned::Box;
use ptr::RawPtr;
use sync::atomics::{fence, AtomicUint, Relaxed, Acquire, Release};
use ty::Unsafe;
use vec::Vec;
/// An atomically reference counted pointer.
///
/// Enforces no shared-memory safety.
#[unsafe_no_drop_flag]
pub struct UnsafeArc<T> {
data: *mut ArcData<T>,
}
struct ArcData<T> {
count: AtomicUint,
data: Unsafe<T>,
}
unsafe fn new_inner<T: Send>(data: T, refcount: uint) -> *mut ArcData<T> {
let data = box ArcData {
count: AtomicUint::new(refcount),
data: Unsafe::new(data)
};
mem::transmute(data)
}
impl<T: Send> UnsafeArc<T> {
/// Creates a new `UnsafeArc` which wraps the given data.
pub fn new(data: T) -> UnsafeArc<T> {
unsafe { UnsafeArc { data: new_inner(data, 1) } }
}
/// As new(), but returns an extra pre-cloned handle.
pub fn new2(data: T) -> (UnsafeArc<T>, UnsafeArc<T>) {
unsafe {
let ptr = new_inner(data, 2);
(UnsafeArc { data: ptr }, UnsafeArc { data: ptr })
}
}
/// As new(), but returns a vector of as many pre-cloned handles as
/// requested.
pub fn newN(data: T, num_handles: uint) -> Vec<UnsafeArc<T>> {
unsafe {
if num_handles == 0 {
vec![] // need to free data here
} else {
let ptr = new_inner(data, num_handles);
let v = Vec::from_fn(num_handles, |_| UnsafeArc { data: ptr });
v
}
}
}
/// Gets a pointer to the inner shared data. Note that care must be taken to
/// ensure that the outer `UnsafeArc` does not fall out of scope while this
/// pointer is in use, otherwise it could possibly contain a use-after-free.
#[inline]
pub fn get(&self) -> *mut T {
unsafe {
debug_assert!((*self.data).count.load(Relaxed) > 0);
return (*self.data).data.get();
}
}
/// Gets an immutable pointer to the inner shared data. This has the same
/// caveats as the `get` method.
#[inline]
pub fn get_immut(&self) -> *T {
unsafe {
debug_assert!((*self.data).count.load(Relaxed) > 0);
return (*self.data).data.get() as *T;
}
}
/// checks if this is the only reference to the arc protected data
#[inline]
pub fn is_owned(&self) -> bool {
unsafe {
(*self.data).count.load(Relaxed) == 1
}
}
}
impl<T: Send> Clone for UnsafeArc<T> {
fn clone(&self) -> UnsafeArc<T> {
unsafe {
// Using a relaxed ordering is alright here, as knowledge of the original reference
// prevents other threads from erroneously deleting the object.
//
// As explained in the [Boost documentation][1],
// Increasing the reference counter can always be done with memory_order_relaxed: New
// references to an object can only be formed from an existing reference, and passing
// an existing reference from one thread to another must already provide any required
// synchronization.
// [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
let old_count = (*self.data).count.fetch_add(1, Relaxed);
debug_assert!(old_count >= 1);
return UnsafeArc { data: self.data };
}
}
}
#[unsafe_destructor]
impl<T> Drop for UnsafeArc<T>{
fn drop(&mut self) {
unsafe {
// Happens when destructing an unwrapper's handle and from
// `#[unsafe_no_drop_flag]`
if self.data.is_null() {
return
}
// Because `fetch_sub` is already atomic, we do not need to synchronize with other
// threads unless we are going to delete the object.
let old_count = (*self.data).count.fetch_sub(1, Release);
debug_assert!(old_count >= 1);
if old_count == 1 {
// This fence is needed to prevent reordering of use of the data and deletion of
// the data. Because it is marked `Release`, the decreasing of the reference count
// sychronizes with this `Acquire` fence. This means that use of the data happens
// before decreasing the refernce count, which happens before this fence, which
// happens before the deletion of the data.
//
// As explained in the [Boost documentation][1],
// It is important to enforce any possible access to the object in one thread
// (through an existing reference) to *happen before* deleting the object in a
// different thread. This is achieved by a "release" operation after dropping a
// reference (any access to the object through this reference must obviously
// happened before), and an "acquire" operation before deleting the object.
// [1]: (www.boost.org/doc/libs/1_55_0/doc/html/atomic/usage_examples.html)
fence(Acquire);
let _: Box<ArcData<T>> = mem::transmute(self.data);
}
}
}
}
#[cfg(test)]
mod tests {
use prelude::*;
use super::UnsafeArc;
use mem::size_of;
#[test]
fn test_size() {
assert_eq!(size_of::<UnsafeArc<[int, ..10]>>(), size_of::<*[int, ..10]>());
}
#[test]
fn arclike_newN() {
// Tests that the many-refcounts-at-once constructors don't leak.
let _ = UnsafeArc::new2("hello".to_owned().to_owned());
let x = UnsafeArc::newN("hello".to_owned().to_owned(), 0);
assert_eq!(x.len(), 0)
let x = UnsafeArc::newN("hello".to_owned().to_owned(), 1);
assert_eq!(x.len(), 1)
let x = UnsafeArc::newN("hello".to_owned().to_owned(), 10);
assert_eq!(x.len(), 10)
}
}

View File

@ -48,20 +48,22 @@
// FIXME: all atomic operations in this module use a SeqCst ordering. That is
// probably overkill
use alloc::arc::Arc;
use clone::Clone;
use iter::{range, Iterator};
use kinds::Send;
use kinds::marker;
use mem::{forget, min_align_of, size_of, transmute};
use ops::Drop;
use option::{Option, Some, None};
use owned::Box;
use ptr::RawPtr;
use ptr;
use rt::heap::{allocate, deallocate};
use slice::ImmutableVector;
use sync::arc::UnsafeArc;
use sync::atomics::{AtomicInt, AtomicPtr, SeqCst};
use unstable::sync::Exclusive;
use rt::heap::{allocate, deallocate};
use vec::Vec;
// Once the queue is less than 1/K full, then it will be downsized. Note that
@ -87,14 +89,16 @@ struct Deque<T> {
///
/// There may only be one worker per deque.
pub struct Worker<T> {
deque: UnsafeArc<Deque<T>>,
deque: Arc<Deque<T>>,
noshare: marker::NoShare,
}
/// The stealing half of the work-stealing deque. Stealers have access to the
/// opposite end of the deque from the worker, and they only have access to the
/// `steal` method.
pub struct Stealer<T> {
deque: UnsafeArc<Deque<T>>,
deque: Arc<Deque<T>>,
noshare: marker::NoShare,
}
/// When stealing some data, this is an enumeration of the possible outcomes.
@ -149,12 +153,14 @@ impl<T: Send> BufferPool<T> {
/// Allocates a new work-stealing deque which will send/receiving memory to
/// and from this buffer pool.
pub fn deque(&mut self) -> (Worker<T>, Stealer<T>) {
let (a, b) = UnsafeArc::new2(Deque::new(self.clone()));
(Worker { deque: a }, Stealer { deque: b })
pub fn deque(&self) -> (Worker<T>, Stealer<T>) {
let a = Arc::new(Deque::new(self.clone()));
let b = a.clone();
(Worker { deque: a, noshare: marker::NoShare },
Stealer { deque: b, noshare: marker::NoShare })
}
fn alloc(&mut self, bits: int) -> Box<Buffer<T>> {
fn alloc(&self, bits: int) -> Box<Buffer<T>> {
unsafe {
self.pool.with(|pool| {
match pool.iter().position(|x| x.size() >= (1 << bits)) {
@ -165,7 +171,7 @@ impl<T: Send> BufferPool<T> {
}
}
fn free(&mut self, buf: Box<Buffer<T>>) {
fn free(&self, buf: Box<Buffer<T>>) {
unsafe {
let mut buf = Some(buf);
self.pool.with(|pool| {
@ -185,46 +191,48 @@ impl<T: Send> Clone for BufferPool<T> {
impl<T: Send> Worker<T> {
/// Pushes data onto the front of this work queue.
pub fn push(&mut self, t: T) {
unsafe { (*self.deque.get()).push(t) }
pub fn push(&self, t: T) {
unsafe { self.deque.push(t) }
}
/// Pops data off the front of the work queue, returning `None` on an empty
/// queue.
pub fn pop(&mut self) -> Option<T> {
unsafe { (*self.deque.get()).pop() }
pub fn pop(&self) -> Option<T> {
unsafe { self.deque.pop() }
}
/// Gets access to the buffer pool that this worker is attached to. This can
/// be used to create more deques which share the same buffer pool as this
/// deque.
pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
unsafe { &mut (*self.deque.get()).pool }
pub fn pool<'a>(&'a self) -> &'a BufferPool<T> {
&self.deque.pool
}
}
impl<T: Send> Stealer<T> {
/// Steals work off the end of the queue (opposite of the worker's end)
pub fn steal(&mut self) -> Stolen<T> {
unsafe { (*self.deque.get()).steal() }
pub fn steal(&self) -> Stolen<T> {
unsafe { self.deque.steal() }
}
/// Gets access to the buffer pool that this stealer is attached to. This
/// can be used to create more deques which share the same buffer pool as
/// this deque.
pub fn pool<'a>(&'a mut self) -> &'a mut BufferPool<T> {
unsafe { &mut (*self.deque.get()).pool }
pub fn pool<'a>(&'a self) -> &'a BufferPool<T> {
&self.deque.pool
}
}
impl<T: Send> Clone for Stealer<T> {
fn clone(&self) -> Stealer<T> { Stealer { deque: self.deque.clone() } }
fn clone(&self) -> Stealer<T> {
Stealer { deque: self.deque.clone(), noshare: marker::NoShare }
}
}
// Almost all of this code can be found directly in the paper so I'm not
// personally going to heavily comment what's going on here.
impl<T: Send> Deque<T> {
fn new(mut pool: BufferPool<T>) -> Deque<T> {
fn new(pool: BufferPool<T>) -> Deque<T> {
let buf = pool.alloc(MIN_BITS);
Deque {
bottom: AtomicInt::new(0),
@ -234,7 +242,7 @@ impl<T: Send> Deque<T> {
}
}
unsafe fn push(&mut self, data: T) {
unsafe fn push(&self, data: T) {
let mut b = self.bottom.load(SeqCst);
let t = self.top.load(SeqCst);
let mut a = self.array.load(SeqCst);
@ -250,7 +258,7 @@ impl<T: Send> Deque<T> {
self.bottom.store(b + 1, SeqCst);
}
unsafe fn pop(&mut self) -> Option<T> {
unsafe fn pop(&self) -> Option<T> {
let b = self.bottom.load(SeqCst);
let a = self.array.load(SeqCst);
let b = b - 1;
@ -276,7 +284,7 @@ impl<T: Send> Deque<T> {
}
}
unsafe fn steal(&mut self) -> Stolen<T> {
unsafe fn steal(&self) -> Stolen<T> {
let t = self.top.load(SeqCst);
let old = self.array.load(SeqCst);
let b = self.bottom.load(SeqCst);
@ -298,7 +306,7 @@ impl<T: Send> Deque<T> {
}
}
unsafe fn maybe_shrink(&mut self, b: int, t: int) {
unsafe fn maybe_shrink(&self, b: int, t: int) {
let a = self.array.load(SeqCst);
if b - t < (*a).size() / K && b - t > (1 << MIN_BITS) {
self.swap_buffer(b, a, (*a).resize(b, t, -1));
@ -312,7 +320,7 @@ impl<T: Send> Deque<T> {
// after this method has called 'free' on it. The continued usage is simply
// a read followed by a forget, but we must make sure that the memory can
// continue to be read after we flag this buffer for reclamation.
unsafe fn swap_buffer(&mut self, b: int, old: *mut Buffer<T>,
unsafe fn swap_buffer(&self, b: int, old: *mut Buffer<T>,
buf: Buffer<T>) -> *mut Buffer<T> {
let newbuf: *mut Buffer<T> = transmute(box buf);
self.array.store(newbuf, SeqCst);
@ -373,7 +381,7 @@ impl<T: Send> Buffer<T> {
// Unsafe because this unsafely overwrites possibly uninitialized or
// initialized data.
unsafe fn put(&mut self, i: int, t: T) {
unsafe fn put(&self, i: int, t: T) {
let ptr = self.storage.offset(i & self.mask());
ptr::copy_nonoverlapping_memory(ptr as *mut T, &t as *T, 1);
forget(t);
@ -382,7 +390,7 @@ impl<T: Send> Buffer<T> {
// Again, unsafe because this has incredibly dubious ownership violations.
// It is assumed that this buffer is immediately dropped.
unsafe fn resize(&self, b: int, t: int, delta: int) -> Buffer<T> {
let mut buf = Buffer::new(self.log_size + delta);
let buf = Buffer::new(self.log_size + delta);
for i in range(t, b) {
buf.put(i, self.get(i));
}
@ -415,8 +423,8 @@ mod tests {
#[test]
fn smoke() {
let mut pool = BufferPool::new();
let (mut w, mut s) = pool.deque();
let pool = BufferPool::new();
let (w, s) = pool.deque();
assert_eq!(w.pop(), None);
assert_eq!(s.steal(), Empty);
w.push(1);
@ -430,10 +438,9 @@ mod tests {
#[test]
fn stealpush() {
static AMT: int = 100000;
let mut pool = BufferPool::<int>::new();
let (mut w, s) = pool.deque();
let pool = BufferPool::<int>::new();
let (w, s) = pool.deque();
let t = Thread::start(proc() {
let mut s = s;
let mut left = AMT;
while left > 0 {
match s.steal() {
@ -456,10 +463,9 @@ mod tests {
#[test]
fn stealpush_large() {
static AMT: int = 100000;
let mut pool = BufferPool::<(int, int)>::new();
let (mut w, s) = pool.deque();
let pool = BufferPool::<(int, int)>::new();
let (w, s) = pool.deque();
let t = Thread::start(proc() {
let mut s = s;
let mut left = AMT;
while left > 0 {
match s.steal() {
@ -477,7 +483,7 @@ mod tests {
t.join();
}
fn stampede(mut w: Worker<Box<int>>, s: Stealer<Box<int>>,
fn stampede(w: Worker<Box<int>>, s: Stealer<Box<int>>,
nthreads: int, amt: uint) {
for _ in range(0, amt) {
w.push(box 20);
@ -489,7 +495,6 @@ mod tests {
let s = s.clone();
Thread::start(proc() {
unsafe {
let mut s = s;
while (*unsafe_remaining).load(SeqCst) > 0 {
match s.steal() {
Data(box 20) => {
@ -518,7 +523,7 @@ mod tests {
#[test]
fn run_stampede() {
let mut pool = BufferPool::<Box<int>>::new();
let pool = BufferPool::<Box<int>>::new();
let (w, s) = pool.deque();
stampede(w, s, 8, 10000);
}
@ -526,7 +531,7 @@ mod tests {
#[test]
fn many_stampede() {
static AMT: uint = 4;
let mut pool = BufferPool::<Box<int>>::new();
let pool = BufferPool::<Box<int>>::new();
let threads = range(0, AMT).map(|_| {
let (w, s) = pool.deque();
Thread::start(proc() {
@ -545,14 +550,13 @@ mod tests {
static NTHREADS: int = 8;
static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
static mut HITS: AtomicUint = INIT_ATOMIC_UINT;
let mut pool = BufferPool::<int>::new();
let (mut w, s) = pool.deque();
let pool = BufferPool::<int>::new();
let (w, s) = pool.deque();
let threads = range(0, NTHREADS).map(|_| {
let s = s.clone();
Thread::start(proc() {
unsafe {
let mut s = s;
loop {
match s.steal() {
Data(2) => { HITS.fetch_add(1, SeqCst); }
@ -604,8 +608,8 @@ mod tests {
static AMT: int = 10000;
static NTHREADS: int = 4;
static mut DONE: AtomicBool = INIT_ATOMIC_BOOL;
let mut pool = BufferPool::<(int, uint)>::new();
let (mut w, s) = pool.deque();
let pool = BufferPool::<(int, uint)>::new();
let (w, s) = pool.deque();
let (threads, hits) = vec::unzip(range(0, NTHREADS).map(|_| {
let s = s.clone();
@ -615,7 +619,6 @@ mod tests {
};
(Thread::start(proc() {
unsafe {
let mut s = s;
loop {
match s.steal() {
Data((1, 2)) => {

View File

@ -15,7 +15,6 @@
//! and/or blocking at all, but rather provide the necessary tools to build
//! other types of concurrent primitives.
pub mod arc;
pub mod atomics;
pub mod deque;
pub mod mpmc_bounded_queue;

View File

@ -29,13 +29,15 @@
// http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
use alloc::arc::Arc;
use clone::Clone;
use kinds::Send;
use num::next_power_of_two;
use option::{Option, Some, None};
use sync::arc::UnsafeArc;
use sync::atomics::{AtomicUint,Relaxed,Release,Acquire};
use vec::Vec;
use ty::Unsafe;
struct Node<T> {
sequence: AtomicUint,
@ -44,7 +46,7 @@ struct Node<T> {
struct State<T> {
pad0: [u8, ..64],
buffer: Vec<Node<T>>,
buffer: Vec<Unsafe<Node<T>>>,
mask: uint,
pad1: [u8, ..64],
enqueue_pos: AtomicUint,
@ -54,7 +56,7 @@ struct State<T> {
}
pub struct Queue<T> {
state: UnsafeArc<State<T>>,
state: Arc<State<T>>,
}
impl<T: Send> State<T> {
@ -70,7 +72,7 @@ impl<T: Send> State<T> {
capacity
};
let buffer = Vec::from_fn(capacity, |i| {
Node { sequence:AtomicUint::new(i), value: None }
Unsafe::new(Node { sequence:AtomicUint::new(i), value: None })
});
State{
pad0: [0, ..64],
@ -84,19 +86,21 @@ impl<T: Send> State<T> {
}
}
fn push(&mut self, value: T) -> bool {
fn push(&self, value: T) -> bool {
let mask = self.mask;
let mut pos = self.enqueue_pos.load(Relaxed);
loop {
let node = self.buffer.get_mut(pos & mask);
let seq = node.sequence.load(Acquire);
let node = self.buffer.get(pos & mask);
let seq = unsafe { (*node.get()).sequence.load(Acquire) };
let diff: int = seq as int - pos as int;
if diff == 0 {
let enqueue_pos = self.enqueue_pos.compare_and_swap(pos, pos+1, Relaxed);
if enqueue_pos == pos {
node.value = Some(value);
node.sequence.store(pos+1, Release);
unsafe {
(*node.get()).value = Some(value);
(*node.get()).sequence.store(pos+1, Release);
}
break
} else {
pos = enqueue_pos;
@ -110,19 +114,21 @@ impl<T: Send> State<T> {
true
}
fn pop(&mut self) -> Option<T> {
fn pop(&self) -> Option<T> {
let mask = self.mask;
let mut pos = self.dequeue_pos.load(Relaxed);
loop {
let node = self.buffer.get_mut(pos & mask);
let seq = node.sequence.load(Acquire);
let node = self.buffer.get(pos & mask);
let seq = unsafe { (*node.get()).sequence.load(Acquire) };
let diff: int = seq as int - (pos + 1) as int;
if diff == 0 {
let dequeue_pos = self.dequeue_pos.compare_and_swap(pos, pos+1, Relaxed);
if dequeue_pos == pos {
let value = node.value.take();
node.sequence.store(pos + mask + 1, Release);
return value
unsafe {
let value = (*node.get()).value.take();
(*node.get()).sequence.store(pos + mask + 1, Release);
return value
}
} else {
pos = dequeue_pos;
}
@ -138,24 +144,22 @@ impl<T: Send> State<T> {
impl<T: Send> Queue<T> {
pub fn with_capacity(capacity: uint) -> Queue<T> {
Queue{
state: UnsafeArc::new(State::with_capacity(capacity))
state: Arc::new(State::with_capacity(capacity))
}
}
pub fn push(&mut self, value: T) -> bool {
unsafe { (*self.state.get()).push(value) }
pub fn push(&self, value: T) -> bool {
self.state.push(value)
}
pub fn pop(&mut self) -> Option<T> {
unsafe { (*self.state.get()).pop() }
pub fn pop(&self) -> Option<T> {
self.state.pop()
}
}
impl<T: Send> Clone for Queue<T> {
fn clone(&self) -> Queue<T> {
Queue {
state: self.state.clone()
}
Queue { state: self.state.clone() }
}
}
@ -169,7 +173,7 @@ mod tests {
fn test() {
let nthreads = 8u;
let nmsgs = 1000u;
let mut q = Queue::with_capacity(nthreads*nmsgs);
let q = Queue::with_capacity(nthreads*nmsgs);
assert_eq!(None, q.pop());
let (tx, rx) = channel();
@ -177,7 +181,7 @@ mod tests {
let q = q.clone();
let tx = tx.clone();
native::task::spawn(proc() {
let mut q = q;
let q = q;
for i in range(0, nmsgs) {
assert!(q.push(i));
}
@ -191,7 +195,7 @@ mod tests {
completion_rxs.push(rx);
let q = q.clone();
native::task::spawn(proc() {
let mut q = q;
let q = q;
let mut i = 0u;
loop {
match q.pop() {

View File

@ -45,6 +45,7 @@ use option::{Option, None, Some};
use owned::Box;
use ptr::RawPtr;
use sync::atomics::{AtomicPtr, Release, Acquire, AcqRel, Relaxed};
use ty::Unsafe;
/// A result of the `pop` function.
pub enum PopResult<T> {
@ -69,7 +70,7 @@ struct Node<T> {
/// popper at a time (many pushers are allowed).
pub struct Queue<T> {
head: AtomicPtr<Node<T>>,
tail: *mut Node<T>,
tail: Unsafe<*mut Node<T>>,
}
impl<T> Node<T> {
@ -88,12 +89,12 @@ impl<T: Send> Queue<T> {
let stub = unsafe { Node::new(None) };
Queue {
head: AtomicPtr::new(stub),
tail: stub,
tail: Unsafe::new(stub),
}
}
/// Pushes a new value onto this queue.
pub fn push(&mut self, t: T) {
pub fn push(&self, t: T) {
unsafe {
let n = Node::new(Some(t));
let prev = self.head.swap(n, AcqRel);
@ -111,13 +112,13 @@ impl<T: Send> Queue<T> {
///
/// This inconsistent state means that this queue does indeed have data, but
/// it does not currently have access to it at this time.
pub fn pop(&mut self) -> PopResult<T> {
pub fn pop(&self) -> PopResult<T> {
unsafe {
let tail = self.tail;
let tail = *self.tail.get();
let next = (*tail).next.load(Acquire);
if !next.is_null() {
self.tail = next;
*self.tail.get() = next;
assert!((*tail).value.is_none());
assert!((*next).value.is_some());
let ret = (*next).value.take_unwrap();
@ -131,7 +132,7 @@ impl<T: Send> Queue<T> {
/// Attempts to pop data from this queue, but doesn't attempt too hard. This
/// will canonicalize inconsistent states to a `None` value.
pub fn casual_pop(&mut self) -> Option<T> {
pub fn casual_pop(&self) -> Option<T> {
match self.pop() {
Data(t) => Some(t),
Empty | Inconsistent => None,
@ -143,7 +144,7 @@ impl<T: Send> Queue<T> {
impl<T: Send> Drop for Queue<T> {
fn drop(&mut self) {
unsafe {
let mut cur = self.tail;
let mut cur = *self.tail.get();
while !cur.is_null() {
let next = (*cur).next.load(Relaxed);
let _: Box<Node<T>> = mem::transmute(cur);
@ -157,13 +158,14 @@ impl<T: Send> Drop for Queue<T> {
mod tests {
use prelude::*;
use alloc::arc::Arc;
use native;
use super::{Queue, Data, Empty, Inconsistent};
use sync::arc::UnsafeArc;
#[test]
fn test_full() {
let mut q = Queue::new();
let q = Queue::new();
q.push(box 1);
q.push(box 2);
}
@ -172,20 +174,20 @@ mod tests {
fn test() {
let nthreads = 8u;
let nmsgs = 1000u;
let mut q = Queue::new();
let q = Queue::new();
match q.pop() {
Empty => {}
Inconsistent | Data(..) => fail!()
}
let (tx, rx) = channel();
let q = UnsafeArc::new(q);
let q = Arc::new(q);
for _ in range(0, nthreads) {
let tx = tx.clone();
let q = q.clone();
native::task::spawn(proc() {
for i in range(0, nmsgs) {
unsafe { (*q.get()).push(i); }
q.push(i);
}
tx.send(());
});
@ -193,7 +195,7 @@ mod tests {
let mut i = 0u;
while i < nthreads * nmsgs {
match unsafe { (*q.get()).pop() } {
match q.pop() {
Empty | Inconsistent => {},
Data(_) => { i += 1 }
}

View File

@ -40,6 +40,7 @@ use option::{Some, None, Option};
use owned::Box;
use ptr::RawPtr;
use sync::atomics::{AtomicPtr, Relaxed, AtomicUint, Acquire, Release};
use ty::Unsafe;
// Node within the linked list queue of messages to send
struct Node<T> {
@ -51,18 +52,18 @@ struct Node<T> {
}
/// The single-producer single-consumer queue. This structure is not cloneable,
/// but it can be safely shared in an UnsafeArc if it is guaranteed that there
/// but it can be safely shared in an Arc if it is guaranteed that there
/// is only one popper and one pusher touching the queue at any one point in
/// time.
pub struct Queue<T> {
// consumer fields
tail: *mut Node<T>, // where to pop from
tail: Unsafe<*mut Node<T>>, // where to pop from
tail_prev: AtomicPtr<Node<T>>, // where to pop from
// producer fields
head: *mut Node<T>, // where to push to
first: *mut Node<T>, // where to get new nodes from
tail_copy: *mut Node<T>, // between first/tail
head: Unsafe<*mut Node<T>>, // where to push to
first: Unsafe<*mut Node<T>>, // where to get new nodes from
tail_copy: Unsafe<*mut Node<T>>, // between first/tail
// Cache maintenance fields. Additions and subtractions are stored
// separately in order to allow them to use nonatomic addition/subtraction.
@ -101,11 +102,11 @@ impl<T: Send> Queue<T> {
let n2 = Node::new();
unsafe { (*n1).next.store(n2, Relaxed) }
Queue {
tail: n2,
tail: Unsafe::new(n2),
tail_prev: AtomicPtr::new(n1),
head: n2,
first: n1,
tail_copy: n1,
head: Unsafe::new(n2),
first: Unsafe::new(n1),
tail_copy: Unsafe::new(n1),
cache_bound: bound,
cache_additions: AtomicUint::new(0),
cache_subtractions: AtomicUint::new(0),
@ -114,7 +115,7 @@ impl<T: Send> Queue<T> {
/// Pushes a new value onto this queue. Note that to use this function
/// safely, it must be externally guaranteed that there is only one pusher.
pub fn push(&mut self, t: T) {
pub fn push(&self, t: T) {
unsafe {
// Acquire a node (which either uses a cached one or allocates a new
// one), and then append this to the 'head' node.
@ -122,35 +123,35 @@ impl<T: Send> Queue<T> {
assert!((*n).value.is_none());
(*n).value = Some(t);
(*n).next.store(0 as *mut Node<T>, Relaxed);
(*self.head).next.store(n, Release);
self.head = n;
(**self.head.get()).next.store(n, Release);
*self.head.get() = n;
}
}
unsafe fn alloc(&mut self) -> *mut Node<T> {
unsafe fn alloc(&self) -> *mut Node<T> {
// First try to see if we can consume the 'first' node for our uses.
// We try to avoid as many atomic instructions as possible here, so
// the addition to cache_subtractions is not atomic (plus we're the
// only one subtracting from the cache).
if self.first != self.tail_copy {
if *self.first.get() != *self.tail_copy.get() {
if self.cache_bound > 0 {
let b = self.cache_subtractions.load(Relaxed);
self.cache_subtractions.store(b + 1, Relaxed);
}
let ret = self.first;
self.first = (*ret).next.load(Relaxed);
let ret = *self.first.get();
*self.first.get() = (*ret).next.load(Relaxed);
return ret;
}
// If the above fails, then update our copy of the tail and try
// again.
self.tail_copy = self.tail_prev.load(Acquire);
if self.first != self.tail_copy {
*self.tail_copy.get() = self.tail_prev.load(Acquire);
if *self.first.get() != *self.tail_copy.get() {
if self.cache_bound > 0 {
let b = self.cache_subtractions.load(Relaxed);
self.cache_subtractions.store(b + 1, Relaxed);
}
let ret = self.first;
self.first = (*ret).next.load(Relaxed);
let ret = *self.first.get();
*self.first.get() = (*ret).next.load(Relaxed);
return ret;
}
// If all of that fails, then we have to allocate a new node
@ -160,19 +161,19 @@ impl<T: Send> Queue<T> {
/// Attempts to pop a value from this queue. Remember that to use this type
/// safely you must ensure that there is only one popper at a time.
pub fn pop(&mut self) -> Option<T> {
pub fn pop(&self) -> Option<T> {
unsafe {
// The `tail` node is not actually a used node, but rather a
// sentinel from where we should start popping from. Hence, look at
// tail's next field and see if we can use it. If we do a pop, then
// the current tail node is a candidate for going into the cache.
let tail = self.tail;
let tail = *self.tail.get();
let next = (*tail).next.load(Acquire);
if next.is_null() { return None }
assert!((*next).value.is_some());
let ret = (*next).value.take();
self.tail = next;
*self.tail.get() = next;
if self.cache_bound == 0 {
self.tail_prev.store(tail, Release);
} else {
@ -197,11 +198,11 @@ impl<T: Send> Queue<T> {
/// Attempts to peek at the head of the queue, returning `None` if the queue
/// has no data currently
pub fn peek<'a>(&'a mut self) -> Option<&'a mut T> {
pub fn peek<'a>(&'a self) -> Option<&'a mut T> {
// This is essentially the same as above with all the popping bits
// stripped out.
unsafe {
let tail = self.tail;
let tail = *self.tail.get();
let next = (*tail).next.load(Acquire);
if next.is_null() { return None }
return (*next).value.as_mut();
@ -213,7 +214,7 @@ impl<T: Send> Queue<T> {
impl<T: Send> Drop for Queue<T> {
fn drop(&mut self) {
unsafe {
let mut cur = self.first;
let mut cur = *self.first.get();
while !cur.is_null() {
let next = (*cur).next.load(Relaxed);
let _n: Box<Node<T>> = mem::transmute(cur);
@ -226,13 +227,15 @@ impl<T: Send> Drop for Queue<T> {
#[cfg(test)]
mod test {
use prelude::*;
use alloc::arc::Arc;
use native;
use super::Queue;
use sync::arc::UnsafeArc;
#[test]
fn smoke() {
let mut q = Queue::new(0);
let q = Queue::new(0);
q.push(1);
q.push(2);
assert_eq!(q.pop(), Some(1));
@ -247,14 +250,14 @@ mod test {
#[test]
fn drop_full() {
let mut q = Queue::new(0);
let q = Queue::new(0);
q.push(box 1);
q.push(box 2);
}
#[test]
fn smoke_bound() {
let mut q = Queue::new(1);
let q = Queue::new(1);
q.push(1);
q.push(2);
assert_eq!(q.pop(), Some(1));
@ -273,12 +276,13 @@ mod test {
stress_bound(1);
fn stress_bound(bound: uint) {
let (a, b) = UnsafeArc::new2(Queue::new(bound));
let a = Arc::new(Queue::new(bound));
let b = a.clone();
let (tx, rx) = channel();
native::task::spawn(proc() {
for _ in range(0, 100000) {
loop {
match unsafe { (*b.get()).pop() } {
match b.pop() {
Some(1) => break,
Some(_) => fail!(),
None => {}
@ -288,7 +292,7 @@ mod test {
tx.send(());
});
for _ in range(0, 100000) {
unsafe { (*a.get()).push(1); }
a.push(1);
}
rx.recv();
}

View File

@ -8,9 +8,11 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.
use alloc::arc::Arc;
use clone::Clone;
use kinds::Send;
use sync::arc::UnsafeArc;
use ty::Unsafe;
use unstable::mutex::NativeMutex;
struct ExData<T> {
@ -30,7 +32,7 @@ struct ExData<T> {
* need to block or deschedule while accessing shared state, use extra::sync::RWArc.
*/
pub struct Exclusive<T> {
x: UnsafeArc<ExData<T>>
x: Arc<Unsafe<ExData<T>>>
}
impl<T:Send> Clone for Exclusive<T> {
@ -48,7 +50,7 @@ impl<T:Send> Exclusive<T> {
data: user_data
};
Exclusive {
x: UnsafeArc::new(data)
x: Arc::new(Unsafe::new(data))
}
}