mirror of
https://github.com/rust-lang/rust.git
synced 2025-02-16 17:03:35 +00:00
Rollup merge of #105903 - joboet:unify_parking, r=m-ou-se
Unify id-based thread parking implementations Multiple platforms currently use thread-id-based parking implementations (NetBSD and SGX[^1]). Even though the strategy does not differ, these are duplicated for each platform, as the id is encoded into an atomic thread variable in different ways for each platform. Since `park` is only called by one thread, it is possible to move the thread id into a separate field. By ensuring that the field is only written to once, before any other threads access it, these accesses can be unsynchronized, removing any restrictions on the size and niches of the thread id. This PR also renames the internal `thread_parker` modules to `thread_parking`, as that name now better reflects their contents. I hope this does not add too much reviewing noise. r? `@m-ou-se` `@rustbot` label +T-libs [^1]: SOLID supports this as well, I will switch it over in a follow-up PR.
This commit is contained in:
commit
ff3326d925
@ -34,7 +34,7 @@ pub mod process;
|
||||
pub mod stdio;
|
||||
pub mod thread;
|
||||
pub mod thread_local_key;
|
||||
pub mod thread_parker;
|
||||
pub mod thread_parking;
|
||||
pub mod time;
|
||||
|
||||
mod condvar;
|
||||
|
@ -65,9 +65,9 @@ mod task_queue {
|
||||
/// execution. The signal is sent once all TLS destructors have finished at
|
||||
/// which point no new thread locals should be created.
|
||||
pub mod wait_notify {
|
||||
use super::super::thread_parker::Parker;
|
||||
use crate::pin::Pin;
|
||||
use crate::sync::Arc;
|
||||
use crate::sys_common::thread_parking::Parker;
|
||||
|
||||
pub struct Notifier(Arc<Parker>);
|
||||
|
||||
@ -87,14 +87,14 @@ pub mod wait_notify {
|
||||
/// called, this will return immediately, otherwise the current thread
|
||||
/// is blocked until notified.
|
||||
pub fn wait(self) {
|
||||
// This is not actually `unsafe`, but it uses the `Parker` API,
|
||||
// which needs `unsafe` on some platforms.
|
||||
// SAFETY:
|
||||
// This is only ever called on one thread.
|
||||
unsafe { Pin::new(&*self.0).park() }
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new() -> (Notifier, Waiter) {
|
||||
let inner = Arc::new(Parker::new_internal());
|
||||
let inner = Arc::new(Parker::new());
|
||||
(Notifier(inner.clone()), Waiter(inner))
|
||||
}
|
||||
}
|
||||
|
@ -1,107 +0,0 @@
|
||||
//! Thread parking based on SGX events.
|
||||
|
||||
use super::abi::{thread, usercalls};
|
||||
use crate::io::ErrorKind;
|
||||
use crate::pin::Pin;
|
||||
use crate::ptr::{self, NonNull};
|
||||
use crate::sync::atomic::AtomicPtr;
|
||||
use crate::sync::atomic::Ordering::{Acquire, Relaxed, Release};
|
||||
use crate::time::Duration;
|
||||
use fortanix_sgx_abi::{EV_UNPARK, WAIT_INDEFINITE};
|
||||
|
||||
// The TCS structure must be page-aligned (this is checked by EENTER), so these cannot
|
||||
// be valid pointers
|
||||
const EMPTY: *mut u8 = ptr::invalid_mut(1);
|
||||
const NOTIFIED: *mut u8 = ptr::invalid_mut(2);
|
||||
|
||||
pub struct Parker {
|
||||
/// The park state. One of EMPTY, NOTIFIED or a TCS address.
|
||||
/// A state change to NOTIFIED must be done with release ordering
|
||||
/// and be observed with acquire ordering so that operations after
|
||||
/// `thread::park` returns will not occur before the unpark message
|
||||
/// was sent.
|
||||
state: AtomicPtr<u8>,
|
||||
}
|
||||
|
||||
impl Parker {
|
||||
/// Construct the thread parker. The UNIX parker implementation
|
||||
/// requires this to happen in-place.
|
||||
pub unsafe fn new(parker: *mut Parker) {
|
||||
unsafe { parker.write(Parker::new_internal()) }
|
||||
}
|
||||
|
||||
pub(super) fn new_internal() -> Parker {
|
||||
Parker { state: AtomicPtr::new(EMPTY) }
|
||||
}
|
||||
|
||||
// This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
|
||||
pub unsafe fn park(self: Pin<&Self>) {
|
||||
if self.state.load(Acquire) != NOTIFIED {
|
||||
let mut prev = EMPTY;
|
||||
loop {
|
||||
// Guard against changing TCS addresses by always setting the state to
|
||||
// the current value.
|
||||
let tcs = thread::current().as_ptr();
|
||||
if self.state.compare_exchange(prev, tcs, Relaxed, Acquire).is_ok() {
|
||||
let event = usercalls::wait(EV_UNPARK, WAIT_INDEFINITE).unwrap();
|
||||
assert!(event & EV_UNPARK == EV_UNPARK);
|
||||
prev = tcs;
|
||||
} else {
|
||||
// The state was definitely changed by another thread at this point.
|
||||
// The only time this occurs is when the state is changed to NOTIFIED.
|
||||
// We observed this change with acquire ordering, so we can simply
|
||||
// change the state to EMPTY with a relaxed store.
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// At this point, the token was definately read with acquire ordering,
|
||||
// so this can be a relaxed store.
|
||||
self.state.store(EMPTY, Relaxed);
|
||||
}
|
||||
|
||||
// This implementation doesn't require `unsafe` and `Pin`, but other implementations do.
|
||||
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
|
||||
let timeout = u128::min(dur.as_nanos(), WAIT_INDEFINITE as u128 - 1) as u64;
|
||||
let tcs = thread::current().as_ptr();
|
||||
|
||||
if self.state.load(Acquire) != NOTIFIED {
|
||||
if self.state.compare_exchange(EMPTY, tcs, Relaxed, Acquire).is_ok() {
|
||||
match usercalls::wait(EV_UNPARK, timeout) {
|
||||
Ok(event) => assert!(event & EV_UNPARK == EV_UNPARK),
|
||||
Err(e) => {
|
||||
assert!(matches!(e.kind(), ErrorKind::TimedOut | ErrorKind::WouldBlock))
|
||||
}
|
||||
}
|
||||
|
||||
// Swap to provide acquire ordering even if the timeout occurred
|
||||
// before the token was set. This situation can result in spurious
|
||||
// wakeups on the next call to `park_timeout`, but it is better to let
|
||||
// those be handled by the user than do some perhaps unnecessary, but
|
||||
// always expensive guarding.
|
||||
self.state.swap(EMPTY, Acquire);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
// The token was already read with `acquire` ordering, this can be a store.
|
||||
self.state.store(EMPTY, Relaxed);
|
||||
}
|
||||
|
||||
// This implementation doesn't require `Pin`, but other implementations do.
|
||||
pub fn unpark(self: Pin<&Self>) {
|
||||
let state = self.state.swap(NOTIFIED, Release);
|
||||
|
||||
if !matches!(state, EMPTY | NOTIFIED) {
|
||||
// There is a thread waiting, wake it up.
|
||||
let tcs = NonNull::new(state).unwrap();
|
||||
// This will fail if the thread has already terminated or its TCS is destroyed
|
||||
// by the time the signal is sent, but that is fine. If another thread receives
|
||||
// the same TCS, it will receive this notification as a spurious wakeup, but
|
||||
// all users of `wait` should and (internally) do guard against those where
|
||||
// necessary.
|
||||
let _ = usercalls::send(EV_UNPARK, Some(tcs));
|
||||
}
|
||||
}
|
||||
}
|
23
library/std/src/sys/sgx/thread_parking.rs
Normal file
23
library/std/src/sys/sgx/thread_parking.rs
Normal file
@ -0,0 +1,23 @@
|
||||
use super::abi::usercalls;
|
||||
use crate::io::ErrorKind;
|
||||
use crate::time::Duration;
|
||||
use fortanix_sgx_abi::{EV_UNPARK, WAIT_INDEFINITE};
|
||||
|
||||
pub type ThreadId = fortanix_sgx_abi::Tcs;
|
||||
|
||||
pub use super::abi::thread::current;
|
||||
|
||||
pub fn park(_hint: usize) {
|
||||
usercalls::wait(EV_UNPARK, WAIT_INDEFINITE).unwrap();
|
||||
}
|
||||
|
||||
pub fn park_timeout(dur: Duration, _hint: usize) {
|
||||
let timeout = u128::min(dur.as_nanos(), WAIT_INDEFINITE as u128 - 1) as u64;
|
||||
if let Err(e) = usercalls::wait(EV_UNPARK, timeout) {
|
||||
assert!(matches!(e.kind(), ErrorKind::TimedOut | ErrorKind::WouldBlock))
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unpark(tid: ThreadId, _hint: usize) {
|
||||
let _ = usercalls::send(EV_UNPARK, Some(tid));
|
||||
}
|
@ -40,7 +40,7 @@ pub mod stdio;
|
||||
pub mod thread;
|
||||
pub mod thread_local_dtor;
|
||||
pub mod thread_local_key;
|
||||
pub mod thread_parker;
|
||||
pub mod thread_parking;
|
||||
pub mod time;
|
||||
|
||||
#[cfg(target_os = "espidf")]
|
||||
|
@ -1,113 +0,0 @@
|
||||
use crate::ffi::{c_int, c_void};
|
||||
use crate::pin::Pin;
|
||||
use crate::ptr::{null, null_mut};
|
||||
use crate::sync::atomic::{
|
||||
AtomicU64,
|
||||
Ordering::{Acquire, Relaxed, Release},
|
||||
};
|
||||
use crate::time::Duration;
|
||||
use libc::{_lwp_self, clockid_t, lwpid_t, time_t, timespec, CLOCK_MONOTONIC};
|
||||
|
||||
extern "C" {
|
||||
fn ___lwp_park60(
|
||||
clock_id: clockid_t,
|
||||
flags: c_int,
|
||||
ts: *mut timespec,
|
||||
unpark: lwpid_t,
|
||||
hint: *const c_void,
|
||||
unparkhint: *const c_void,
|
||||
) -> c_int;
|
||||
fn _lwp_unpark(lwp: lwpid_t, hint: *const c_void) -> c_int;
|
||||
}
|
||||
|
||||
/// The thread is not parked and the token is not available.
|
||||
///
|
||||
/// Zero cannot be a valid LWP id, since it is used as empty value for the unpark
|
||||
/// argument in _lwp_park.
|
||||
const EMPTY: u64 = 0;
|
||||
/// The token is available. Do not park anymore.
|
||||
const NOTIFIED: u64 = u64::MAX;
|
||||
|
||||
pub struct Parker {
|
||||
/// The parker state. Contains either one of the two state values above or the LWP
|
||||
/// id of the parked thread.
|
||||
state: AtomicU64,
|
||||
}
|
||||
|
||||
impl Parker {
|
||||
pub unsafe fn new(parker: *mut Parker) {
|
||||
parker.write(Parker { state: AtomicU64::new(EMPTY) })
|
||||
}
|
||||
|
||||
// Does not actually need `unsafe` or `Pin`, but the pthread implementation does.
|
||||
pub unsafe fn park(self: Pin<&Self>) {
|
||||
// If the token has already been made available, we can skip
|
||||
// a bit of work, so check for it here.
|
||||
if self.state.load(Acquire) != NOTIFIED {
|
||||
let parked = _lwp_self() as u64;
|
||||
let hint = self.state.as_mut_ptr().cast();
|
||||
if self.state.compare_exchange(EMPTY, parked, Relaxed, Acquire).is_ok() {
|
||||
// Loop to guard against spurious wakeups.
|
||||
loop {
|
||||
___lwp_park60(0, 0, null_mut(), 0, hint, null());
|
||||
if self.state.load(Acquire) == NOTIFIED {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// At this point, the change to NOTIFIED has always been observed with acquire
|
||||
// ordering, so we can just use a relaxed store here (instead of a swap).
|
||||
self.state.store(EMPTY, Relaxed);
|
||||
}
|
||||
|
||||
// Does not actually need `unsafe` or `Pin`, but the pthread implementation does.
|
||||
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
|
||||
if self.state.load(Acquire) != NOTIFIED {
|
||||
let parked = _lwp_self() as u64;
|
||||
let hint = self.state.as_mut_ptr().cast();
|
||||
let mut timeout = timespec {
|
||||
// Saturate so that the operation will definitely time out
|
||||
// (even if it is after the heat death of the universe).
|
||||
tv_sec: dur.as_secs().try_into().ok().unwrap_or(time_t::MAX),
|
||||
tv_nsec: dur.subsec_nanos().into(),
|
||||
};
|
||||
|
||||
if self.state.compare_exchange(EMPTY, parked, Relaxed, Acquire).is_ok() {
|
||||
// Timeout needs to be mutable since it is modified on NetBSD 9.0 and
|
||||
// above.
|
||||
___lwp_park60(CLOCK_MONOTONIC, 0, &mut timeout, 0, hint, null());
|
||||
// Use a swap to get acquire ordering even if the token was set after
|
||||
// the timeout occurred.
|
||||
self.state.swap(EMPTY, Acquire);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
self.state.store(EMPTY, Relaxed);
|
||||
}
|
||||
|
||||
// Does not actually need `Pin`, but the pthread implementation does.
|
||||
pub fn unpark(self: Pin<&Self>) {
|
||||
let state = self.state.swap(NOTIFIED, Release);
|
||||
if !matches!(state, EMPTY | NOTIFIED) {
|
||||
let lwp = state as lwpid_t;
|
||||
let hint = self.state.as_mut_ptr().cast();
|
||||
|
||||
// If the parking thread terminated and did not actually park, this will
|
||||
// probably return an error, which is OK. In the worst case, another
|
||||
// thread has received the same LWP id. It will then receive a spurious
|
||||
// wakeup, but those are allowable per the API contract. The same reasoning
|
||||
// applies if a timeout occurred before this call, but the state was not
|
||||
// yet reset.
|
||||
|
||||
// SAFETY:
|
||||
// The syscall has no invariants to hold. Only unsafe because it is an
|
||||
// extern function.
|
||||
unsafe {
|
||||
_lwp_unpark(lwp, hint);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -46,7 +46,7 @@ unsafe impl Sync for Parker {}
|
||||
unsafe impl Send for Parker {}
|
||||
|
||||
impl Parker {
|
||||
pub unsafe fn new(parker: *mut Parker) {
|
||||
pub unsafe fn new_in_place(parker: *mut Parker) {
|
||||
let semaphore = dispatch_semaphore_create(0);
|
||||
assert!(
|
||||
!semaphore.is_null(),
|
@ -24,7 +24,7 @@ cfg_if::cfg_if! {
|
||||
pub use darwin::Parker;
|
||||
} else if #[cfg(target_os = "netbsd")] {
|
||||
mod netbsd;
|
||||
pub use netbsd::Parker;
|
||||
pub use netbsd::{current, park, park_timeout, unpark, ThreadId};
|
||||
} else {
|
||||
mod pthread;
|
||||
pub use pthread::Parker;
|
52
library/std/src/sys/unix/thread_parking/netbsd.rs
Normal file
52
library/std/src/sys/unix/thread_parking/netbsd.rs
Normal file
@ -0,0 +1,52 @@
|
||||
use crate::ffi::{c_int, c_void};
|
||||
use crate::ptr;
|
||||
use crate::time::Duration;
|
||||
use libc::{_lwp_self, clockid_t, lwpid_t, time_t, timespec, CLOCK_MONOTONIC};
|
||||
|
||||
extern "C" {
|
||||
fn ___lwp_park60(
|
||||
clock_id: clockid_t,
|
||||
flags: c_int,
|
||||
ts: *mut timespec,
|
||||
unpark: lwpid_t,
|
||||
hint: *const c_void,
|
||||
unparkhint: *const c_void,
|
||||
) -> c_int;
|
||||
fn _lwp_unpark(lwp: lwpid_t, hint: *const c_void) -> c_int;
|
||||
}
|
||||
|
||||
pub type ThreadId = lwpid_t;
|
||||
|
||||
#[inline]
|
||||
pub fn current() -> ThreadId {
|
||||
unsafe { _lwp_self() }
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn park(hint: usize) {
|
||||
unsafe {
|
||||
___lwp_park60(0, 0, ptr::null_mut(), 0, ptr::invalid(hint), ptr::null());
|
||||
}
|
||||
}
|
||||
|
||||
pub fn park_timeout(dur: Duration, hint: usize) {
|
||||
let mut timeout = timespec {
|
||||
// Saturate so that the operation will definitely time out
|
||||
// (even if it is after the heat death of the universe).
|
||||
tv_sec: dur.as_secs().try_into().ok().unwrap_or(time_t::MAX),
|
||||
tv_nsec: dur.subsec_nanos().into(),
|
||||
};
|
||||
|
||||
// Timeout needs to be mutable since it is modified on NetBSD 9.0 and
|
||||
// above.
|
||||
unsafe {
|
||||
___lwp_park60(CLOCK_MONOTONIC, 0, &mut timeout, 0, ptr::invalid(hint), ptr::null());
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn unpark(tid: ThreadId, hint: usize) {
|
||||
unsafe {
|
||||
_lwp_unpark(tid, ptr::invalid(hint));
|
||||
}
|
||||
}
|
@ -99,7 +99,7 @@ impl Parker {
|
||||
///
|
||||
/// # Safety
|
||||
/// The constructed parker must never be moved.
|
||||
pub unsafe fn new(parker: *mut Parker) {
|
||||
pub unsafe fn new_in_place(parker: *mut Parker) {
|
||||
// Use the default mutex implementation to allow for simpler initialization.
|
||||
// This could lead to undefined behaviour when deadlocking. This is avoided
|
||||
// by not deadlocking. Note in particular the unlocking operation before any
|
@ -33,7 +33,7 @@ pub mod stdio;
|
||||
pub mod thread;
|
||||
pub mod thread_local_dtor;
|
||||
pub mod thread_local_key;
|
||||
pub mod thread_parker;
|
||||
pub mod thread_parking;
|
||||
pub mod time;
|
||||
cfg_if::cfg_if! {
|
||||
if #[cfg(not(target_vendor = "uwp"))] {
|
||||
|
@ -97,7 +97,7 @@ const NOTIFIED: i8 = 1;
|
||||
impl Parker {
|
||||
/// Construct the Windows parker. The UNIX parker implementation
|
||||
/// requires this to happen in-place.
|
||||
pub unsafe fn new(parker: *mut Parker) {
|
||||
pub unsafe fn new_in_place(parker: *mut Parker) {
|
||||
parker.write(Self { state: AtomicI8::new(EMPTY) });
|
||||
}
|
||||
|
@ -30,7 +30,7 @@ pub mod process;
|
||||
pub mod thread;
|
||||
pub mod thread_info;
|
||||
pub mod thread_local_dtor;
|
||||
pub mod thread_parker;
|
||||
pub mod thread_parking;
|
||||
pub mod wstr;
|
||||
pub mod wtf8;
|
||||
|
||||
|
@ -35,7 +35,7 @@ pub struct Parker {
|
||||
impl Parker {
|
||||
/// Construct the futex parker. The UNIX parker implementation
|
||||
/// requires this to happen in-place.
|
||||
pub unsafe fn new(parker: *mut Parker) {
|
||||
pub unsafe fn new_in_place(parker: *mut Parker) {
|
||||
parker.write(Self { state: AtomicU32::new(EMPTY) });
|
||||
}
|
||||
|
@ -19,7 +19,7 @@ pub struct Parker {
|
||||
impl Parker {
|
||||
/// Construct the generic parker. The UNIX parker implementation
|
||||
/// requires this to happen in-place.
|
||||
pub unsafe fn new(parker: *mut Parker) {
|
||||
pub unsafe fn new_in_place(parker: *mut Parker) {
|
||||
parker.write(Parker {
|
||||
state: AtomicUsize::new(EMPTY),
|
||||
lock: Mutex::new(()),
|
108
library/std/src/sys_common/thread_parking/id.rs
Normal file
108
library/std/src/sys_common/thread_parking/id.rs
Normal file
@ -0,0 +1,108 @@
|
||||
//! Thread parking using thread ids.
|
||||
//!
|
||||
//! Some platforms (notably NetBSD) have thread parking primitives whose semantics
|
||||
//! match those offered by `thread::park`, with the difference that the thread to
|
||||
//! be unparked is referenced by a platform-specific thread id. Since the thread
|
||||
//! parker is constructed before that id is known, an atomic state variable is used
|
||||
//! to manage the park state and propagate the thread id. This also avoids platform
|
||||
//! calls in the case where `unpark` is called before `park`.
|
||||
|
||||
use crate::cell::UnsafeCell;
|
||||
use crate::pin::Pin;
|
||||
use crate::sync::atomic::{
|
||||
fence, AtomicI8,
|
||||
Ordering::{Acquire, Relaxed, Release},
|
||||
};
|
||||
use crate::sys::thread_parking::{current, park, park_timeout, unpark, ThreadId};
|
||||
use crate::time::Duration;
|
||||
|
||||
pub struct Parker {
|
||||
state: AtomicI8,
|
||||
tid: UnsafeCell<Option<ThreadId>>,
|
||||
}
|
||||
|
||||
const PARKED: i8 = -1;
|
||||
const EMPTY: i8 = 0;
|
||||
const NOTIFIED: i8 = 1;
|
||||
|
||||
impl Parker {
|
||||
pub fn new() -> Parker {
|
||||
Parker { state: AtomicI8::new(EMPTY), tid: UnsafeCell::new(None) }
|
||||
}
|
||||
|
||||
/// Create a new thread parker. UNIX requires this to happen in-place.
|
||||
pub unsafe fn new_in_place(parker: *mut Parker) {
|
||||
parker.write(Parker::new())
|
||||
}
|
||||
|
||||
/// # Safety
|
||||
/// * must always be called from the same thread
|
||||
/// * must be called before the state is set to PARKED
|
||||
unsafe fn init_tid(&self) {
|
||||
// The field is only ever written to from this thread, so we don't need
|
||||
// synchronization to read it here.
|
||||
if self.tid.get().read().is_none() {
|
||||
// Because this point is only reached once, before the state is set
|
||||
// to PARKED for the first time, the non-atomic write here can not
|
||||
// conflict with reads by other threads.
|
||||
self.tid.get().write(Some(current()));
|
||||
// Ensure that the write can be observed by all threads reading the
|
||||
// state. Synchronizes with the acquire barrier in `unpark`.
|
||||
fence(Release);
|
||||
}
|
||||
}
|
||||
|
||||
pub unsafe fn park(self: Pin<&Self>) {
|
||||
self.init_tid();
|
||||
|
||||
// Changes NOTIFIED to EMPTY and EMPTY to PARKED.
|
||||
let mut state = self.state.fetch_sub(1, Acquire).wrapping_sub(1);
|
||||
if state == PARKED {
|
||||
// Loop to guard against spurious wakeups.
|
||||
while state == PARKED {
|
||||
park(self.state.as_mut_ptr().addr());
|
||||
state = self.state.load(Acquire);
|
||||
}
|
||||
|
||||
// Since the state change has already been observed with acquire
|
||||
// ordering, the state can be reset with a relaxed store instead
|
||||
// of a swap.
|
||||
self.state.store(EMPTY, Relaxed);
|
||||
}
|
||||
}
|
||||
|
||||
pub unsafe fn park_timeout(self: Pin<&Self>, dur: Duration) {
|
||||
self.init_tid();
|
||||
|
||||
let state = self.state.fetch_sub(1, Acquire).wrapping_sub(1);
|
||||
if state == PARKED {
|
||||
park_timeout(dur, self.state.as_mut_ptr().addr());
|
||||
// Swap to ensure that we observe all state changes with acquire
|
||||
// ordering, even if the state has been changed after the timeout
|
||||
// occured.
|
||||
self.state.swap(EMPTY, Acquire);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn unpark(self: Pin<&Self>) {
|
||||
let state = self.state.swap(NOTIFIED, Release);
|
||||
if state == PARKED {
|
||||
// Synchronize with the release fence in `init_tid` to observe the
|
||||
// write to `tid`.
|
||||
fence(Acquire);
|
||||
// # Safety
|
||||
// The thread id is initialized before the state is set to `PARKED`
|
||||
// for the first time and is not written to from that point on
|
||||
// (negating the need for an atomic read).
|
||||
let tid = unsafe { self.tid.get().read().unwrap_unchecked() };
|
||||
// It is possible that the waiting thread woke up because of a timeout
|
||||
// and terminated before this call is made. This call then returns an
|
||||
// error or wakes up an unrelated thread. The platform API and
|
||||
// environment does allow this, however.
|
||||
unpark(tid, self.state.as_mut_ptr().addr());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
unsafe impl Send for Parker {}
|
||||
unsafe impl Sync for Parker {}
|
@ -11,13 +11,17 @@ cfg_if::cfg_if! {
|
||||
))] {
|
||||
mod futex;
|
||||
pub use futex::Parker;
|
||||
} else if #[cfg(any(
|
||||
target_os = "netbsd",
|
||||
all(target_vendor = "fortanix", target_env = "sgx"),
|
||||
))] {
|
||||
mod id;
|
||||
pub use id::Parker;
|
||||
} else if #[cfg(target_os = "solid_asp3")] {
|
||||
mod wait_flag;
|
||||
pub use wait_flag::Parker;
|
||||
} else if #[cfg(any(windows, target_family = "unix"))] {
|
||||
pub use crate::sys::thread_parker::Parker;
|
||||
} else if #[cfg(all(target_vendor = "fortanix", target_env = "sgx"))] {
|
||||
pub use crate::sys::thread_parker::Parker;
|
||||
pub use crate::sys::thread_parking::Parker;
|
||||
} else {
|
||||
mod generic;
|
||||
pub use generic::Parker;
|
@ -41,7 +41,7 @@ pub struct Parker {
|
||||
impl Parker {
|
||||
/// Construct a parker for the current thread. The UNIX parker
|
||||
/// implementation requires this to happen in-place.
|
||||
pub unsafe fn new(parker: *mut Parker) {
|
||||
pub unsafe fn new_in_place(parker: *mut Parker) {
|
||||
parker.write(Parker { state: AtomicI8::new(EMPTY), wait_flag: WaitFlag::new() })
|
||||
}
|
||||
|
@ -173,7 +173,7 @@ use crate::sync::Arc;
|
||||
use crate::sys::thread as imp;
|
||||
use crate::sys_common::thread;
|
||||
use crate::sys_common::thread_info;
|
||||
use crate::sys_common::thread_parker::Parker;
|
||||
use crate::sys_common::thread_parking::Parker;
|
||||
use crate::sys_common::{AsInner, IntoInner};
|
||||
use crate::time::Duration;
|
||||
|
||||
@ -1216,7 +1216,7 @@ impl Thread {
|
||||
let ptr = Arc::get_mut_unchecked(&mut arc).as_mut_ptr();
|
||||
addr_of_mut!((*ptr).name).write(name);
|
||||
addr_of_mut!((*ptr).id).write(ThreadId::new());
|
||||
Parker::new(addr_of_mut!((*ptr).parker));
|
||||
Parker::new_in_place(addr_of_mut!((*ptr).parker));
|
||||
Pin::new_unchecked(arc.assume_init())
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user