mirror of
https://github.com/rust-lang/rust.git
synced 2024-11-25 08:13:41 +00:00
Add ParallelGuard
type to handle unwinding in parallel sections
This commit is contained in:
parent
51eb8427bf
commit
b56acac41d
@ -41,6 +41,7 @@
|
||||
//! [^2] `MTLockRef` is a typedef.
|
||||
|
||||
pub use crate::marker::*;
|
||||
use std::any::Any;
|
||||
use std::collections::HashMap;
|
||||
use std::hash::{BuildHasher, Hash};
|
||||
use std::ops::{Deref, DerefMut};
|
||||
@ -103,6 +104,37 @@ mod mode {
|
||||
|
||||
pub use mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode};
|
||||
|
||||
/// A guard used to hold panics that occur during a parallel section to later by unwound.
|
||||
/// This is used for the parallel compiler to prevent fatal errors from non-deterministically
|
||||
/// hiding errors by ensuring that everything in the section has completed executing before
|
||||
/// continuing with unwinding. It's also used for the non-parallel code to ensure error message
|
||||
/// output match the parallel compiler for testing purposes.
|
||||
pub struct ParallelGuard {
|
||||
panic: Lock<Option<Box<dyn Any + std::marker::Send + 'static>>>,
|
||||
}
|
||||
|
||||
impl ParallelGuard {
|
||||
#[inline]
|
||||
pub fn new() -> Self {
|
||||
ParallelGuard { panic: Lock::new(None) }
|
||||
}
|
||||
|
||||
pub fn run<R>(&self, f: impl FnOnce() -> R) -> Option<R> {
|
||||
catch_unwind(AssertUnwindSafe(f))
|
||||
.map_err(|err| {
|
||||
*self.panic.lock() = Some(err);
|
||||
})
|
||||
.ok()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn unwind(self) {
|
||||
if let Some(panic) = self.panic.into_inner() {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
cfg_if! {
|
||||
if #[cfg(not(parallel_compiler))] {
|
||||
use std::ops::Add;
|
||||
@ -198,66 +230,37 @@ cfg_if! {
|
||||
where A: FnOnce() -> RA,
|
||||
B: FnOnce() -> RB
|
||||
{
|
||||
(oper_a(), oper_b())
|
||||
let guard = ParallelGuard::new();
|
||||
let a = guard.run(oper_a);
|
||||
let b = guard.run(oper_b);
|
||||
guard.unwind();
|
||||
(a.unwrap(), b.unwrap())
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! parallel {
|
||||
($($blocks:block),*) => {{
|
||||
// We catch panics here ensuring that all the blocks execute.
|
||||
// This makes behavior consistent with the parallel compiler.
|
||||
let mut panic = None;
|
||||
$(
|
||||
if let Err(p) = ::std::panic::catch_unwind(
|
||||
::std::panic::AssertUnwindSafe(|| $blocks)
|
||||
) {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
}
|
||||
)*
|
||||
if let Some(panic) = panic {
|
||||
::std::panic::resume_unwind(panic);
|
||||
}
|
||||
let mut guard = $crate::sync::ParallelGuard::new();
|
||||
$(guard.run(|| $blocks);)*
|
||||
guard.unwind();
|
||||
}}
|
||||
}
|
||||
|
||||
pub fn par_for_each_in<T: IntoIterator>(t: T, mut for_each: impl FnMut(T::Item) + Sync + Send) {
|
||||
// We catch panics here ensuring that all the loop iterations execute.
|
||||
// This makes behavior consistent with the parallel compiler.
|
||||
let mut panic = None;
|
||||
let guard = ParallelGuard::new();
|
||||
t.into_iter().for_each(|i| {
|
||||
if let Err(p) = catch_unwind(AssertUnwindSafe(|| for_each(i))) {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
}
|
||||
guard.run(|| for_each(i));
|
||||
});
|
||||
if let Some(panic) = panic {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
guard.unwind();
|
||||
}
|
||||
|
||||
pub fn par_map<T: IntoIterator, R, C: FromIterator<R>>(
|
||||
t: T,
|
||||
mut map: impl FnMut(<<T as IntoIterator>::IntoIter as Iterator>::Item) -> R,
|
||||
) -> C {
|
||||
// We catch panics here ensuring that all the loop iterations execute.
|
||||
let mut panic = None;
|
||||
let r = t.into_iter().filter_map(|i| {
|
||||
match catch_unwind(AssertUnwindSafe(|| map(i))) {
|
||||
Ok(r) => Some(r),
|
||||
Err(p) => {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
}).collect();
|
||||
if let Some(panic) = panic {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
let guard = ParallelGuard::new();
|
||||
let r = t.into_iter().filter_map(|i| guard.run(|| map(i))).collect();
|
||||
guard.unwind();
|
||||
r
|
||||
}
|
||||
|
||||
@ -380,7 +383,11 @@ cfg_if! {
|
||||
let (a, b) = rayon::join(move || FromDyn::from(oper_a.into_inner()()), move || FromDyn::from(oper_b.into_inner()()));
|
||||
(a.into_inner(), b.into_inner())
|
||||
} else {
|
||||
(oper_a(), oper_b())
|
||||
let guard = ParallelGuard::new();
|
||||
let a = guard.run(oper_a);
|
||||
let b = guard.run(oper_b);
|
||||
guard.unwind();
|
||||
(a.unwrap(), b.unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
@ -415,28 +422,10 @@ cfg_if! {
|
||||
// of a single threaded rustc.
|
||||
parallel!(impl $fblock [] [$($blocks),*]);
|
||||
} else {
|
||||
// We catch panics here ensuring that all the blocks execute.
|
||||
// This makes behavior consistent with the parallel compiler.
|
||||
let mut panic = None;
|
||||
if let Err(p) = ::std::panic::catch_unwind(
|
||||
::std::panic::AssertUnwindSafe(|| $fblock)
|
||||
) {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
}
|
||||
$(
|
||||
if let Err(p) = ::std::panic::catch_unwind(
|
||||
::std::panic::AssertUnwindSafe(|| $blocks)
|
||||
) {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
}
|
||||
)*
|
||||
if let Some(panic) = panic {
|
||||
::std::panic::resume_unwind(panic);
|
||||
}
|
||||
let guard = $crate::sync::ParallelGuard::new();
|
||||
guard.run(|| $fblock);
|
||||
$(guard.run(|| $blocks);)*
|
||||
guard.unwind();
|
||||
}
|
||||
};
|
||||
}
|
||||
@ -449,31 +438,17 @@ cfg_if! {
|
||||
) {
|
||||
if mode::is_dyn_thread_safe() {
|
||||
let for_each = FromDyn::from(for_each);
|
||||
let panic: Mutex<Option<_>> = Mutex::new(None);
|
||||
t.into_par_iter().for_each(|i| if let Err(p) = catch_unwind(AssertUnwindSafe(|| for_each(i))) {
|
||||
let mut l = panic.lock();
|
||||
if l.is_none() {
|
||||
*l = Some(p)
|
||||
}
|
||||
let guard = ParallelGuard::new();
|
||||
t.into_par_iter().for_each(|i| {
|
||||
guard.run(|| for_each(i));
|
||||
});
|
||||
|
||||
if let Some(panic) = panic.into_inner() {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
guard.unwind();
|
||||
} else {
|
||||
// We catch panics here ensuring that all the loop iterations execute.
|
||||
// This makes behavior consistent with the parallel compiler.
|
||||
let mut panic = None;
|
||||
let guard = ParallelGuard::new();
|
||||
t.into_iter().for_each(|i| {
|
||||
if let Err(p) = catch_unwind(AssertUnwindSafe(|| for_each(i))) {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
}
|
||||
guard.run(|| for_each(i));
|
||||
});
|
||||
if let Some(panic) = panic {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
guard.unwind();
|
||||
}
|
||||
}
|
||||
|
||||
@ -487,43 +462,15 @@ cfg_if! {
|
||||
map: impl Fn(I) -> R + DynSync + DynSend
|
||||
) -> C {
|
||||
if mode::is_dyn_thread_safe() {
|
||||
let panic: Mutex<Option<_>> = Mutex::new(None);
|
||||
let map = FromDyn::from(map);
|
||||
// We catch panics here ensuring that all the loop iterations execute.
|
||||
let r = t.into_par_iter().filter_map(|i| {
|
||||
match catch_unwind(AssertUnwindSafe(|| map(i))) {
|
||||
Ok(r) => Some(r),
|
||||
Err(p) => {
|
||||
let mut l = panic.lock();
|
||||
if l.is_none() {
|
||||
*l = Some(p);
|
||||
}
|
||||
None
|
||||
},
|
||||
}
|
||||
}).collect();
|
||||
|
||||
if let Some(panic) = panic.into_inner() {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
let guard = ParallelGuard::new();
|
||||
let r = t.into_par_iter().filter_map(|i| guard.run(|| map(i))).collect();
|
||||
guard.unwind();
|
||||
r
|
||||
} else {
|
||||
// We catch panics here ensuring that all the loop iterations execute.
|
||||
let mut panic = None;
|
||||
let r = t.into_iter().filter_map(|i| {
|
||||
match catch_unwind(AssertUnwindSafe(|| map(i))) {
|
||||
Ok(r) => Some(r),
|
||||
Err(p) => {
|
||||
if panic.is_none() {
|
||||
panic = Some(p);
|
||||
}
|
||||
None
|
||||
}
|
||||
}
|
||||
}).collect();
|
||||
if let Some(panic) = panic {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
let guard = ParallelGuard::new();
|
||||
let r = t.into_iter().filter_map(|i| guard.run(|| map(i))).collect();
|
||||
guard.unwind();
|
||||
r
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user