Remove the parallel version.

This commit is contained in:
Camille GILLOT 2021-03-16 19:32:00 +01:00
parent 8208872fa2
commit e1c99e5fcc

View File

@ -4,18 +4,13 @@ use super::query::DepGraphQuery;
use super::{DepKind, DepNode, DepNodeIndex};
use rustc_data_structures::fingerprint::Fingerprint;
use rustc_data_structures::fx::FxHashMap;
use rustc_data_structures::sync::{AtomicU32, Lock, Lrc, Ordering};
use rustc_index::vec::IndexVec;
use rustc_data_structures::sync::{Lock, Lrc};
use rustc_index::vec::{Idx, IndexVec};
use rustc_serialize::opaque::{self, FileEncodeResult, FileEncoder, IntEncodedWithFixedSize};
use rustc_serialize::{Decodable, Decoder, Encodable};
use smallvec::SmallVec;
use std::convert::TryInto;
#[cfg(parallel_compiler)]
use {
rustc_data_structures::sync::WorkerLocal, rustc_index::vec::Idx, std::sync::mpsc, std::thread,
};
// The maximum value of `SerializedDepNodeIndex` leaves the upper two bits
// unused so that we can store multiple index types in `CompressedHybridIndex`,
// and use those bits to encode which index type it contains.
@ -146,12 +141,8 @@ fn encode_node<K: DepKind>(
) -> FileEncodeResult {
#[cfg(debug_assertions)]
if let Some(record_graph) = &_record_graph {
if let Some(record_graph) = &mut if cfg!(parallel_compiler) {
Some(record_graph.lock())
} else {
// Do not ICE when a query is called from within `with_query`.
record_graph.try_lock()
} {
// Do not ICE when a query is called from within `with_query`.
if let Some(record_graph) = &mut record_graph.try_lock() {
record_graph.push(_index, node.node, &node.edges);
}
}
@ -190,19 +181,8 @@ fn encode_counts(
encoder.flush()
}
#[cfg(not(parallel_compiler))]
pub struct GraphEncoder<K: DepKind> {
status: Lock<(FileEncoder, usize, FileEncodeResult)>,
counter: AtomicU32,
record_graph: Option<Lrc<Lock<DepGraphQuery<K>>>>,
record_stats: Option<Lrc<Lock<Stats<K>>>>,
}
#[cfg(parallel_compiler)]
pub struct GraphEncoder<K: DepKind> {
send: WorkerLocal<mpsc::Sender<(DepNodeIndex, NodeInfo<K>)>>,
thread: thread::JoinHandle<FileEncodeResult>,
counter: AtomicU32,
status: Lock<(FileEncoder, DepNodeIndex, usize, FileEncodeResult)>,
record_graph: Option<Lrc<Lock<DepGraphQuery<K>>>>,
record_stats: Option<Lrc<Lock<Stats<K>>>>,
}
@ -228,29 +208,8 @@ impl<K: DepKind + Encodable<FileEncoder>> GraphEncoder<K> {
} else {
None
};
let counter = AtomicU32::new(0);
#[cfg(not(parallel_compiler))]
{
let status = Lock::new((encoder, 0, Ok(())));
GraphEncoder { status, counter, record_graph, record_stats }
}
#[cfg(parallel_compiler)]
{
let (send, recv) = mpsc::channel();
let thread = {
let record_graph = record_graph.clone();
let record_stats = record_stats.clone();
thread::spawn(move || {
encode_graph(encoder, recv, |encoder, index, node| {
encode_node(encoder, index, node, &record_graph, &record_stats)
})
})
};
let send = WorkerLocal::new(move |_| send.clone());
GraphEncoder { send, thread, counter, record_graph, record_stats }
}
let status = Lock::new((encoder, DepNodeIndex::new(0), 0, Ok(())));
GraphEncoder { status, record_graph, record_stats }
}
pub(crate) fn with_query(&self, f: impl Fn(&DepGraphQuery<K>)) {
@ -314,19 +273,17 @@ impl<K: DepKind + Encodable<FileEncoder>> GraphEncoder<K> {
eprintln!("[incremental]");
}
}
}
#[cfg(not(parallel_compiler))]
impl<K: DepKind + Encodable<FileEncoder>> GraphEncoder<K> {
pub(crate) fn send(
&self,
node: DepNode<K>,
fingerprint: Fingerprint,
edges: SmallVec<[DepNodeIndex; 8]>,
) -> DepNodeIndex {
let index = self.counter.fetch_add(1, Ordering::SeqCst);
let index = DepNodeIndex::from_u32(index);
let &mut (ref mut encoder, ref mut edge_count, ref mut result) = &mut *self.status.lock();
let &mut (ref mut encoder, ref mut next_index, ref mut edge_count, ref mut result) =
&mut *self.status.lock();
let index = next_index.clone();
next_index.increment_by(1);
*edge_count += edges.len();
*result = std::mem::replace(result, Ok(())).and_then(|()| {
let node = NodeInfo { node, fingerprint, edges };
@ -336,89 +293,10 @@ impl<K: DepKind + Encodable<FileEncoder>> GraphEncoder<K> {
}
pub fn finish(self) -> FileEncodeResult {
let (encoder, edge_count, result) = self.status.into_inner();
let (encoder, node_count, edge_count, result) = self.status.into_inner();
let () = result?;
let node_count = self.counter.into_inner() as usize;
let node_count = node_count.index();
encode_counts(encoder, node_count, edge_count)
}
}
#[cfg(parallel_compiler)]
impl<K: DepKind + Encodable<FileEncoder>> GraphEncoder<K> {
pub(crate) fn send(
&self,
node: DepNode<K>,
fingerprint: Fingerprint,
edges: SmallVec<[DepNodeIndex; 8]>,
) -> DepNodeIndex {
let node = NodeInfo { node, fingerprint, edges };
let index = self.counter.fetch_add(1, Ordering::SeqCst);
let index = DepNodeIndex::from_u32(index);
self.send.send((index, node)).unwrap();
index
}
pub fn finish(self) -> FileEncodeResult {
std::mem::drop(self.send);
self.thread.join().unwrap()
}
}
#[cfg(parallel_compiler)]
#[instrument(skip(encoder, recv, process))]
fn encode_graph<K: DepKind + Encodable<FileEncoder>>(
mut encoder: FileEncoder,
recv: mpsc::Receiver<(DepNodeIndex, NodeInfo<K>)>,
process: impl Fn(&mut FileEncoder, DepNodeIndex, &NodeInfo<K>) -> FileEncodeResult,
) -> FileEncodeResult {
let mut edge_count: usize = 0;
let node_count: usize = ordered_recv(recv, |index, node| {
edge_count += node.edges.len();
process(&mut encoder, index, node)
})?;
encode_counts(encoder, node_count, edge_count)
}
/// Since there are multiple producers assigning the DepNodeIndex using an atomic,
/// the messages may not arrive in order. This function sorts them as they come.
#[cfg(parallel_compiler)]
fn ordered_recv<K: DepKind + Encodable<opaque::FileEncoder>>(
recv: mpsc::Receiver<(DepNodeIndex, NodeInfo<K>)>,
mut f: impl FnMut(DepNodeIndex, &NodeInfo<K>) -> FileEncodeResult,
) -> Result<usize, std::io::Error> {
let mut pending = Vec::<(DepNodeIndex, _)>::new();
let mut expected = DepNodeIndex::new(0);
// INVARIANT: No message can arrive with an index less than `expected`.
'outer: loop {
pending.sort_by_key(|n| n.0);
for (index, node) in pending.drain_filter(|(index, _)| {
if *index == expected {
expected.increment_by(1);
true
} else {
false
}
}) {
f(index, &node)?;
}
while let Ok((index, node)) = recv.recv() {
if index > expected {
pending.push((index, node));
} else if index == expected {
f(index, &node)?;
expected.increment_by(1);
continue 'outer;
} else {
panic!("Unexpected index {:?} while waiting for {:?}", index, expected);
}
}
break;
}
Ok(expected.as_u32() as usize)
}