queue_rwlock: use a separate QUEUE_LOCKED bit to synchronize waiter queue updates

parent 8db64b5e2d
commit 16aae04f68
@@ -39,16 +39,16 @@
 //!
 //! ## State
 //!
-//! A single [`AtomicPtr`] is used as state variable. The lowest two bits are used
+//! A single [`AtomicPtr`] is used as state variable. The lowest three bits are used
 //! to indicate the meaning of the remaining bits:
 //!
-//! | `LOCKED` | `QUEUED` | Remaining | |
-//! |:----------|:----------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
-//! | 0 | 0 | 0 | The lock is unlocked, no threads are waiting |
-//! | 1 | 0 | 0 | The lock is write-locked, no threads waiting |
-//! | 1 | 0 | n > 0 | The lock is read-locked with n readers |
-//! | 0 | 1 | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock |
-//! | 1 | 1 | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
+//! | [`LOCKED`] | [`QUEUED`] | [`QUEUE_LOCKED`] | Remaining | |
+//! |:-----------|:-----------|:-----------------|:-------------|:----------------------------------------------------------------------------------------------------------------------------|
+//! | 0 | 0 | 0 | 0 | The lock is unlocked, no threads are waiting |
+//! | 1 | 0 | 0 | 0 | The lock is write-locked, no threads waiting |
+//! | 1 | 0 | 0 | n > 0 | The lock is read-locked with n readers |
+//! | 0 | 1 | * | `*mut Node` | The lock is unlocked, but some threads are waiting. Only writers may lock the lock |
+//! | 1 | 1 | * | `*mut Node` | The lock is locked, but some threads are waiting. If the lock is read-locked, the last queue node contains the reader count |
 //!
 //! ## Waiter queue
 //!
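To make the table concrete, here is a small self-contained model of the state word. It uses `usize` in place of the real `*mut ()` (sidestepping pointer provenance), with constants mirroring the ones introduced further down in this diff; it is an illustration, not the actual std code:

```rust
// Standalone model of the state word; the real code stores these bits in
// an AtomicPtr<()> so the queue-head pointer can share the same word.
const LOCKED: usize = 1; // bit 0: the lock is held
const QUEUED: usize = 2; // bit 1: the remaining bits hold a *mut Node
const QUEUE_LOCKED: usize = 4; // bit 2: a thread is editing the queue
const SINGLE: usize = 8; // one reader, while no queue exists
const MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED);

fn main() {
    // Read-locked by three readers, no waiters: the count lives in the
    // upper bits, in multiples of SINGLE.
    let state = 3 * SINGLE | LOCKED;
    assert_eq!(state & QUEUED, 0);
    assert_eq!((state & MASK) / SINGLE, 3);

    // Locked with waiters: the upper bits hold an 8-aligned node address
    // (a made-up value here), so the three tag bits are free.
    let node_addr = 0xdead_bee0_usize;
    let state = node_addr | QUEUED | LOCKED;
    assert_eq!(state & MASK, node_addr);
}
```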
@@ -84,28 +84,31 @@
 //! ```
 //!
 //! Invariants:
-//! 1. The `next` field always points to a valid node, except in the tail node.
-//! 2. The `next` field of the tail node must be null while the queue is unlocked.
-//! 3. At least one node must contain a non-null, current `tail` field.
-//! 4. The first non-null `tail` field must be valid and current.
-//! 5. All nodes following this node must have a correct, non-null `prev` field.
+//! 1. At least one node must contain a non-null, current `tail` field.
+//! 2. The first non-null `tail` field must be valid and current.
+//! 3. All nodes preceding this node must have a correct, non-null `next` field.
+//! 4. All nodes following this node must have a correct, non-null `prev` field.
 //!
-//! While adding a new node to the queue may be done by any thread at any time,
-//! removing nodes may only be done by a single thread. Instead of using a
-//! separate lock bit for the queue like usync does, this implementation
-//! only allows the (last) lock owner to modify the queue.
+//! Access to the queue is controlled by the `QUEUE_LOCKED` bit, which threads
+//! try to set both after enqueuing themselves to eagerly add backlinks to the
+//! queue and after unlocking the lock to wake the next waiter(s). This is done
+//! atomically at the same time as the enqueuing/unlocking operation. The thread
+//! releasing the `QUEUE_LOCKED` bit will check the state of the lock and wake up
+//! waiters as appropriate. This guarantees forward-progress even if the unlocking
+//! thread could not acquire the queue lock.
 //!
 //! ## Memory orderings
 //!
 //! To properly synchronize changes to the data protected by the lock, the lock
 //! is acquired and released with [`Acquire`] and [`Release`] ordering, respectively.
-//! To propagate the initialization of nodes, changes to the list are also propagated
-//! using these orderings.
+//! To propagate the initialization of nodes, changes to the queue lock are also
+//! performed using these orderings.
 
 #![forbid(unsafe_op_in_unsafe_fn)]
 
 use crate::cell::OnceCell;
 use crate::hint::spin_loop;
+use crate::mem;
 use crate::ptr::{self, invalid_mut, null_mut, NonNull};
 use crate::sync::atomic::{
     AtomicBool, AtomicPtr,
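The forward-progress guarantee described above can be sketched with a toy model over a plain `AtomicUsize` (the `wake_waiters` helper is hypothetical): releasing the lock and acquiring the queue lock happen in a single compare-exchange, and a thread that fails to get `QUEUE_LOCKED` can simply leave, because the holder re-checks the lock state before releasing the bit:

```rust
use std::sync::atomic::{AtomicUsize, Ordering::{AcqRel, Relaxed, Release}};

const LOCKED: usize = 1;
const QUEUE_LOCKED: usize = 4;

// Toy model of the unlock path. Only one thread holds QUEUE_LOCKED at a
// time, so only one thread walks and edits the waiter queue.
fn unlock_contended(state: &AtomicUsize) {
    let mut s = state.load(Relaxed);
    loop {
        let next = (s & !LOCKED) | QUEUE_LOCKED;
        match state.compare_exchange_weak(s, next, AcqRel, Relaxed) {
            // This thread won the queue lock: wake waiters, then drop it.
            Ok(_) if s & QUEUE_LOCKED == 0 => return wake_waiters(state),
            // Another thread holds QUEUE_LOCKED; when it releases the bit
            // it will observe that the lock is now free and do the waking.
            Ok(_) => return,
            Err(new) => s = new,
        }
    }
}

fn wake_waiters(state: &AtomicUsize) {
    // ... walk the queue and wake threads here ...
    state.fetch_sub(QUEUE_LOCKED, Release); // release the queue lock
}
```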
@@ -114,7 +117,10 @@ use crate::sync::atomic::{
 use crate::sys_common::thread_info;
 use crate::thread::Thread;
 
-const SPIN_COUNT: usize = 6;
+// Locking uses exponential backoff. `SPIN_COUNT` indicates how many times the
+// locking operation will be retried.
+// `spin_loop` will be called `2.pow(SPIN_COUNT) - 1` times.
+const SPIN_COUNT: usize = 7;
 
 type State = *mut ();
 type AtomicState = AtomicPtr<()>;
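A quick sanity check of the count in the new comment: attempt `count` spins `1 << count` times, so the total over `SPIN_COUNT` attempts is the geometric series `2^SPIN_COUNT - 1`:

```rust
const SPIN_COUNT: usize = 7;

fn main() {
    // Attempt `count` runs spin_loop() 1 << count times (see the loop in
    // lock_contended below), so the total is 1 + 2 + ... + 2^(SPIN_COUNT-1).
    let total: usize = (0..SPIN_COUNT).map(|count| 1usize << count).sum();
    assert_eq!(total, (1 << SPIN_COUNT) - 1); // = 127
}
```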
@@ -122,23 +128,24 @@ type AtomicState = AtomicPtr<()>;
 const UNLOCKED: State = invalid_mut(0);
 const LOCKED: usize = 1;
 const QUEUED: usize = 2;
-const SINGLE: usize = 4;
-const MASK: usize = !(LOCKED | QUEUED);
+const QUEUE_LOCKED: usize = 4;
+const SINGLE: usize = 8;
+const MASK: usize = !(QUEUE_LOCKED | QUEUED | LOCKED);
 
 /// Returns a closure that changes the state to the lock state corresponding to
-/// the lock mode indicated in `read`.
+/// the lock mode indicated in `write`.
 #[inline]
-fn lock(read: bool) -> impl Fn(State) -> Option<State> {
+fn lock(write: bool) -> impl Fn(State) -> Option<State> {
     move |state| {
-        if read {
+        if write {
+            let state = state.wrapping_byte_add(LOCKED);
+            if state.addr() & LOCKED == LOCKED { Some(state) } else { None }
+        } else {
             if state.addr() & QUEUED == 0 && state.addr() != LOCKED {
                 Some(invalid_mut(state.addr().checked_add(SINGLE)? | LOCKED))
             } else {
                 None
             }
-        } else {
-            let state = state.wrapping_byte_add(LOCKED);
-            if state.addr() & LOCKED == LOCKED { Some(state) } else { None }
         }
     }
 }
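What the rewritten `lock` closure computes, as a pointer-free sketch (`usize` standing in for the `*mut ()` state; illustrative only): write-locking sets `LOCKED` with a wrapping add and succeeds iff the bit was clear, while read-locking adds `SINGLE` to the reader count unless a queue exists (`QUEUED` set) or the state is exactly `LOCKED` (write-locked):

```rust
const LOCKED: usize = 1;
const QUEUED: usize = 2;
const SINGLE: usize = 8;

fn lock(write: bool) -> impl Fn(usize) -> Option<usize> {
    move |state| {
        if write {
            // Adding LOCKED sets the bit iff it was clear before.
            let new = state.wrapping_add(LOCKED);
            (new & LOCKED == LOCKED).then_some(new)
        } else if state & QUEUED == 0 && state != LOCKED {
            // No queue and not write-locked: add one reader.
            Some(state.checked_add(SINGLE)? | LOCKED)
        } else {
            None
        }
    }
}

fn main() {
    assert_eq!(lock(true)(0), Some(LOCKED)); // write-lock when free
    assert_eq!(lock(true)(LOCKED), None); // but not when already locked
    assert_eq!(lock(false)(0), Some(SINGLE | LOCKED)); // first reader
    assert_eq!(lock(false)(SINGLE | LOCKED), Some(2 * SINGLE | LOCKED));
    assert_eq!(lock(false)(LOCKED), None); // write-locked: readers wait
    assert_eq!(lock(false)(QUEUED), None); // queued: readers must queue up
}
```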
@@ -169,24 +176,24 @@ impl AtomicLink {
     }
 }
 
-#[repr(align(4))]
+#[repr(align(8))]
 struct Node {
     next: AtomicLink,
     prev: AtomicLink,
     tail: AtomicLink,
-    read: bool,
+    write: bool,
     thread: OnceCell<Thread>,
     completed: AtomicBool,
 }
 
 impl Node {
     /// Create a new queue node.
-    fn new(read: bool) -> Node {
+    fn new(write: bool) -> Node {
         Node {
             next: AtomicLink::new(None),
             prev: AtomicLink::new(None),
             tail: AtomicLink::new(None),
-            read,
+            write,
             thread: OnceCell::new(),
             completed: AtomicBool::new(false),
         }
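The alignment bump from 4 to 8 matters because the state word now reserves three low tag bits, so a `&Node` stored in the remaining bits must have its three low bits zero. A minimal illustration with a stand-in type:

```rust
#[repr(align(8))]
struct Node(u8); // stand-in; the real Node's fields are shown above

fn main() {
    let node = Node(0);
    let addr = &node as *const Node as usize;
    // 8-byte alignment guarantees the low three bits of any &Node are
    // zero, leaving room for LOCKED, QUEUED and QUEUE_LOCKED.
    assert_eq!(addr & 0b111, 0);
}
```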
@@ -201,9 +208,9 @@ impl Node {
     }
 
     /// Assuming the node contains a reader lock count, decrement that count.
-    /// Returns `true` if there are other lock owners.
+    /// Returns `true` if this thread was the last lock owner.
     fn decrement_count(&self) -> bool {
-        self.next.0.fetch_byte_sub(SINGLE, AcqRel).addr() > SINGLE
+        self.next.0.fetch_byte_sub(SINGLE, AcqRel).addr() - SINGLE == 0
     }
 
     /// Prepare this node for waiting.
@@ -239,6 +246,14 @@ impl Node {
     }
 }
 
+struct PanicGuard;
+
+impl Drop for PanicGuard {
+    fn drop(&mut self) {
+        rtabort!("tried to drop node in intrusive list.");
+    }
+}
+
 /// Find the tail of the queue beginning with `head`, caching the result in `head`.
 ///
 /// May be called from multiple threads at the same time, while the queue is not
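`PanicGuard` is the usual abort-on-unwind pattern: while a thread's stack-allocated node is linked into the intrusive queue, unwinding past it would expose freed stack memory to other threads, so a panic must become an abort. A sketch of the usage pattern, with `std::process::abort` standing in for the private `rtabort!` macro:

```rust
struct PanicGuard;

impl Drop for PanicGuard {
    fn drop(&mut self) {
        // Reached only during unwinding: dying while the node is still
        // queued would be unsound, so abort instead.
        std::process::abort();
    }
}

fn wait_until_dequeued() {
    let guard = PanicGuard;
    // ... park/spin until another thread unlinks our node ...
    std::mem::forget(guard); // normal path: disarm the guard
}
```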
@@ -257,9 +272,8 @@ unsafe fn find_tail(head: NonNull<Node>) -> NonNull<Node> {
         match c.tail.get() {
             Some(tail) => break tail,
             // SAFETY:
-            // Only the `next` field of the tail is null (invariants 1. and 2.)
-            // Since at least one element in the queue has a non-null tail (invariant 3.),
-            // this code will never be run for `current == tail`.
+            // All `next` fields before the first node with a set `tail` are
+            // non-null and valid (invariant 3).
             None => unsafe {
                 let next = c.next.get().unwrap_unchecked();
                 next.as_ref().prev.set(Some(current));
@@ -286,13 +300,13 @@ impl RwLock {
 
     #[inline]
     pub fn try_read(&self) -> bool {
-        self.state.fetch_update(Acquire, Relaxed, lock(true)).is_ok()
+        self.state.fetch_update(Acquire, Relaxed, lock(false)).is_ok()
     }
 
     #[inline]
     pub fn read(&self) {
         if !self.try_read() {
-            self.lock_contended(true)
+            self.lock_contended(false)
         }
     }
 
@@ -300,22 +314,22 @@ impl RwLock {
     pub fn try_write(&self) -> bool {
         // This is lowered to a single atomic instruction on most modern processors
         // (e.g. "lock bts" on x86 and "ldseta" on modern AArch64), and therefore
-        // is more efficient than `fetch_update(lock(false))`, which can spuriously
+        // is more efficient than `fetch_update(lock(true))`, which can spuriously
        // fail if a new node is appended to the queue.
-        self.state.fetch_or(LOCKED, Acquire).addr() & LOCKED != LOCKED
+        self.state.fetch_or(LOCKED, Acquire).addr() & LOCKED == 0
     }
 
     #[inline]
     pub fn write(&self) {
         if !self.try_write() {
-            self.lock_contended(false)
+            self.lock_contended(true)
         }
     }
 
     #[cold]
-    fn lock_contended(&self, read: bool) {
-        let update = lock(read);
-        let mut node = Node::new(read);
+    fn lock_contended(&self, write: bool) {
+        let update = lock(write);
+        let mut node = Node::new(write);
         let mut state = self.state.load(Relaxed);
         let mut count = 0;
         loop {
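The `try_write` fast path uses `fetch_or` instead of a compare-exchange loop; as the comment in the diff notes, this lowers to a single instruction such as `lock bts` on x86. A standalone equivalent over `AtomicUsize` (illustrative, not the std code):

```rust
use std::sync::atomic::{AtomicUsize, Ordering::Acquire};

const LOCKED: usize = 1;

// Returns true if this call transitioned the bit from 0 to 1, i.e. the
// lock was acquired; a set bit in the old value means someone beat us.
fn try_write(state: &AtomicUsize) -> bool {
    state.fetch_or(LOCKED, Acquire) & LOCKED == 0
}

fn main() {
    let state = AtomicUsize::new(0);
    assert!(try_write(&state)); // free -> acquired
    assert!(!try_write(&state)); // already held -> fails
}
```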
@@ -326,8 +340,9 @@ impl RwLock {
                     Err(new) => state = new,
                 }
             } else if state.addr() & QUEUED == 0 && count < SPIN_COUNT {
-                // If the lock is not available but no threads are queued, spin
-                // for a while.
+                // If the lock is not available and no threads are queued, spin
+                // for a while, using exponential backoff to decrease cache
+                // contention.
                 for _ in 0..(1 << count) {
                     spin_loop();
                 }
@@ -338,18 +353,26 @@ impl RwLock {
                 node.prepare();
                 node.set_state(state);
                 node.prev = AtomicLink::new(None);
-                // If this is the first node in the queue, set the tail field to
-                // the node itself to ensure there is a current `tail` field in
-                // the queue (invariants 3. and 4.). This needs to use `set` to
-                // avoid invalidating the new pointer.
-                node.tail.set((state.addr() & QUEUED == 0).then_some(NonNull::from(&node)));
-                let next = ptr::from_ref(&node)
+                let mut next = ptr::from_ref(&node)
                     .map_addr(|addr| addr | QUEUED | (state.addr() & LOCKED))
                     as State;
 
+                if state.addr() & QUEUED == 0 {
+                    // If this is the first node in the queue, set the tail field to
+                    // the node itself to ensure there is a current `tail` field in
+                    // the queue (invariants 1 and 2). This needs to use `set` to
+                    // avoid invalidating the new pointer.
+                    node.tail.set(Some(NonNull::from(&node)));
+                } else {
+                    // Otherwise, the tail of the queue is not known.
+                    node.tail.set(None);
+                    // Try locking the queue to fully link it.
+                    next = next.map_addr(|addr| addr | QUEUE_LOCKED);
+                }
+
                 // Use release ordering to propagate our changes to the waking
                 // thread.
-                if let Err(new) = self.state.compare_exchange_weak(state, next, Release, Relaxed) {
+                if let Err(new) = self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
                     // The state has changed, just try again.
                     state = new;
                     continue;
@@ -357,13 +380,27 @@ impl RwLock {
 
                 // The node is registered, so the structure must not be
                 // mutably accessed or destroyed while other threads may
-                // be accessing it. Just wait until it is completed.
+                // be accessing it. Guard against unwinds using a panic
+                // guard that aborts when dropped.
+                let guard = PanicGuard;
+
+                // If the current thread locked the queue, unlock it again,
+                // linking it in the process.
+                if state.addr() & (QUEUE_LOCKED | QUEUED) == QUEUED {
+                    unsafe {
+                        self.unlock_queue(next);
+                    }
+                }
+
+                // Wait until the node is removed from the queue.
                 // SAFETY: the node was created by the current thread.
                 unsafe {
                     node.wait();
                 }
 
+                // The node was removed from the queue, disarm the guard.
+                mem::forget(guard);
+
                 // Reload the state and try again.
                 state = self.state.load(Relaxed);
                 count = 0;
@@ -382,114 +419,128 @@ impl RwLock {
             }
         }) {
             Ok(_) => {}
-            Err(state) => unsafe { self.unlock_contended(state, true) },
+            // There are waiters queued and the lock count was moved to the
+            // tail of the queue.
+            Err(state) => unsafe { self.read_unlock_contended(state) },
+        }
+    }
+
+    #[cold]
+    unsafe fn read_unlock_contended(&self, state: State) {
+        // The state was observed with acquire ordering above, so the current
+        // thread will observe all node initializations.
+
+        let tail = unsafe { find_tail(to_node(state)) };
+        let was_last = unsafe { tail.as_ref().decrement_count() };
+        if was_last {
+            // SAFETY:
+            // Other threads cannot read-lock while threads are queued. Also,
+            // the `LOCKED` bit is still set, so there are no writers. Therefore,
+            // the current thread exclusively owns the lock.
+            unsafe { self.unlock_contended(state) }
         }
     }
 
     #[inline]
     pub unsafe fn write_unlock(&self) {
-        match self.state.compare_exchange(invalid_mut(LOCKED), UNLOCKED, Release, Acquire) {
-            Ok(_) => {}
+        if let Err(state) =
+            self.state.compare_exchange(invalid_mut(LOCKED), UNLOCKED, Release, Relaxed)
+        {
+            // SAFETY:
             // Since other threads cannot acquire the lock, the state can only
             // have changed because there are threads queued on the lock.
-            Err(state) => unsafe { self.unlock_contended(state, false) },
+            unsafe { self.unlock_contended(state) }
         }
     }
 
     /// # Safety
-    /// The lock must be locked by the current thread and threads must be queued on it.
+    /// * The lock must be exclusively owned by this thread.
+    /// * There must be threads queued on the lock.
     #[cold]
-    unsafe fn unlock_contended(&self, mut state: State, read: bool) {
-        // Find the last node in the linked queue.
-        let tail = unsafe { find_tail(to_node(state)) };
-        let not_last = unsafe { read && tail.as_ref().decrement_count() };
-        if not_last {
-            // There are other lock owners, leave waking up the next waiters to them.
-            return;
-        }
-
-        // At this point, the `next` field on `tail` will always be null
-        // (invariant 2).
-
-        let next_read = unsafe { tail.as_ref().read };
-        if next_read {
-            // The next waiter is a reader. Just wake all threads.
-            //
-            // SAFETY:
-            // `current` is the head of a valid queue, which no thread except the
-            // the current can observe.
-            unsafe {
-                let mut current = to_node(self.state.swap(UNLOCKED, AcqRel));
-                loop {
-                    let next = current.as_ref().next.get();
-                    Node::complete(current);
-                    match next {
-                        Some(next) => current = next,
-                        None => break,
-                    }
-                }
-            }
-        } else {
-            // The next waiter is a writer. Remove it from the queue and wake it.
-            let prev = match unsafe { tail.as_ref().prev.get() } {
-                // If the lock was read-locked, multiple threads have invoked
-                // `find_tail` above. Therefore, it is possible that one of
-                // them observed a newer state than this thread did, meaning
-                // there is a set `tail` field in a node before `state`. To
-                // make sure that the queue is valid after the link update
-                // below, reload the state and relink the queue.
-                //
-                // SAFETY: since the current thread holds the lock, the queue
-                // was not removed from since the last time and therefore is
-                // still valid.
-                Some(prev) if read => unsafe {
-                    let new = self.state.load(Acquire);
-                    if new != state {
-                        state = new;
-                        find_tail(to_node(state));
-                    }
-                    Some(prev)
-                },
-                Some(prev) => Some(prev),
-                // The current node is the only one in the queue that we observed.
-                // Try setting the state to UNLOCKED.
-                None => self.state.compare_exchange(state, UNLOCKED, Release, Acquire).err().map(
-                    |new| {
-                        state = new;
-                        // Since the state was locked, it can only have changed
-                        // because a new node was added since `state` was loaded.
-                        // Relink the queue and get a pointer to the node before
-                        // `tail`.
-                        unsafe {
-                            find_tail(to_node(state));
-                            tail.as_ref().prev.get().unwrap()
-                        }
-                    },
-                ),
-            };
-
-            if let Some(prev) = prev {
-                unsafe {
-                    // The `next` field of the tail field must be zero when
-                    // releasing the lock (queue invariant 2).
-                    prev.as_ref().next.set(None);
-                    // There are no set `tail` links before the node pointed to by
-                    // `state`, so the first non-null tail field will be current
-                    // (queue invariant 4).
-                    to_node(state).as_ref().tail.set(Some(prev));
-                }
-
-                // Release the lock. Doing this by subtraction is more efficient
-                // on modern processors since it is a single instruction instead
-                // of an update loop, which will fail if new threads are added
-                // to the queue.
-                self.state.fetch_byte_sub(LOCKED, Release);
-            }
-
-            // The tail was split off and the lock released. Mark the node as
-            // completed.
-            unsafe {
-                Node::complete(tail);
-            }
+    unsafe fn unlock_contended(&self, mut state: State) {
+        loop {
+            // Atomically release the lock and try to acquire the queue lock.
+            let next = state.map_addr(|a| (a & !LOCKED) | QUEUE_LOCKED);
+            match self.state.compare_exchange_weak(state, next, AcqRel, Relaxed) {
+                // The queue lock was acquired. Release it, waking up the next
+                // waiter in the process.
+                Ok(_) if state.addr() & QUEUE_LOCKED == 0 => unsafe {
+                    return self.unlock_queue(next);
+                },
+                // Another thread already holds the queue lock, leave waking up
+                // waiters to it.
+                Ok(_) => return,
+                Err(new) => state = new,
+            }
+        }
+    }
+
+    /// # Safety
+    /// The queue lock must be held by the current thread.
+    unsafe fn unlock_queue(&self, mut state: State) {
+        debug_assert_eq!(state.addr() & (QUEUED | QUEUE_LOCKED), QUEUED | QUEUE_LOCKED);
+
+        loop {
+            // Find the last node in the linked list.
+            let tail = unsafe { find_tail(to_node(state)) };
+
+            if state.addr() & LOCKED == LOCKED {
+                // Another thread has locked the lock. Leave waking up waiters
+                // to them by releasing the queue lock.
+                match self.state.compare_exchange_weak(
+                    state,
+                    state.mask(!QUEUE_LOCKED),
+                    Release,
+                    Acquire,
+                ) {
+                    Ok(_) => return,
+                    Err(new) => {
+                        state = new;
+                        continue;
+                    }
+                }
+            }
+
+            let is_writer = unsafe { tail.as_ref().write };
+            if is_writer && let Some(prev) = unsafe { tail.as_ref().prev.get() } {
+                // `tail` is a writer and there is a node before `tail`.
+                // Split off `tail`.
+
+                // There are no set `tail` links before the node pointed to by
+                // `state`, so the first non-null tail field will be current
+                // (invariant 2). Invariant 4 is fulfilled since `find_tail`
+                // was called on this node, which ensures all backlinks are set.
+                unsafe { to_node(state).as_ref().tail.set(Some(prev)); }
+
+                // Release the queue lock. Doing this by subtraction is more
+                // efficient on modern processors since it is a single instruction
+                // instead of an update loop, which will fail if new threads are
+                // added to the list.
+                self.state.fetch_byte_sub(QUEUE_LOCKED, Release);
+
+                // The tail was split off and the lock released. Mark the node as
+                // completed.
+                unsafe { return Node::complete(tail); }
+            } else {
+                // The next waiter is a reader or the queue only consists of one
+                // waiter. Just wake all threads.
+
+                // The lock cannot be locked (checked above), so mark it as
+                // unlocked to reset the queue.
+                if let Err(new) = self.state.compare_exchange_weak(state, UNLOCKED, Release, Acquire) {
+                    state = new;
+                    continue;
+                }
+
+                let mut current = tail;
+                loop {
+                    let prev = unsafe { current.as_ref().prev.get() };
+                    unsafe { Node::complete(current); }
+                    match prev {
+                        Some(prev) => current = prev,
+                        None => return,
+                    }
+                }
             }
         }
     }