diff --git a/naga/xtask/Cargo.lock b/naga/xtask/Cargo.lock index 6d2d13459..a1727a897 100644 --- a/naga/xtask/Cargo.lock +++ b/naga/xtask/Cargo.lock @@ -54,6 +54,12 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "hermit-abi" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d77f7ec81a6d05a3abb01ab6eb7590f6083d08449fe5a1c8b1e620283546ccb7" + [[package]] name = "hlsl-snapshots" version = "0.1.0" @@ -84,6 +90,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "jobserver" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c37f63953c4c63420ed5fd3d6d398c719489b9f872b9fa683262f8edd363c7d" +dependencies = [ + "libc", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -120,6 +135,16 @@ version = "0.1.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed7a94da6c6181c35d043fc61c43ac96d3a5d739e7b8027f77650ba41504d6ab" +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "number_prefix" version = "0.4.0" @@ -242,7 +267,9 @@ dependencies = [ "glob", "hlsl-snapshots", "indicatif", + "jobserver", "log", + "num_cpus", "pico-args", "shell-words", "which", diff --git a/naga/xtask/Cargo.toml b/naga/xtask/Cargo.toml index f1b5ff8a7..159e54a58 100644 --- a/naga/xtask/Cargo.toml +++ b/naga/xtask/Cargo.toml @@ -10,7 +10,9 @@ env_logger = { version = "0.10.0", default-features = false } glob = "0.3.1" hlsl-snapshots = { path = "../hlsl-snapshots"} indicatif = "0.17" +jobserver = "0.1" log = "0.4.17" +num_cpus = "1.16" pico-args = "0.5.0" shell-words = "1.1.0" which = "4.4.0" diff --git a/naga/xtask/src/jobserver.rs b/naga/xtask/src/jobserver.rs new file mode 100644 index 000000000..f035f81b7 --- /dev/null +++ b/naga/xtask/src/jobserver.rs @@ -0,0 +1,34 @@ +//! Running jobs in parallel, with a controlled degree of concurrency. + +use std::sync::OnceLock; + +use jobserver::Client; + +static JOB_SERVER: OnceLock = OnceLock::new(); + +pub fn init() { + JOB_SERVER.get_or_init(|| { + // Try to connect to a jobserver inherited from our parent. + if let Some(client) = unsafe { Client::from_env() } { + log::debug!("connected to inherited jobserver client"); + client + } else { + // Otherwise, start our own jobserver. + log::debug!("no inherited jobserver client; creating a new jobserver"); + Client::new(num_cpus::get()).expect("failed to create jobserver") + } + }); +} + +/// Wait until it is okay to start a new job, and then spawn a thread running `body`. +pub fn start_job_thread(body: F) -> anyhow::Result<()> +where + F: FnOnce() + Send + 'static, +{ + let acquired = JOB_SERVER.get().unwrap().acquire()?; + std::thread::spawn(move || { + body(); + drop(acquired); + }); + Ok(()) +} diff --git a/naga/xtask/src/main.rs b/naga/xtask/src/main.rs index 1ab648912..aed7f48c7 100644 --- a/naga/xtask/src/main.rs +++ b/naga/xtask/src/main.rs @@ -13,6 +13,7 @@ use crate::{ mod cli; mod fs; mod glob; +mod jobserver; mod path; mod process; mod result; @@ -25,6 +26,8 @@ fn main() -> ExitCode { .format_indent(Some(0)) .init(); + jobserver::init(); + let args = Args::parse(); match run(args) { diff --git a/naga/xtask/src/validate.rs b/naga/xtask/src/validate.rs index 1d907525f..cb27786c5 100644 --- a/naga/xtask/src/validate.rs +++ b/naga/xtask/src/validate.rs @@ -13,15 +13,37 @@ use crate::{ process::{which, EasyCommand}, }; +type Job = Box anyhow::Result<()> + Send + std::panic::UnwindSafe + 'static>; + pub(crate) fn validate(cmd: ValidateSubcommand) -> anyhow::Result<()> { let mut jobs = vec![]; collect_validation_jobs(&mut jobs, cmd)?; let progress_bar = indicatif::ProgressBar::new(jobs.len() as u64); + let (tx_results, rx_results) = std::sync::mpsc::channel(); + let enqueuing_thread = std::thread::spawn(move || -> anyhow::Result<()> { + for job in jobs { + let tx_results = tx_results.clone(); + crate::jobserver::start_job_thread(move || { + let result = match std::panic::catch_unwind(|| job()) { + Ok(result) => result, + Err(payload) => Err(match payload.downcast_ref::<&str>() { + Some(message) => { + anyhow::anyhow!("Validation job thread panicked: {}", message) + } + None => anyhow::anyhow!("Validation job thread panicked"), + }), + }; + tx_results.send(result).unwrap(); + })?; + } + Ok(()) + }); + let mut all_good = true; - for job in jobs { - if let Err(error) = job() { + for result in rx_results { + if let Err(error) = result { all_good = false; progress_bar.suspend(|| { eprintln!("{:#}", error); @@ -35,29 +57,12 @@ pub(crate) fn validate(cmd: ValidateSubcommand) -> anyhow::Result<()> { if !all_good { bail!("failed to validate one or more files, see above output for more details") } - Ok(()) -} -type Job = Box anyhow::Result<()>>; - -fn push_job_for_each_file( - top_dir: impl AsRef, - pattern: impl AsRef, - jobs: &mut Vec, - f: impl FnOnce(std::path::PathBuf) -> anyhow::Result<()> + Clone + 'static, -) { - crate::glob::for_each_file(top_dir, pattern, move |path_result| { - // Let each job closure stand on its own. - let f = f.clone(); - jobs.push(Box::new(|| f(path_result?))); - }); -} - -/// Call `f` to extend `jobs`, but if `f` itself fails, push a job that reports that. -fn try_push_job(jobs: &mut Vec, f: impl FnOnce(&mut Vec) -> anyhow::Result<()>) { - if let Err(error) = f(jobs) { - jobs.push(Box::new(|| Err(error))); + if let Err(error) = enqueuing_thread.join().unwrap() { + bail!("Error enqueuing jobs:\n{:#}", error); } + + Ok(()) } fn collect_validation_jobs(jobs: &mut Vec, cmd: ValidateSubcommand) -> anyhow::Result<()> { @@ -142,6 +147,30 @@ fn collect_validation_jobs(jobs: &mut Vec, cmd: ValidateSubcommand) -> anyh Ok(()) } +fn push_job_for_each_file( + top_dir: impl AsRef, + pattern: impl AsRef, + jobs: &mut Vec, + f: impl FnOnce(std::path::PathBuf) -> anyhow::Result<()> + + Clone + + Send + + std::panic::UnwindSafe + + 'static, +) { + crate::glob::for_each_file(top_dir, pattern, move |path_result| { + // Let each job closure stand on its own. + let f = f.clone(); + jobs.push(Box::new(|| f(path_result?))); + }); +} + +/// Call `f` to extend `jobs`, but if `f` itself fails, push a job that reports that. +fn try_push_job(jobs: &mut Vec, f: impl FnOnce(&mut Vec) -> anyhow::Result<()>) { + if let Err(error) = f(jobs) { + jobs.push(Box::new(|| Err(error))); + } +} + fn validate_spirv(path: &Path, spirv_as: &str, spirv_val: &str) -> anyhow::Result<()> { let second_line = { let mut file = BufReader::new(open_file(path)?); @@ -233,6 +262,8 @@ fn push_job_for_each_hlsl_config_item( jobs: &mut Vec, validator: impl FnMut(&Path, hlsl_snapshots::ConfigItem, &str) -> anyhow::Result<()> + Clone + + Send + + std::panic::UnwindSafe + 'static, ) -> anyhow::Result<()> { let hlsl_snapshots::Config {