[naga xtask] Run validation jobs in parallel, using jobserver.

This commit is contained in:
Jim Blandy 2023-12-19 13:51:08 -08:00
parent 85ab6327d3
commit 403db446e4
5 changed files with 120 additions and 23 deletions

27
naga/xtask/Cargo.lock generated
View File

@ -54,6 +54,12 @@ version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
[[package]]
name = "hermit-abi"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7"
[[package]] [[package]]
name = "hlsl-snapshots" name = "hlsl-snapshots"
version = "0.1.0" version = "0.1.0"
@ -84,6 +90,15 @@ dependencies = [
"cfg-if", "cfg-if",
] ]
[[package]]
name = "jobserver"
version = "0.1.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c37f63953c4c63420ed5fd3d6d398c719489b9f872b9fa683262f8edd363c7d"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "lazy_static" name = "lazy_static"
version = "1.4.0" version = "1.4.0"
@ -120,6 +135,16 @@ version = "0.1.19"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed7a94da6c6181c35d043fc61c43ac96d3a5d739e7b8027f77650ba41504d6ab" checksum = "ed7a94da6c6181c35d043fc61c43ac96d3a5d739e7b8027f77650ba41504d6ab"
[[package]]
name = "num_cpus"
version = "1.16.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43"
dependencies = [
"hermit-abi",
"libc",
]
[[package]] [[package]]
name = "number_prefix" name = "number_prefix"
version = "0.4.0" version = "0.4.0"
@ -242,7 +267,9 @@ dependencies = [
"glob", "glob",
"hlsl-snapshots", "hlsl-snapshots",
"indicatif", "indicatif",
"jobserver",
"log", "log",
"num_cpus",
"pico-args", "pico-args",
"shell-words", "shell-words",
"which", "which",

View File

@ -10,7 +10,9 @@ env_logger = { version = "0.10.0", default-features = false }
glob = "0.3.1" glob = "0.3.1"
hlsl-snapshots = { path = "../hlsl-snapshots"} hlsl-snapshots = { path = "../hlsl-snapshots"}
indicatif = "0.17" indicatif = "0.17"
jobserver = "0.1"
log = "0.4.17" log = "0.4.17"
num_cpus = "1.16"
pico-args = "0.5.0" pico-args = "0.5.0"
shell-words = "1.1.0" shell-words = "1.1.0"
which = "4.4.0" which = "4.4.0"

View File

@ -0,0 +1,34 @@
//! Running jobs in parallel, with a controlled degree of concurrency.
use std::sync::OnceLock;
use jobserver::Client;
static JOB_SERVER: OnceLock<Client> = OnceLock::new();
pub fn init() {
JOB_SERVER.get_or_init(|| {
// Try to connect to a jobserver inherited from our parent.
if let Some(client) = unsafe { Client::from_env() } {
log::debug!("connected to inherited jobserver client");
client
} else {
// Otherwise, start our own jobserver.
log::debug!("no inherited jobserver client; creating a new jobserver");
Client::new(num_cpus::get()).expect("failed to create jobserver")
}
});
}
/// Wait until it is okay to start a new job, and then spawn a thread running `body`.
pub fn start_job_thread<F>(body: F) -> anyhow::Result<()>
where
F: FnOnce() + Send + 'static,
{
let acquired = JOB_SERVER.get().unwrap().acquire()?;
std::thread::spawn(move || {
body();
drop(acquired);
});
Ok(())
}

View File

@ -13,6 +13,7 @@ use crate::{
mod cli; mod cli;
mod fs; mod fs;
mod glob; mod glob;
mod jobserver;
mod path; mod path;
mod process; mod process;
mod result; mod result;
@ -25,6 +26,8 @@ fn main() -> ExitCode {
.format_indent(Some(0)) .format_indent(Some(0))
.init(); .init();
jobserver::init();
let args = Args::parse(); let args = Args::parse();
match run(args) { match run(args) {

View File

@ -13,15 +13,37 @@ use crate::{
process::{which, EasyCommand}, process::{which, EasyCommand},
}; };
type Job = Box<dyn FnOnce() -> anyhow::Result<()> + Send + std::panic::UnwindSafe + 'static>;
pub(crate) fn validate(cmd: ValidateSubcommand) -> anyhow::Result<()> { pub(crate) fn validate(cmd: ValidateSubcommand) -> anyhow::Result<()> {
let mut jobs = vec![]; let mut jobs = vec![];
collect_validation_jobs(&mut jobs, cmd)?; collect_validation_jobs(&mut jobs, cmd)?;
let progress_bar = indicatif::ProgressBar::new(jobs.len() as u64); let progress_bar = indicatif::ProgressBar::new(jobs.len() as u64);
let (tx_results, rx_results) = std::sync::mpsc::channel();
let enqueuing_thread = std::thread::spawn(move || -> anyhow::Result<()> {
for job in jobs {
let tx_results = tx_results.clone();
crate::jobserver::start_job_thread(move || {
let result = match std::panic::catch_unwind(|| job()) {
Ok(result) => result,
Err(payload) => Err(match payload.downcast_ref::<&str>() {
Some(message) => {
anyhow::anyhow!("Validation job thread panicked: {}", message)
}
None => anyhow::anyhow!("Validation job thread panicked"),
}),
};
tx_results.send(result).unwrap();
})?;
}
Ok(())
});
let mut all_good = true; let mut all_good = true;
for job in jobs { for result in rx_results {
if let Err(error) = job() { if let Err(error) = result {
all_good = false; all_good = false;
progress_bar.suspend(|| { progress_bar.suspend(|| {
eprintln!("{:#}", error); eprintln!("{:#}", error);
@ -35,29 +57,12 @@ pub(crate) fn validate(cmd: ValidateSubcommand) -> anyhow::Result<()> {
if !all_good { if !all_good {
bail!("failed to validate one or more files, see above output for more details") bail!("failed to validate one or more files, see above output for more details")
} }
Ok(())
}
type Job = Box<dyn FnOnce() -> anyhow::Result<()>>; if let Err(error) = enqueuing_thread.join().unwrap() {
bail!("Error enqueuing jobs:\n{:#}", error);
fn push_job_for_each_file(
top_dir: impl AsRef<Path>,
pattern: impl AsRef<Path>,
jobs: &mut Vec<Job>,
f: impl FnOnce(std::path::PathBuf) -> anyhow::Result<()> + Clone + 'static,
) {
crate::glob::for_each_file(top_dir, pattern, move |path_result| {
// Let each job closure stand on its own.
let f = f.clone();
jobs.push(Box::new(|| f(path_result?)));
});
}
/// Call `f` to extend `jobs`, but if `f` itself fails, push a job that reports that.
fn try_push_job(jobs: &mut Vec<Job>, f: impl FnOnce(&mut Vec<Job>) -> anyhow::Result<()>) {
if let Err(error) = f(jobs) {
jobs.push(Box::new(|| Err(error)));
} }
Ok(())
} }
fn collect_validation_jobs(jobs: &mut Vec<Job>, cmd: ValidateSubcommand) -> anyhow::Result<()> { fn collect_validation_jobs(jobs: &mut Vec<Job>, cmd: ValidateSubcommand) -> anyhow::Result<()> {
@ -142,6 +147,30 @@ fn collect_validation_jobs(jobs: &mut Vec<Job>, cmd: ValidateSubcommand) -> anyh
Ok(()) Ok(())
} }
fn push_job_for_each_file(
top_dir: impl AsRef<Path>,
pattern: impl AsRef<Path>,
jobs: &mut Vec<Job>,
f: impl FnOnce(std::path::PathBuf) -> anyhow::Result<()>
+ Clone
+ Send
+ std::panic::UnwindSafe
+ 'static,
) {
crate::glob::for_each_file(top_dir, pattern, move |path_result| {
// Let each job closure stand on its own.
let f = f.clone();
jobs.push(Box::new(|| f(path_result?)));
});
}
/// Call `f` to extend `jobs`, but if `f` itself fails, push a job that reports that.
fn try_push_job(jobs: &mut Vec<Job>, f: impl FnOnce(&mut Vec<Job>) -> anyhow::Result<()>) {
if let Err(error) = f(jobs) {
jobs.push(Box::new(|| Err(error)));
}
}
fn validate_spirv(path: &Path, spirv_as: &str, spirv_val: &str) -> anyhow::Result<()> { fn validate_spirv(path: &Path, spirv_as: &str, spirv_val: &str) -> anyhow::Result<()> {
let second_line = { let second_line = {
let mut file = BufReader::new(open_file(path)?); let mut file = BufReader::new(open_file(path)?);
@ -233,6 +262,8 @@ fn push_job_for_each_hlsl_config_item(
jobs: &mut Vec<Job>, jobs: &mut Vec<Job>,
validator: impl FnMut(&Path, hlsl_snapshots::ConfigItem, &str) -> anyhow::Result<()> validator: impl FnMut(&Path, hlsl_snapshots::ConfigItem, &str) -> anyhow::Result<()>
+ Clone + Clone
+ Send
+ std::panic::UnwindSafe
+ 'static, + 'static,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let hlsl_snapshots::Config { let hlsl_snapshots::Config {