mirror of
https://github.com/rust-lang/rust.git
synced 2024-10-30 14:01:51 +00:00
Rollup merge of #115548 - Zoxc:parallel-extract, r=wesleywiser
Extract parallel operations in `rustc_data_structures::sync` into a new `parallel` submodule This extracts parallel operations in `rustc_data_structures::sync` into a new `parallel` submodule. This cuts down on the size of the large `cfg_if!` in `sync` and makes it easier to compare between serial and parallel variants.
This commit is contained in:
commit
f3cc59b741
@ -41,12 +41,9 @@
|
||||
//! [^2] `MTLockRef` is a typedef.
|
||||
|
||||
pub use crate::marker::*;
|
||||
use parking_lot::Mutex;
|
||||
use std::any::Any;
|
||||
use std::collections::HashMap;
|
||||
use std::hash::{BuildHasher, Hash};
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
|
||||
|
||||
mod lock;
|
||||
pub use lock::{Lock, LockGuard, Mode};
|
||||
@ -54,6 +51,11 @@ pub use lock::{Lock, LockGuard, Mode};
|
||||
mod worker_local;
|
||||
pub use worker_local::{Registry, WorkerLocal};
|
||||
|
||||
mod parallel;
|
||||
#[cfg(parallel_compiler)]
|
||||
pub use parallel::scope;
|
||||
pub use parallel::{join, par_for_each_in, par_map, parallel_guard};
|
||||
|
||||
pub use std::sync::atomic::Ordering;
|
||||
pub use std::sync::atomic::Ordering::SeqCst;
|
||||
|
||||
@ -107,37 +109,6 @@ mod mode {
|
||||
|
||||
pub use mode::{is_dyn_thread_safe, set_dyn_thread_safe_mode};
|
||||
|
||||
/// A guard used to hold panics that occur during a parallel section to later by unwound.
|
||||
/// This is used for the parallel compiler to prevent fatal errors from non-deterministically
|
||||
/// hiding errors by ensuring that everything in the section has completed executing before
|
||||
/// continuing with unwinding. It's also used for the non-parallel code to ensure error message
|
||||
/// output match the parallel compiler for testing purposes.
|
||||
pub struct ParallelGuard {
|
||||
panic: Mutex<Option<Box<dyn Any + std::marker::Send + 'static>>>,
|
||||
}
|
||||
|
||||
impl ParallelGuard {
|
||||
pub fn run<R>(&self, f: impl FnOnce() -> R) -> Option<R> {
|
||||
catch_unwind(AssertUnwindSafe(f))
|
||||
.map_err(|err| {
|
||||
*self.panic.lock() = Some(err);
|
||||
})
|
||||
.ok()
|
||||
}
|
||||
}
|
||||
|
||||
/// This gives access to a fresh parallel guard in the closure and will unwind any panics
|
||||
/// caught in it after the closure returns.
|
||||
#[inline]
|
||||
pub fn parallel_guard<R>(f: impl FnOnce(&ParallelGuard) -> R) -> R {
|
||||
let guard = ParallelGuard { panic: Mutex::new(None) };
|
||||
let ret = f(&guard);
|
||||
if let Some(panic) = guard.panic.into_inner() {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
ret
|
||||
}
|
||||
|
||||
cfg_if! {
|
||||
if #[cfg(not(parallel_compiler))] {
|
||||
use std::ops::Add;
|
||||
@ -229,44 +200,6 @@ cfg_if! {
|
||||
pub type AtomicU32 = Atomic<u32>;
|
||||
pub type AtomicU64 = Atomic<u64>;
|
||||
|
||||
pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
|
||||
where A: FnOnce() -> RA,
|
||||
B: FnOnce() -> RB
|
||||
{
|
||||
let (a, b) = parallel_guard(|guard| {
|
||||
let a = guard.run(oper_a);
|
||||
let b = guard.run(oper_b);
|
||||
(a, b)
|
||||
});
|
||||
(a.unwrap(), b.unwrap())
|
||||
}
|
||||
|
||||
#[macro_export]
|
||||
macro_rules! parallel {
|
||||
($($blocks:block),*) => {{
|
||||
$crate::sync::parallel_guard(|guard| {
|
||||
$(guard.run(|| $blocks);)*
|
||||
});
|
||||
}}
|
||||
}
|
||||
|
||||
pub fn par_for_each_in<T: IntoIterator>(t: T, mut for_each: impl FnMut(T::Item) + Sync + Send) {
|
||||
parallel_guard(|guard| {
|
||||
t.into_iter().for_each(|i| {
|
||||
guard.run(|| for_each(i));
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
pub fn par_map<T: IntoIterator, R, C: FromIterator<R>>(
|
||||
t: T,
|
||||
mut map: impl FnMut(<<T as IntoIterator>::IntoIter as Iterator>::Item) -> R,
|
||||
) -> C {
|
||||
parallel_guard(|guard| {
|
||||
t.into_iter().filter_map(|i| guard.run(|| map(i))).collect()
|
||||
})
|
||||
}
|
||||
|
||||
pub use std::rc::Rc as Lrc;
|
||||
pub use std::rc::Weak as Weak;
|
||||
pub use std::cell::Ref as ReadGuard;
|
||||
@ -372,105 +305,6 @@ cfg_if! {
|
||||
|
||||
use std::thread;
|
||||
|
||||
#[inline]
|
||||
pub fn join<A, B, RA: DynSend, RB: DynSend>(oper_a: A, oper_b: B) -> (RA, RB)
|
||||
where
|
||||
A: FnOnce() -> RA + DynSend,
|
||||
B: FnOnce() -> RB + DynSend,
|
||||
{
|
||||
if mode::is_dyn_thread_safe() {
|
||||
let oper_a = FromDyn::from(oper_a);
|
||||
let oper_b = FromDyn::from(oper_b);
|
||||
let (a, b) = rayon::join(move || FromDyn::from(oper_a.into_inner()()), move || FromDyn::from(oper_b.into_inner()()));
|
||||
(a.into_inner(), b.into_inner())
|
||||
} else {
|
||||
let (a, b) = parallel_guard(|guard| {
|
||||
let a = guard.run(oper_a);
|
||||
let b = guard.run(oper_b);
|
||||
(a, b)
|
||||
});
|
||||
(a.unwrap(), b.unwrap())
|
||||
}
|
||||
}
|
||||
|
||||
// This function only works when `mode::is_dyn_thread_safe()`.
|
||||
pub fn scope<'scope, OP, R>(op: OP) -> R
|
||||
where
|
||||
OP: FnOnce(&rayon::Scope<'scope>) -> R + DynSend,
|
||||
R: DynSend,
|
||||
{
|
||||
let op = FromDyn::from(op);
|
||||
rayon::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
|
||||
}
|
||||
|
||||
/// Runs a list of blocks in parallel. The first block is executed immediately on
|
||||
/// the current thread. Use that for the longest running block.
|
||||
#[macro_export]
|
||||
macro_rules! parallel {
|
||||
(impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => {
|
||||
parallel!(impl $fblock [$block, $($c,)*] [$($rest),*])
|
||||
};
|
||||
(impl $fblock:block [$($blocks:expr,)*] []) => {
|
||||
::rustc_data_structures::sync::scope(|s| {
|
||||
$(let block = rustc_data_structures::sync::FromDyn::from(|| $blocks);
|
||||
s.spawn(move |_| block.into_inner()());)*
|
||||
(|| $fblock)();
|
||||
});
|
||||
};
|
||||
($fblock:block, $($blocks:block),*) => {
|
||||
if rustc_data_structures::sync::is_dyn_thread_safe() {
|
||||
// Reverse the order of the later blocks since Rayon executes them in reverse order
|
||||
// when using a single thread. This ensures the execution order matches that
|
||||
// of a single threaded rustc.
|
||||
parallel!(impl $fblock [] [$($blocks),*]);
|
||||
} else {
|
||||
$crate::sync::parallel_guard(|guard| {
|
||||
guard.run(|| $fblock);
|
||||
$(guard.run(|| $blocks);)*
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
|
||||
|
||||
pub fn par_for_each_in<I, T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>>(
|
||||
t: T,
|
||||
for_each: impl Fn(I) + DynSync + DynSend
|
||||
) {
|
||||
parallel_guard(|guard| {
|
||||
if mode::is_dyn_thread_safe() {
|
||||
let for_each = FromDyn::from(for_each);
|
||||
t.into_par_iter().for_each(|i| {
|
||||
guard.run(|| for_each(i));
|
||||
});
|
||||
} else {
|
||||
t.into_iter().for_each(|i| {
|
||||
guard.run(|| for_each(i));
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub fn par_map<
|
||||
I,
|
||||
T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>,
|
||||
R: std::marker::Send,
|
||||
C: FromIterator<R> + FromParallelIterator<R>
|
||||
>(
|
||||
t: T,
|
||||
map: impl Fn(I) -> R + DynSync + DynSend
|
||||
) -> C {
|
||||
parallel_guard(|guard| {
|
||||
if mode::is_dyn_thread_safe() {
|
||||
let map = FromDyn::from(map);
|
||||
t.into_par_iter().filter_map(|i| guard.run(|| map(i))).collect()
|
||||
} else {
|
||||
t.into_iter().filter_map(|i| guard.run(|| map(i))).collect()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// This makes locks panic if they are already held.
|
||||
/// It is only useful when you are running in a single thread
|
||||
const ERROR_CHECKING: bool = false;
|
||||
|
188
compiler/rustc_data_structures/src/sync/parallel.rs
Normal file
188
compiler/rustc_data_structures/src/sync/parallel.rs
Normal file
@ -0,0 +1,188 @@
|
||||
//! This module defines parallel operations that are implemented in
|
||||
//! one way for the serial compiler, and another way the parallel compiler.
|
||||
|
||||
#![allow(dead_code)]
|
||||
|
||||
use parking_lot::Mutex;
|
||||
use std::any::Any;
|
||||
use std::panic::{catch_unwind, resume_unwind, AssertUnwindSafe};
|
||||
|
||||
#[cfg(not(parallel_compiler))]
|
||||
pub use disabled::*;
|
||||
#[cfg(parallel_compiler)]
|
||||
pub use enabled::*;
|
||||
|
||||
/// A guard used to hold panics that occur during a parallel section to later by unwound.
|
||||
/// This is used for the parallel compiler to prevent fatal errors from non-deterministically
|
||||
/// hiding errors by ensuring that everything in the section has completed executing before
|
||||
/// continuing with unwinding. It's also used for the non-parallel code to ensure error message
|
||||
/// output match the parallel compiler for testing purposes.
|
||||
pub struct ParallelGuard {
|
||||
panic: Mutex<Option<Box<dyn Any + Send + 'static>>>,
|
||||
}
|
||||
|
||||
impl ParallelGuard {
|
||||
pub fn run<R>(&self, f: impl FnOnce() -> R) -> Option<R> {
|
||||
catch_unwind(AssertUnwindSafe(f))
|
||||
.map_err(|err| {
|
||||
*self.panic.lock() = Some(err);
|
||||
})
|
||||
.ok()
|
||||
}
|
||||
}
|
||||
|
||||
/// This gives access to a fresh parallel guard in the closure and will unwind any panics
|
||||
/// caught in it after the closure returns.
|
||||
#[inline]
|
||||
pub fn parallel_guard<R>(f: impl FnOnce(&ParallelGuard) -> R) -> R {
|
||||
let guard = ParallelGuard { panic: Mutex::new(None) };
|
||||
let ret = f(&guard);
|
||||
if let Some(panic) = guard.panic.into_inner() {
|
||||
resume_unwind(panic);
|
||||
}
|
||||
ret
|
||||
}
|
||||
|
||||
mod disabled {
|
||||
use crate::sync::parallel_guard;
|
||||
|
||||
#[macro_export]
|
||||
#[cfg(not(parallel_compiler))]
|
||||
macro_rules! parallel {
|
||||
($($blocks:block),*) => {{
|
||||
$crate::sync::parallel_guard(|guard| {
|
||||
$(guard.run(|| $blocks);)*
|
||||
});
|
||||
}}
|
||||
}
|
||||
|
||||
pub fn join<A, B, RA, RB>(oper_a: A, oper_b: B) -> (RA, RB)
|
||||
where
|
||||
A: FnOnce() -> RA,
|
||||
B: FnOnce() -> RB,
|
||||
{
|
||||
let (a, b) = parallel_guard(|guard| {
|
||||
let a = guard.run(oper_a);
|
||||
let b = guard.run(oper_b);
|
||||
(a, b)
|
||||
});
|
||||
(a.unwrap(), b.unwrap())
|
||||
}
|
||||
|
||||
pub fn par_for_each_in<T: IntoIterator>(t: T, mut for_each: impl FnMut(T::Item)) {
|
||||
parallel_guard(|guard| {
|
||||
t.into_iter().for_each(|i| {
|
||||
guard.run(|| for_each(i));
|
||||
});
|
||||
})
|
||||
}
|
||||
|
||||
pub fn par_map<T: IntoIterator, R, C: FromIterator<R>>(
|
||||
t: T,
|
||||
mut map: impl FnMut(<<T as IntoIterator>::IntoIter as Iterator>::Item) -> R,
|
||||
) -> C {
|
||||
parallel_guard(|guard| t.into_iter().filter_map(|i| guard.run(|| map(i))).collect())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(parallel_compiler)]
|
||||
mod enabled {
|
||||
use crate::sync::{mode, parallel_guard, DynSend, DynSync, FromDyn};
|
||||
|
||||
/// Runs a list of blocks in parallel. The first block is executed immediately on
|
||||
/// the current thread. Use that for the longest running block.
|
||||
#[macro_export]
|
||||
macro_rules! parallel {
|
||||
(impl $fblock:block [$($c:expr,)*] [$block:expr $(, $rest:expr)*]) => {
|
||||
parallel!(impl $fblock [$block, $($c,)*] [$($rest),*])
|
||||
};
|
||||
(impl $fblock:block [$($blocks:expr,)*] []) => {
|
||||
::rustc_data_structures::sync::scope(|s| {
|
||||
$(let block = rustc_data_structures::sync::FromDyn::from(|| $blocks);
|
||||
s.spawn(move |_| block.into_inner()());)*
|
||||
(|| $fblock)();
|
||||
});
|
||||
};
|
||||
($fblock:block, $($blocks:block),*) => {
|
||||
if rustc_data_structures::sync::is_dyn_thread_safe() {
|
||||
// Reverse the order of the later blocks since Rayon executes them in reverse order
|
||||
// when using a single thread. This ensures the execution order matches that
|
||||
// of a single threaded rustc.
|
||||
parallel!(impl $fblock [] [$($blocks),*]);
|
||||
} else {
|
||||
$crate::sync::parallel_guard(|guard| {
|
||||
guard.run(|| $fblock);
|
||||
$(guard.run(|| $blocks);)*
|
||||
});
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
// This function only works when `mode::is_dyn_thread_safe()`.
|
||||
pub fn scope<'scope, OP, R>(op: OP) -> R
|
||||
where
|
||||
OP: FnOnce(&rayon::Scope<'scope>) -> R + DynSend,
|
||||
R: DynSend,
|
||||
{
|
||||
let op = FromDyn::from(op);
|
||||
rayon::scope(|s| FromDyn::from(op.into_inner()(s))).into_inner()
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn join<A, B, RA: DynSend, RB: DynSend>(oper_a: A, oper_b: B) -> (RA, RB)
|
||||
where
|
||||
A: FnOnce() -> RA + DynSend,
|
||||
B: FnOnce() -> RB + DynSend,
|
||||
{
|
||||
if mode::is_dyn_thread_safe() {
|
||||
let oper_a = FromDyn::from(oper_a);
|
||||
let oper_b = FromDyn::from(oper_b);
|
||||
let (a, b) = rayon::join(
|
||||
move || FromDyn::from(oper_a.into_inner()()),
|
||||
move || FromDyn::from(oper_b.into_inner()()),
|
||||
);
|
||||
(a.into_inner(), b.into_inner())
|
||||
} else {
|
||||
super::disabled::join(oper_a, oper_b)
|
||||
}
|
||||
}
|
||||
|
||||
use rayon::iter::{FromParallelIterator, IntoParallelIterator, ParallelIterator};
|
||||
|
||||
pub fn par_for_each_in<I, T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>>(
|
||||
t: T,
|
||||
for_each: impl Fn(I) + DynSync + DynSend,
|
||||
) {
|
||||
parallel_guard(|guard| {
|
||||
if mode::is_dyn_thread_safe() {
|
||||
let for_each = FromDyn::from(for_each);
|
||||
t.into_par_iter().for_each(|i| {
|
||||
guard.run(|| for_each(i));
|
||||
});
|
||||
} else {
|
||||
t.into_iter().for_each(|i| {
|
||||
guard.run(|| for_each(i));
|
||||
});
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
pub fn par_map<
|
||||
I,
|
||||
T: IntoIterator<Item = I> + IntoParallelIterator<Item = I>,
|
||||
R: std::marker::Send,
|
||||
C: FromIterator<R> + FromParallelIterator<R>,
|
||||
>(
|
||||
t: T,
|
||||
map: impl Fn(I) -> R + DynSync + DynSend,
|
||||
) -> C {
|
||||
parallel_guard(|guard| {
|
||||
if mode::is_dyn_thread_safe() {
|
||||
let map = FromDyn::from(map);
|
||||
t.into_par_iter().filter_map(|i| guard.run(|| map(i))).collect()
|
||||
} else {
|
||||
t.into_iter().filter_map(|i| guard.run(|| map(i))).collect()
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user