extra: Make room for more sync primitives

This commit is contained in:
Alex Crichton 2014-01-16 19:52:47 -08:00
parent b5f19795ad
commit a417de94a9

View File

@ -17,9 +17,8 @@
* in std.
*/
use std::cast;
use std::comm;
use std::unstable::sync::Exclusive;
use std::sync::arc::UnsafeArc;
use std::sync::atomics;
use std::unstable::finally::Finally;
@ -32,6 +31,10 @@ use arc::MutexArc;
* Internals
****************************************************************************/
pub mod mutex;
pub mod one;
mod mpsc_intrusive;
// Each waiting task receives on one of these.
#[doc(hidden)]
type WaitEnd = Port<()>;
@ -54,7 +57,7 @@ impl WaitQueue {
comm::Data(ch) => {
// Send a wakeup signal. If the waiter was killed, its port will
// have closed. Keep trying until we get a live task.
if ch.try_send_deferred(()) {
if ch.try_send(()) {
true
} else {
self.signal()
@ -69,7 +72,7 @@ impl WaitQueue {
loop {
match self.head.try_recv() {
comm::Data(ch) => {
if ch.try_send_deferred(()) {
if ch.try_send(()) {
count += 1;
}
}
@ -81,14 +84,14 @@ impl WaitQueue {
fn wait_end(&self) -> WaitEnd {
let (wait_end, signal_end) = Chan::new();
assert!(self.tail.try_send_deferred(signal_end));
assert!(self.tail.try_send(signal_end));
wait_end
}
}
// The building-block used to make semaphores, mutexes, and rwlocks.
#[doc(hidden)]
struct SemInner<Q> {
lock: mutex::Mutex,
count: int,
waiters: WaitQueue,
// Can be either unit or another waitqueue. Some sems shouldn't come with
@ -96,21 +99,30 @@ struct SemInner<Q> {
blocked: Q
}
#[doc(hidden)]
struct Sem<Q>(Exclusive<SemInner<Q>>);
struct Sem<Q>(UnsafeArc<SemInner<Q>>);
#[doc(hidden)]
impl<Q:Send> Sem<Q> {
fn new(count: int, q: Q) -> Sem<Q> {
Sem(Exclusive::new(SemInner {
count: count, waiters: WaitQueue::new(), blocked: q }))
Sem(UnsafeArc::new(SemInner {
count: count,
waiters: WaitQueue::new(),
blocked: q,
lock: mutex::Mutex::new(),
}))
}
unsafe fn with(&self, f: |&mut SemInner<Q>|) {
let Sem(ref arc) = *self;
let state = arc.get();
let _g = (*state).lock.lock();
f(cast::transmute(state));
}
pub fn acquire(&self) {
unsafe {
let mut waiter_nobe = None;
let Sem(ref lock) = *self;
lock.with(|state| {
self.with(|state| {
state.count -= 1;
if state.count < 0 {
// Create waiter nobe, enqueue ourself, and tell
@ -129,8 +141,7 @@ impl<Q:Send> Sem<Q> {
pub fn release(&self) {
unsafe {
let Sem(ref lock) = *self;
lock.with(|state| {
self.with(|state| {
state.count += 1;
if state.count <= 0 {
state.waiters.signal();
@ -210,8 +221,7 @@ impl<'a> Condvar<'a> {
let mut out_of_bounds = None;
// Release lock, 'atomically' enqueuing ourselves in so doing.
unsafe {
let Sem(ref queue) = *self.sem;
queue.with(|state| {
self.sem.with(|state| {
if condvar_id < state.blocked.len() {
// Drop the lock.
state.count += 1;
@ -253,8 +263,7 @@ impl<'a> Condvar<'a> {
unsafe {
let mut out_of_bounds = None;
let mut result = false;
let Sem(ref lock) = *self.sem;
lock.with(|state| {
self.sem.with(|state| {
if condvar_id < state.blocked.len() {
result = state.blocked[condvar_id].signal();
} else {
@ -276,8 +285,7 @@ impl<'a> Condvar<'a> {
let mut out_of_bounds = None;
let mut queue = None;
unsafe {
let Sem(ref lock) = *self.sem;
lock.with(|state| {
self.sem.with(|state| {
if condvar_id < state.blocked.len() {
// To avoid :broadcast_heavy, we make a new waitqueue,
// swap it out with the old one, and broadcast on the