mirror of
https://github.com/rust-lang/rust.git
synced 2025-01-24 05:33:41 +00:00
Remove ptr-int transmute in std::sync::mpsc
Since https://github.com/rust-lang/rust/pull/95340 landed, Miri with -Zmiri-check-number-validity produces an error on the test suites of some crates which implement concurrency tools, because it seems like such crates tend to use std::sync::mpsc in their tests. This fixes the problem by storing pointer bytes in a pointer.
This commit is contained in:
parent
340f6491be
commit
dec73f58d8
@ -1,6 +1,5 @@
|
||||
//! Generic support for building blocking abstractions.
|
||||
|
||||
use crate::mem;
|
||||
use crate::sync::atomic::{AtomicBool, Ordering};
|
||||
use crate::sync::Arc;
|
||||
use crate::thread::{self, Thread};
|
||||
@ -47,18 +46,18 @@ impl SignalToken {
|
||||
wake
|
||||
}
|
||||
|
||||
/// Converts to an unsafe usize value. Useful for storing in a pipe's state
|
||||
/// Converts to an unsafe raw pointer. Useful for storing in a pipe's state
|
||||
/// flag.
|
||||
#[inline]
|
||||
pub unsafe fn cast_to_usize(self) -> usize {
|
||||
mem::transmute(self.inner)
|
||||
pub unsafe fn to_raw(self) -> *mut u8 {
|
||||
Arc::into_raw(self.inner) as *mut u8
|
||||
}
|
||||
|
||||
/// Converts from an unsafe usize value. Useful for retrieving a pipe's state
|
||||
/// Converts from an unsafe raw pointer. Useful for retrieving a pipe's state
|
||||
/// flag.
|
||||
#[inline]
|
||||
pub unsafe fn cast_from_usize(signal_ptr: usize) -> SignalToken {
|
||||
SignalToken { inner: mem::transmute(signal_ptr) }
|
||||
pub unsafe fn from_raw(signal_ptr: *mut u8) -> SignalToken {
|
||||
SignalToken { inner: Arc::from_raw(signal_ptr as *mut Inner) }
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -27,15 +27,15 @@ pub use self::UpgradeResult::*;
|
||||
|
||||
use crate::cell::UnsafeCell;
|
||||
use crate::ptr;
|
||||
use crate::sync::atomic::{AtomicUsize, Ordering};
|
||||
use crate::sync::atomic::{AtomicPtr, Ordering};
|
||||
use crate::sync::mpsc::blocking::{self, SignalToken};
|
||||
use crate::sync::mpsc::Receiver;
|
||||
use crate::time::Instant;
|
||||
|
||||
// Various states you can find a port in.
|
||||
const EMPTY: usize = 0; // initial state: no data, no blocked receiver
|
||||
const DATA: usize = 1; // data ready for receiver to take
|
||||
const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded
|
||||
const EMPTY: *mut u8 = ptr::invalid_mut::<u8>(0); // initial state: no data, no blocked receiver
|
||||
const DATA: *mut u8 = ptr::invalid_mut::<u8>(1); // data ready for receiver to take
|
||||
const DISCONNECTED: *mut u8 = ptr::invalid_mut::<u8>(2); // channel is disconnected OR upgraded
|
||||
// Any other value represents a pointer to a SignalToken value. The
|
||||
// protocol ensures that when the state moves *to* a pointer,
|
||||
// ownership of the token is given to the packet, and when the state
|
||||
@ -44,7 +44,7 @@ const DISCONNECTED: usize = 2; // channel is disconnected OR upgraded
|
||||
|
||||
pub struct Packet<T> {
|
||||
// Internal state of the chan/port pair (stores the blocked thread as well)
|
||||
state: AtomicUsize,
|
||||
state: AtomicPtr<u8>,
|
||||
// One-shot data slot location
|
||||
data: UnsafeCell<Option<T>>,
|
||||
// when used for the second time, a oneshot channel must be upgraded, and
|
||||
@ -75,7 +75,7 @@ impl<T> Packet<T> {
|
||||
Packet {
|
||||
data: UnsafeCell::new(None),
|
||||
upgrade: UnsafeCell::new(NothingSent),
|
||||
state: AtomicUsize::new(EMPTY),
|
||||
state: AtomicPtr::new(EMPTY),
|
||||
}
|
||||
}
|
||||
|
||||
@ -108,7 +108,7 @@ impl<T> Packet<T> {
|
||||
// There is a thread waiting on the other end. We leave the 'DATA'
|
||||
// state inside so it'll pick it up on the other end.
|
||||
ptr => {
|
||||
SignalToken::cast_from_usize(ptr).signal();
|
||||
SignalToken::from_raw(ptr).signal();
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -126,7 +126,7 @@ impl<T> Packet<T> {
|
||||
// like we're not empty, then immediately go through to `try_recv`.
|
||||
if self.state.load(Ordering::SeqCst) == EMPTY {
|
||||
let (wait_token, signal_token) = blocking::tokens();
|
||||
let ptr = unsafe { signal_token.cast_to_usize() };
|
||||
let ptr = unsafe { signal_token.to_raw() };
|
||||
|
||||
// race with senders to enter the blocking state
|
||||
if self.state.compare_exchange(EMPTY, ptr, Ordering::SeqCst, Ordering::SeqCst).is_ok() {
|
||||
@ -142,7 +142,7 @@ impl<T> Packet<T> {
|
||||
}
|
||||
} else {
|
||||
// drop the signal token, since we never blocked
|
||||
drop(unsafe { SignalToken::cast_from_usize(ptr) });
|
||||
drop(unsafe { SignalToken::from_raw(ptr) });
|
||||
}
|
||||
}
|
||||
|
||||
@ -218,7 +218,7 @@ impl<T> Packet<T> {
|
||||
}
|
||||
|
||||
// If someone's waiting, we gotta wake them up
|
||||
ptr => UpWoke(SignalToken::cast_from_usize(ptr)),
|
||||
ptr => UpWoke(SignalToken::from_raw(ptr)),
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -229,7 +229,7 @@ impl<T> Packet<T> {
|
||||
|
||||
// If someone's waiting, we gotta wake them up
|
||||
ptr => unsafe {
|
||||
SignalToken::cast_from_usize(ptr).signal();
|
||||
SignalToken::from_raw(ptr).signal();
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -301,7 +301,7 @@ impl<T> Packet<T> {
|
||||
|
||||
// We woke ourselves up from select.
|
||||
ptr => unsafe {
|
||||
drop(SignalToken::cast_from_usize(ptr));
|
||||
drop(SignalToken::from_raw(ptr));
|
||||
Ok(false)
|
||||
},
|
||||
}
|
||||
|
@ -15,7 +15,7 @@ use core::intrinsics::abort;
|
||||
|
||||
use crate::cell::UnsafeCell;
|
||||
use crate::ptr;
|
||||
use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
|
||||
use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, AtomicUsize, Ordering};
|
||||
use crate::sync::mpsc::blocking::{self, SignalToken};
|
||||
use crate::sync::mpsc::mpsc_queue as mpsc;
|
||||
use crate::sync::{Mutex, MutexGuard};
|
||||
@ -29,12 +29,13 @@ const MAX_REFCOUNT: usize = (isize::MAX) as usize;
|
||||
const MAX_STEALS: isize = 5;
|
||||
#[cfg(not(test))]
|
||||
const MAX_STEALS: isize = 1 << 20;
|
||||
const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver
|
||||
|
||||
pub struct Packet<T> {
|
||||
queue: mpsc::Queue<T>,
|
||||
cnt: AtomicIsize, // How many items are on this channel
|
||||
steals: UnsafeCell<isize>, // How many times has a port received without blocking?
|
||||
to_wake: AtomicUsize, // SignalToken for wake up
|
||||
to_wake: AtomicPtr<u8>, // SignalToken for wake up
|
||||
|
||||
// The number of channels which are currently using this packet.
|
||||
channels: AtomicUsize,
|
||||
@ -68,7 +69,7 @@ impl<T> Packet<T> {
|
||||
queue: mpsc::Queue::new(),
|
||||
cnt: AtomicIsize::new(0),
|
||||
steals: UnsafeCell::new(0),
|
||||
to_wake: AtomicUsize::new(0),
|
||||
to_wake: AtomicPtr::new(EMPTY),
|
||||
channels: AtomicUsize::new(2),
|
||||
port_dropped: AtomicBool::new(false),
|
||||
sender_drain: AtomicIsize::new(0),
|
||||
@ -93,8 +94,8 @@ impl<T> Packet<T> {
|
||||
pub fn inherit_blocker(&self, token: Option<SignalToken>, guard: MutexGuard<'_, ()>) {
|
||||
if let Some(token) = token {
|
||||
assert_eq!(self.cnt.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
|
||||
self.to_wake.store(unsafe { token.cast_to_usize() }, Ordering::SeqCst);
|
||||
assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
|
||||
self.to_wake.store(unsafe { token.to_raw() }, Ordering::SeqCst);
|
||||
self.cnt.store(-1, Ordering::SeqCst);
|
||||
|
||||
// This store is a little sketchy. What's happening here is that
|
||||
@ -250,10 +251,10 @@ impl<T> Packet<T> {
|
||||
unsafe {
|
||||
assert_eq!(
|
||||
self.to_wake.load(Ordering::SeqCst),
|
||||
0,
|
||||
EMPTY,
|
||||
"This is a known bug in the Rust standard library. See https://github.com/rust-lang/rust/issues/39364"
|
||||
);
|
||||
let ptr = token.cast_to_usize();
|
||||
let ptr = token.to_raw();
|
||||
self.to_wake.store(ptr, Ordering::SeqCst);
|
||||
|
||||
let steals = ptr::replace(self.steals.get(), 0);
|
||||
@ -272,8 +273,8 @@ impl<T> Packet<T> {
|
||||
}
|
||||
}
|
||||
|
||||
self.to_wake.store(0, Ordering::SeqCst);
|
||||
drop(SignalToken::cast_from_usize(ptr));
|
||||
self.to_wake.store(EMPTY, Ordering::SeqCst);
|
||||
drop(SignalToken::from_raw(ptr));
|
||||
Abort
|
||||
}
|
||||
}
|
||||
@ -415,9 +416,9 @@ impl<T> Packet<T> {
|
||||
// Consumes ownership of the 'to_wake' field.
|
||||
fn take_to_wake(&self) -> SignalToken {
|
||||
let ptr = self.to_wake.load(Ordering::SeqCst);
|
||||
self.to_wake.store(0, Ordering::SeqCst);
|
||||
assert!(ptr != 0);
|
||||
unsafe { SignalToken::cast_from_usize(ptr) }
|
||||
self.to_wake.store(EMPTY, Ordering::SeqCst);
|
||||
assert!(ptr != EMPTY);
|
||||
unsafe { SignalToken::from_raw(ptr) }
|
||||
}
|
||||
|
||||
////////////////////////////////////////////////////////////////////////////
|
||||
@ -462,7 +463,7 @@ impl<T> Packet<T> {
|
||||
let prev = self.bump(steals + 1);
|
||||
|
||||
if prev == DISCONNECTED {
|
||||
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
|
||||
true
|
||||
} else {
|
||||
let cur = prev + steals + 1;
|
||||
@ -470,7 +471,7 @@ impl<T> Packet<T> {
|
||||
if prev < 0 {
|
||||
drop(self.take_to_wake());
|
||||
} else {
|
||||
while self.to_wake.load(Ordering::SeqCst) != 0 {
|
||||
while self.to_wake.load(Ordering::SeqCst) != EMPTY {
|
||||
thread::yield_now();
|
||||
}
|
||||
}
|
||||
@ -494,7 +495,7 @@ impl<T> Drop for Packet<T> {
|
||||
// `to_wake`, so this assert cannot be removed with also removing
|
||||
// the `to_wake` assert.
|
||||
assert_eq!(self.cnt.load(Ordering::SeqCst), DISCONNECTED);
|
||||
assert_eq!(self.to_wake.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(self.to_wake.load(Ordering::SeqCst), EMPTY);
|
||||
assert_eq!(self.channels.load(Ordering::SeqCst), 0);
|
||||
}
|
||||
}
|
||||
|
@ -17,7 +17,7 @@ use crate::ptr;
|
||||
use crate::thread;
|
||||
use crate::time::Instant;
|
||||
|
||||
use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicUsize, Ordering};
|
||||
use crate::sync::atomic::{AtomicBool, AtomicIsize, AtomicPtr, Ordering};
|
||||
use crate::sync::mpsc::blocking::{self, SignalToken};
|
||||
use crate::sync::mpsc::spsc_queue as spsc;
|
||||
use crate::sync::mpsc::Receiver;
|
||||
@ -27,6 +27,7 @@ const DISCONNECTED: isize = isize::MIN;
|
||||
const MAX_STEALS: isize = 5;
|
||||
#[cfg(not(test))]
|
||||
const MAX_STEALS: isize = 1 << 20;
|
||||
const EMPTY: *mut u8 = ptr::null_mut(); // initial state: no data, no blocked receiver
|
||||
|
||||
pub struct Packet<T> {
|
||||
// internal queue for all messages
|
||||
@ -34,8 +35,8 @@ pub struct Packet<T> {
|
||||
}
|
||||
|
||||
struct ProducerAddition {
|
||||
cnt: AtomicIsize, // How many items are on this channel
|
||||
to_wake: AtomicUsize, // SignalToken for the blocked thread to wake up
|
||||
cnt: AtomicIsize, // How many items are on this channel
|
||||
to_wake: AtomicPtr<u8>, // SignalToken for the blocked thread to wake up
|
||||
|
||||
port_dropped: AtomicBool, // flag if the channel has been destroyed.
|
||||
}
|
||||
@ -71,7 +72,7 @@ impl<T> Packet<T> {
|
||||
128,
|
||||
ProducerAddition {
|
||||
cnt: AtomicIsize::new(0),
|
||||
to_wake: AtomicUsize::new(0),
|
||||
to_wake: AtomicPtr::new(EMPTY),
|
||||
|
||||
port_dropped: AtomicBool::new(false),
|
||||
},
|
||||
@ -147,17 +148,17 @@ impl<T> Packet<T> {
|
||||
// Consumes ownership of the 'to_wake' field.
|
||||
fn take_to_wake(&self) -> SignalToken {
|
||||
let ptr = self.queue.producer_addition().to_wake.load(Ordering::SeqCst);
|
||||
self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
|
||||
assert!(ptr != 0);
|
||||
unsafe { SignalToken::cast_from_usize(ptr) }
|
||||
self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
|
||||
assert!(ptr != EMPTY);
|
||||
unsafe { SignalToken::from_raw(ptr) }
|
||||
}
|
||||
|
||||
// Decrements the count on the channel for a sleeper, returning the sleeper
|
||||
// back if it shouldn't sleep. Note that this is the location where we take
|
||||
// steals into account.
|
||||
fn decrement(&self, token: SignalToken) -> Result<(), SignalToken> {
|
||||
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
|
||||
let ptr = unsafe { token.cast_to_usize() };
|
||||
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
|
||||
let ptr = unsafe { token.to_raw() };
|
||||
self.queue.producer_addition().to_wake.store(ptr, Ordering::SeqCst);
|
||||
|
||||
let steals = unsafe { ptr::replace(self.queue.consumer_addition().steals.get(), 0) };
|
||||
@ -176,8 +177,8 @@ impl<T> Packet<T> {
|
||||
}
|
||||
}
|
||||
|
||||
self.queue.producer_addition().to_wake.store(0, Ordering::SeqCst);
|
||||
Err(unsafe { SignalToken::cast_from_usize(ptr) })
|
||||
self.queue.producer_addition().to_wake.store(EMPTY, Ordering::SeqCst);
|
||||
Err(unsafe { SignalToken::from_raw(ptr) })
|
||||
}
|
||||
|
||||
pub fn recv(&self, deadline: Option<Instant>) -> Result<T, Failure<T>> {
|
||||
@ -376,7 +377,7 @@ impl<T> Packet<T> {
|
||||
// of time until the data is actually sent.
|
||||
if was_upgrade {
|
||||
assert_eq!(unsafe { *self.queue.consumer_addition().steals.get() }, 0);
|
||||
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
|
||||
return Ok(true);
|
||||
}
|
||||
|
||||
@ -389,7 +390,7 @@ impl<T> Packet<T> {
|
||||
// If we were previously disconnected, then we know for sure that there
|
||||
// is no thread in to_wake, so just keep going
|
||||
let has_data = if prev == DISCONNECTED {
|
||||
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
|
||||
true // there is data, that data is that we're disconnected
|
||||
} else {
|
||||
let cur = prev + steals + 1;
|
||||
@ -412,7 +413,7 @@ impl<T> Packet<T> {
|
||||
if prev < 0 {
|
||||
drop(self.take_to_wake());
|
||||
} else {
|
||||
while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != 0 {
|
||||
while self.queue.producer_addition().to_wake.load(Ordering::SeqCst) != EMPTY {
|
||||
thread::yield_now();
|
||||
}
|
||||
}
|
||||
@ -451,6 +452,6 @@ impl<T> Drop for Packet<T> {
|
||||
// `to_wake`, so this assert cannot be removed with also removing
|
||||
// the `to_wake` assert.
|
||||
assert_eq!(self.queue.producer_addition().cnt.load(Ordering::SeqCst), DISCONNECTED);
|
||||
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(self.queue.producer_addition().to_wake.load(Ordering::SeqCst), EMPTY);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user