Rollup merge of - RalfJung:scope-race, r=m-ou-se

fix data race in thread::scope

Puts the `ScopeData` into an `Arc` so it sticks around as long as we need it.
This means one extra `Arc::clone` per spawned scoped thread, which I hope is fine.

Fixes https://github.com/rust-lang/rust/issues/98498
r? `````@m-ou-se`````
This commit is contained in:
Matthias Krüger 2022-06-30 19:55:51 +02:00 committed by GitHub
commit ebecc13106
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 17 additions and 10 deletions
library/std/src/thread

View File

@ -159,6 +159,7 @@ use crate::cell::UnsafeCell;
use crate::ffi::{CStr, CString}; use crate::ffi::{CStr, CString};
use crate::fmt; use crate::fmt;
use crate::io; use crate::io;
use crate::marker::PhantomData;
use crate::mem; use crate::mem;
use crate::num::NonZeroU64; use crate::num::NonZeroU64;
use crate::num::NonZeroUsize; use crate::num::NonZeroUsize;
@ -462,7 +463,7 @@ impl Builder {
unsafe fn spawn_unchecked_<'a, 'scope, F, T>( unsafe fn spawn_unchecked_<'a, 'scope, F, T>(
self, self,
f: F, f: F,
scope_data: Option<&'scope scoped::ScopeData>, scope_data: Option<Arc<scoped::ScopeData>>,
) -> io::Result<JoinInner<'scope, T>> ) -> io::Result<JoinInner<'scope, T>>
where where
F: FnOnce() -> T, F: FnOnce() -> T,
@ -479,8 +480,11 @@ impl Builder {
})); }));
let their_thread = my_thread.clone(); let their_thread = my_thread.clone();
let my_packet: Arc<Packet<'scope, T>> = let my_packet: Arc<Packet<'scope, T>> = Arc::new(Packet {
Arc::new(Packet { scope: scope_data, result: UnsafeCell::new(None) }); scope: scope_data,
result: UnsafeCell::new(None),
_marker: PhantomData,
});
let their_packet = my_packet.clone(); let their_packet = my_packet.clone();
let output_capture = crate::io::set_output_capture(None); let output_capture = crate::io::set_output_capture(None);
@ -507,7 +511,7 @@ impl Builder {
unsafe { *their_packet.result.get() = Some(try_result) }; unsafe { *their_packet.result.get() = Some(try_result) };
}; };
if let Some(scope_data) = scope_data { if let Some(scope_data) = &my_packet.scope {
scope_data.increment_num_running_threads(); scope_data.increment_num_running_threads();
} }
@ -1298,8 +1302,9 @@ pub type Result<T> = crate::result::Result<T, Box<dyn Any + Send + 'static>>;
// An Arc to the packet is stored into a `JoinInner` which in turns is placed // An Arc to the packet is stored into a `JoinInner` which in turns is placed
// in `JoinHandle`. // in `JoinHandle`.
struct Packet<'scope, T> { struct Packet<'scope, T> {
scope: Option<&'scope scoped::ScopeData>, scope: Option<Arc<scoped::ScopeData>>,
result: UnsafeCell<Option<Result<T>>>, result: UnsafeCell<Option<Result<T>>>,
_marker: PhantomData<Option<&'scope scoped::ScopeData>>,
} }
// Due to the usage of `UnsafeCell` we need to manually implement Sync. // Due to the usage of `UnsafeCell` we need to manually implement Sync.
@ -1330,7 +1335,7 @@ impl<'scope, T> Drop for Packet<'scope, T> {
rtabort!("thread result panicked on drop"); rtabort!("thread result panicked on drop");
} }
// Book-keeping so the scope knows when it's done. // Book-keeping so the scope knows when it's done.
if let Some(scope) = self.scope { if let Some(scope) = &self.scope {
// Now that there will be no more user code running on this thread // Now that there will be no more user code running on this thread
// that can use 'scope, mark the thread as 'finished'. // that can use 'scope, mark the thread as 'finished'.
// It's important we only do this after the `result` has been dropped, // It's important we only do this after the `result` has been dropped,

View File

@ -11,7 +11,7 @@ use crate::sync::Arc;
/// See [`scope`] for details. /// See [`scope`] for details.
#[stable(feature = "scoped_threads", since = "1.63.0")] #[stable(feature = "scoped_threads", since = "1.63.0")]
pub struct Scope<'scope, 'env: 'scope> { pub struct Scope<'scope, 'env: 'scope> {
data: ScopeData, data: Arc<ScopeData>,
/// Invariance over 'scope, to make sure 'scope cannot shrink, /// Invariance over 'scope, to make sure 'scope cannot shrink,
/// which is necessary for soundness. /// which is necessary for soundness.
/// ///
@ -130,12 +130,14 @@ pub fn scope<'env, F, T>(f: F) -> T
where where
F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T, F: for<'scope> FnOnce(&'scope Scope<'scope, 'env>) -> T,
{ {
// We put the `ScopeData` into an `Arc` so that other threads can finish their
// `decrement_num_running_threads` even after this function returns.
let scope = Scope { let scope = Scope {
data: ScopeData { data: Arc::new(ScopeData {
num_running_threads: AtomicUsize::new(0), num_running_threads: AtomicUsize::new(0),
main_thread: current(), main_thread: current(),
a_thread_panicked: AtomicBool::new(false), a_thread_panicked: AtomicBool::new(false),
}, }),
env: PhantomData, env: PhantomData,
scope: PhantomData, scope: PhantomData,
}; };
@ -250,7 +252,7 @@ impl Builder {
F: FnOnce() -> T + Send + 'scope, F: FnOnce() -> T + Send + 'scope,
T: Send + 'scope, T: Send + 'scope,
{ {
Ok(ScopedJoinHandle(unsafe { self.spawn_unchecked_(f, Some(&scope.data)) }?)) Ok(ScopedJoinHandle(unsafe { self.spawn_unchecked_(f, Some(scope.data.clone())) }?))
} }
} }