diff --git a/library/std/src/sync/mpsc/blocking.rs b/library/std/src/sync/mpsc/blocking.rs index 4c852b8ee81..021df7b096c 100644 --- a/library/std/src/sync/mpsc/blocking.rs +++ b/library/std/src/sync/mpsc/blocking.rs @@ -1,6 +1,5 @@ //! Generic support for building blocking abstractions. -use crate::mem; use crate::sync::atomic::{AtomicBool, Ordering}; use crate::sync::Arc; use crate::thread::{self, Thread}; @@ -47,18 +46,18 @@ impl SignalToken { wake } - /// Converts to an unsafe usize value. Useful for storing in a pipe's state + /// Converts to an unsafe raw pointer. Useful for storing in a pipe's state /// flag. #[inline] - pub unsafe fn cast_to_usize(self) -> usize { - mem::transmute(self.inner) + pub unsafe fn to_raw(self) -> *mut u8 { + Arc::into_raw(self.inner) as *mut u8 } - /// Converts from an unsafe usize value. Useful for retrieving a pipe's state + /// Converts from an unsafe raw pointer. Useful for retrieving a pipe's state /// flag. #[inline] - pub unsafe fn cast_from_usize(signal_ptr: usize) -> SignalToken { - SignalToken { inner: mem::transmute(signal_ptr) } + pub unsafe fn from_raw(signal_ptr: *mut u8) -> SignalToken { + SignalToken { inner: Arc::from_raw(signal_ptr as *mut Inner) } } } diff --git a/library/std/src/sync/mpsc/oneshot.rs b/library/std/src/sync/mpsc/oneshot.rs index 3dcf03f579a..0e259b8aecb 100644 --- a/library/std/src/sync/mpsc/oneshot.rs +++ b/library/std/src/sync/mpsc/oneshot.rs @@ -27,15 +27,15 @@ pub use self::UpgradeResult::*; use crate::cell::UnsafeCell; use crate::ptr; -use crate::sync::atomic::{AtomicUsize, Ordering}; +use crate::sync::atomic::{AtomicPtr, Ordering}; use crate::sync::mpsc::blocking::{self, SignalToken}; use crate::sync::mpsc::Receiver; use crate::time::Instant; // Various states you can find a port in. -const EMPTY: usize = 0; // initial state: no data, no blocked receiver -const DATA: usize = 1; // data ready for receiver to take -const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded +const EMPTY: *mut u8 = ptr::invalid_mut::(0); // initial state: no data, no blocked receiver +const DATA: *mut u8 = ptr::invalid_mut::(1); // data ready for receiver to take +const DISCONNECTED: *mut u8 = ptr::invalid_mut::(2); // channel is disconnected OR upgraded // Any other value represents a pointer to a SignalToken value. The // protocol ensures that when the state moves *to* a pointer, // ownership of the token is given to the packet, and when the state @@ -44,7 +44,7 @@ const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded pub struct Packet { // Internal state of the chan/port pair (stores the blocked thread as well) - state: AtomicUsize, + state: AtomicPtr, // One-shot data slot location data: UnsafeCell>, // when used for the second time, a oneshot channel must be upgraded, and @@ -75,7 +75,7 @@ impl Packet { Packet { data: UnsafeCell::new(None), upgrade: UnsafeCell::new(NothingSent), - state: AtomicUsize::new(EMPTY), + state: AtomicPtr::new(EMPTY), } } @@ -108,7 +108,7 @@ impl Packet { // There is a thread waiting on the other end. We leave the 'DATA' // state inside so it'll pick it up on the other end. ptr => { - SignalToken::cast_from_usize(ptr).signal(); + SignalToken::from_raw(ptr).signal(); Ok(()) } } @@ -126,7 +126,7 @@ impl Packet { // like we're not empty, then immediately go through to `try_recv`. if self.state.load(Ordering::SeqCst) == EMPTY { let (wait_token, signal_token) = blocking::tokens(); - let ptr = unsafe { signal_token.cast_to_usize() }; + let ptr = unsafe { signal_token.to_raw() }; // race with senders to enter the blocking state if self.state.compare_exchange(EMPTY, ptr, Ordering::SeqCst, Ordering::SeqCst).is_ok() { @@ -142,7 +142,7 @@ impl Packet { } } else { // drop the signal token, since we never blocked - drop(unsafe { SignalToken::cast_from_usize(ptr) }); + drop(unsafe { SignalToken::from_raw(ptr) }); } } @@ -218,7 +218,7 @@ impl Packet { } // If someone's waiting, we gotta wake them up - ptr => UpWoke(SignalToken::cast_from_usize(ptr)), + ptr => UpWoke(SignalToken::from_raw(ptr)), } } } @@ -229,7 +229,7 @@ impl Packet { // If someone's waiting, we gotta wake them up ptr => unsafe { - SignalToken::cast_from_usize(ptr).signal(); + SignalToken::from_raw(ptr).signal(); }, } } @@ -301,7 +301,7 @@ impl Packet { // We woke ourselves up from select. ptr => unsafe { - drop(SignalToken::cast_from_usize(ptr)); + drop(SignalToken::from_raw(ptr)); Ok(false) }, } diff --git a/library/std/src/sync/mpsc/shared.rs b/library/std/src/sync/mpsc/shared.rs index 56162655544..51917bd96bd 100644 --- a/library/std/src/sync/mpsc/shared.rs +++ b/library/std/src/sync/mpsc/shared.rs @@ -15,7 +15,7 @@ use core::intrinsics::abort; use crate::cell::UnsafeCell; use crate::ptr; -use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; +use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, AtomicUsize, Ordering}; use crate::sync::mpsc::blocking::{self, SignalToken}; use crate::sync::mpsc::mpsc_queue as mpsc; use crate::sync::{Mutex, MutexGuard}; @@ -29,12 +29,13 @@ const MAX_REFCOUNT: usize = (isize::MAX) as usize; const MAX_STEALS: isize = 5; #[cfg(not(test))] const MAX_STEALS: isize = 1 << 20; +const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver pub struct Packet { queue: mpsc::Queue, cnt: AtomicIsize, // How many items are on this channel steals: UnsafeCell, // How many times has a port received without blocking? - to_wake: AtomicUsize, // SignalToken for wake up + to_wake: AtomicPtr, // SignalToken for wake up // The number of channels which are currently using this packet. channels: AtomicUsize, @@ -68,7 +69,7 @@ impl Packet { queue: mpsc::Queue::new(), cnt: AtomicIsize::new(0), steals: UnsafeCell::new(0), - to_wake: AtomicUsize::new(0), + to_wake: AtomicPtr::new(EMPTY), channels: AtomicUsize::new(2), port_dropped: AtomicBool::new(false), sender_drain: AtomicIsize::new(0), @@ -93,8 +94,8 @@ impl Packet { pub fn inherit_blocker(&self, token: Option, guard: MutexGuard<'_, ()>) { if let Some(token) = token { assert_eq!(self.cnt.load(Ordering::SeqCst), 0); - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); - self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst); + assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY); + self.to_wake.store(unsafe { token.to_raw() }, Ordering::SeqCst); self.cnt.store(-1, Ordering::SeqCst); // This store is a little sketchy. What's happening here is that @@ -250,10 +251,10 @@ impl Packet { unsafe { assert_eq!( self.to_wake.load(Ordering::SeqCst), - 0, + EMPTY, "This is a known bug in the Rust standard library. See https://github.com/rust-lang/rust/issues/39364" ); - let ptr = token.cast_to_usize(); + let ptr = token.to_raw(); self.to_wake.store(ptr, Ordering::SeqCst); let steals = ptr::replace(self.steals.get(), 0); @@ -272,8 +273,8 @@ impl Packet { } } - self.to_wake.store(0, Ordering::SeqCst); - drop(SignalToken::cast_from_usize(ptr)); + self.to_wake.store(EMPTY, Ordering::SeqCst); + drop(SignalToken::from_raw(ptr)); Abort } } @@ -415,9 +416,9 @@ impl Packet { // Consumes ownership of the 'to_wake' field. fn take_to_wake(&self) -> SignalToken { let ptr = self.to_wake.load(Ordering::SeqCst); - self.to_wake.store(0, Ordering::SeqCst); - assert!(ptr != 0); - unsafe { SignalToken::cast_from_usize(ptr) } + self.to_wake.store(EMPTY, Ordering::SeqCst); + assert!(ptr != EMPTY); + unsafe { SignalToken::from_raw(ptr) } } //////////////////////////////////////////////////////////////////////////// @@ -462,7 +463,7 @@ impl Packet { let prev = self.bump(steals + 1); if prev == DISCONNECTED { - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY); true } else { let cur = prev + steals + 1; @@ -470,7 +471,7 @@ impl Packet { if prev < 0 { drop(self.take_to_wake()); } else { - while self.to_wake.load(Ordering::SeqCst) != 0 { + while self.to_wake.load(Ordering::SeqCst) != EMPTY { thread::yield_now(); } } @@ -494,7 +495,7 @@ impl Drop for Packet { // `to_wake`, so this assert cannot be removed with also removing // the `to_wake` assert. assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED); - assert_eq!(self.to_wake.load(Ordering::SeqCst), 0); + assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY); assert_eq!(self.channels.load(Ordering::SeqCst), 0); } } diff --git a/library/std/src/sync/mpsc/stream.rs b/library/std/src/sync/mpsc/stream.rs index 2a1d3f8967e..4c3812c79f6 100644 --- a/library/std/src/sync/mpsc/stream.rs +++ b/library/std/src/sync/mpsc/stream.rs @@ -17,7 +17,7 @@ use crate::ptr; use crate::thread; use crate::time::Instant; -use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering}; +use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, Ordering}; use crate::sync::mpsc::blocking::{self, SignalToken}; use crate::sync::mpsc::spsc_queue as spsc; use crate::sync::mpsc::Receiver; @@ -27,6 +27,7 @@ const DISCONNECTED: isize = isize::MIN; const MAX_STEALS: isize = 5; #[cfg(not(test))] const MAX_STEALS: isize = 1 << 20; +const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver pub struct Packet { // internal queue for all messages @@ -34,8 +35,8 @@ pub struct Packet { } struct ProducerAddition { - cnt: AtomicIsize, // How many items are on this channel - to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up + cnt: AtomicIsize, // How many items are on this channel + to_wake: AtomicPtr, // SignalToken for the blocked thread to wake up port_dropped: AtomicBool, // flag if the channel has been destroyed. } @@ -71,7 +72,7 @@ impl Packet { 128, ProducerAddition { cnt: AtomicIsize::new(0), - to_wake: AtomicUsize::new(0), + to_wake: AtomicPtr::new(EMPTY), port_dropped: AtomicBool::new(false), }, @@ -147,17 +148,17 @@ impl Packet { // Consumes ownership of the 'to_wake' field. fn take_to_wake(&self) -> SignalToken { let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst); - self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst); - assert!(ptr != 0); - unsafe { SignalToken::cast_from_usize(ptr) } + self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst); + assert!(ptr != EMPTY); + unsafe { SignalToken::from_raw(ptr) } } // Decrements the count on the channel for a sleeper, returning the sleeper // back if it shouldn't sleep. Note that this is the location where we take // steals into account. fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> { - assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); - let ptr = unsafe { token.cast_to_usize() }; + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY); + let ptr = unsafe { token.to_raw() }; self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst); let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) }; @@ -176,8 +177,8 @@ impl Packet { } } - self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst); - Err(unsafe { SignalToken::cast_from_usize(ptr) }) + self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst); + Err(unsafe { SignalToken::from_raw(ptr) }) } pub fn recv(&self, deadline: Option) -> Result> { @@ -376,7 +377,7 @@ impl Packet { // of time until the data is actually sent. if was_upgrade { assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0); - assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY); return Ok(true); } @@ -389,7 +390,7 @@ impl Packet { // If we were previously disconnected, then we know for sure that there // is no thread in to_wake, so just keep going let has_data = if prev == DISCONNECTED { - assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY); true // there is data, that data is that we're disconnected } else { let cur = prev + steals + 1; @@ -412,7 +413,7 @@ impl Packet { if prev < 0 { drop(self.take_to_wake()); } else { - while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 { + while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != EMPTY { thread::yield_now(); } } @@ -451,6 +452,6 @@ impl Drop for Packet { // `to_wake`, so this assert cannot be removed with also removing // the `to_wake` assert. assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED); - assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0); + assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY); } }