Manage check state updates in main_loop to reduce lock contention

This commit is contained in:
Emil Lauridsen 2020-01-15 15:50:49 +01:00
parent fdb13dade9
commit 478ba65f8d
3 changed files with 64 additions and 44 deletions

View File

@ -38,7 +38,7 @@ pub struct CheckOptions {
#[derive(Debug)]
pub struct CheckWatcher {
pub task_recv: Receiver<CheckTask>,
pub shared: Arc<RwLock<CheckWatcherSharedState>>,
pub state: Arc<RwLock<CheckState>>,
cmd_send: Option<Sender<CheckCommand>>,
handle: Option<JoinHandle<()>>,
}
@ -46,22 +46,21 @@ pub struct CheckWatcher {
impl CheckWatcher {
pub fn new(options: &CheckOptions, workspace_root: PathBuf) -> CheckWatcher {
let options = options.clone();
let shared = Arc::new(RwLock::new(CheckWatcherSharedState::new()));
let state = Arc::new(RwLock::new(CheckState::new()));
let (task_send, task_recv) = unbounded::<CheckTask>();
let (cmd_send, cmd_recv) = unbounded::<CheckCommand>();
let shared_ = shared.clone();
let handle = std::thread::spawn(move || {
let mut check = CheckWatcherState::new(options, workspace_root, shared_);
let mut check = CheckWatcherState::new(options, workspace_root);
check.run(&task_send, &cmd_recv);
});
CheckWatcher { task_recv, cmd_send: Some(cmd_send), handle: Some(handle), shared }
CheckWatcher { task_recv, cmd_send: Some(cmd_send), handle: Some(handle), state }
}
/// Returns a CheckWatcher that doesn't actually do anything
pub fn dummy() -> CheckWatcher {
let shared = Arc::new(RwLock::new(CheckWatcherSharedState::new()));
CheckWatcher { task_recv: never(), cmd_send: None, handle: None, shared }
let state = Arc::new(RwLock::new(CheckState::new()));
CheckWatcher { task_recv: never(), cmd_send: None, handle: None, state }
}
/// Schedule a re-start of the cargo check worker.
@ -89,14 +88,14 @@ impl std::ops::Drop for CheckWatcher {
}
#[derive(Debug)]
pub struct CheckWatcherSharedState {
pub struct CheckState {
diagnostic_collection: HashMap<Url, Vec<Diagnostic>>,
suggested_fix_collection: HashMap<Url, Vec<SuggestedFix>>,
}
impl CheckWatcherSharedState {
fn new() -> CheckWatcherSharedState {
CheckWatcherSharedState {
impl CheckState {
fn new() -> CheckState {
CheckState {
diagnostic_collection: HashMap::new(),
suggested_fix_collection: HashMap::new(),
}
@ -104,15 +103,11 @@ impl CheckWatcherSharedState {
/// Clear the cached diagnostics, and schedule updating diagnostics by the
/// server, to clear stale results.
pub fn clear(&mut self, task_send: &Sender<CheckTask>) {
pub fn clear(&mut self) -> Vec<Url> {
let cleared_files: Vec<Url> = self.diagnostic_collection.keys().cloned().collect();
self.diagnostic_collection.clear();
self.suggested_fix_collection.clear();
for uri in cleared_files {
task_send.send(CheckTask::Update(uri.clone())).unwrap();
}
cleared_files
}
pub fn diagnostics_for(&self, uri: &Url) -> Option<&[Diagnostic]> {
@ -123,6 +118,13 @@ impl CheckWatcherSharedState {
self.suggested_fix_collection.get(uri).map(|d| d.as_slice())
}
pub fn add_diagnostic_with_fixes(&mut self, file_uri: Url, diagnostic: DiagnosticWithFixes) {
for fix in diagnostic.suggested_fixes {
self.add_suggested_fix_for_diagnostic(fix, &diagnostic.diagnostic);
}
self.add_diagnostic(file_uri, diagnostic.diagnostic);
}
fn add_diagnostic(&mut self, file_uri: Url, diagnostic: Diagnostic) {
let diagnostics = self.diagnostic_collection.entry(file_uri).or_default();
@ -158,8 +160,11 @@ impl CheckWatcherSharedState {
#[derive(Debug)]
pub enum CheckTask {
/// Request an update of the given file's diagnostics
Update(Url),
/// Request a clearing of all cached diagnostics from the check watcher
ClearDiagnostics,
/// Request adding a diagnostic with fixes included to a file
AddDiagnostic(Url, DiagnosticWithFixes),
/// Request check progress notification to client
Status(WorkDoneProgress),
@ -175,21 +180,15 @@ struct CheckWatcherState {
workspace_root: PathBuf,
watcher: WatchThread,
last_update_req: Option<Instant>,
shared: Arc<RwLock<CheckWatcherSharedState>>,
}
impl CheckWatcherState {
fn new(
options: CheckOptions,
workspace_root: PathBuf,
shared: Arc<RwLock<CheckWatcherSharedState>>,
) -> CheckWatcherState {
fn new(options: CheckOptions, workspace_root: PathBuf) -> CheckWatcherState {
CheckWatcherState {
options,
workspace_root,
watcher: WatchThread::dummy(),
last_update_req: None,
shared,
}
}
@ -215,7 +214,7 @@ impl CheckWatcherState {
if self.should_recheck() {
self.last_update_req.take();
self.shared.write().clear(task_send);
task_send.send(CheckTask::ClearDiagnostics).unwrap();
// By replacing the watcher, we drop the previous one which
// causes it to shut down automatically.
@ -240,7 +239,7 @@ impl CheckWatcherState {
}
}
fn handle_message(&mut self, msg: CheckEvent, task_send: &Sender<CheckTask>) {
fn handle_message(&self, msg: CheckEvent, task_send: &Sender<CheckTask>) {
match msg {
CheckEvent::Begin => {
task_send
@ -279,18 +278,9 @@ impl CheckWatcherState {
};
let MappedRustDiagnostic { location, diagnostic, suggested_fixes } = map_result;
let file_uri = location.uri.clone();
if !suggested_fixes.is_empty() {
for suggested_fix in suggested_fixes {
self.shared
.write()
.add_suggested_fix_for_diagnostic(suggested_fix, &diagnostic);
}
}
self.shared.write().add_diagnostic(file_uri, diagnostic);
task_send.send(CheckTask::Update(location.uri)).unwrap();
let diagnostic = DiagnosticWithFixes { diagnostic, suggested_fixes };
task_send.send(CheckTask::AddDiagnostic(location.uri, diagnostic)).unwrap();
}
CheckEvent::Msg(Message::BuildScriptExecuted(_msg)) => {}
@ -299,6 +289,12 @@ impl CheckWatcherState {
}
}
/// A diagnostic bundled with the fixes suggested for it, as sent from the
/// check watcher thread to the main loop via `CheckTask::AddDiagnostic`.
#[derive(Debug)]
pub struct DiagnosticWithFixes {
// The LSP diagnostic itself.
diagnostic: Diagnostic,
// Fixes suggested for this diagnostic; may be empty.
suggested_fixes: Vec<SuggestedFix>,
}
/// WatchThread exists to wrap around the communication needed to be able to
/// run `cargo check` without blocking. Currently the Rust standard library
/// doesn't provide a way to read sub-process output without blocking, so we

View File

@ -337,11 +337,34 @@ fn loop_turn(
loop_state.in_flight_libraries -= 1;
}
Event::CheckWatcher(task) => match task {
CheckTask::Update(uri) => {
CheckTask::ClearDiagnostics => {
let cleared_files = world_state.check_watcher.state.write().clear();
// Send updated diagnostics for each cleared file
for url in cleared_files {
let path = url.to_file_path().map_err(|()| format!("invalid uri: {}", url))?;
if let Some(file_id) = world_state.vfs.read().path2file(&path) {
let params = handlers::publish_diagnostics(
&world_state.snapshot(),
FileId(file_id.0),
)?;
let not = notification_new::<req::PublishDiagnostics>(params);
task_sender.send(Task::Notify(not)).unwrap();
}
}
}
CheckTask::AddDiagnostic(url, diagnostic) => {
world_state
.check_watcher
.state
.write()
.add_diagnostic_with_fixes(url.clone(), diagnostic);
// We manually send a diagnostic update when the watcher asks
// us to, to avoid the issue of having to change the file to
// receive updated diagnostics.
let path = uri.to_file_path().map_err(|()| format!("invalid uri: {}", uri))?;
let path = url.to_file_path().map_err(|()| format!("invalid uri: {}", url))?;
if let Some(file_id) = world_state.vfs.read().path2file(&path) {
let params =
handlers::publish_diagnostics(&world_state.snapshot(), FileId(file_id.0))?;
@ -349,6 +372,7 @@ fn loop_turn(
task_sender.send(Task::Notify(not)).unwrap();
}
}
CheckTask::Status(progress) => {
let params = req::ProgressParams {
token: req::ProgressToken::String("rustAnalyzer/cargoWatcher".to_string()),

View File

@ -13,7 +13,7 @@ use lsp_server::ErrorCode;
use lsp_types::Url;
use parking_lot::RwLock;
use ra_cargo_watch::{
url_from_path_with_drive_lowercasing, CheckOptions, CheckWatcher, CheckWatcherSharedState,
url_from_path_with_drive_lowercasing, CheckOptions, CheckState, CheckWatcher,
};
use ra_ide::{
Analysis, AnalysisChange, AnalysisHost, CrateGraph, FeatureFlags, FileId, LibraryData,
@ -64,7 +64,7 @@ pub struct WorldSnapshot {
pub analysis: Analysis,
pub vfs: Arc<RwLock<Vfs>>,
pub latest_requests: Arc<RwLock<LatestRequests>>,
pub check_watcher: Arc<RwLock<CheckWatcherSharedState>>,
pub check_watcher: Arc<RwLock<CheckState>>,
}
impl WorldState {
@ -220,7 +220,7 @@ impl WorldState {
analysis: self.analysis_host.analysis(),
vfs: Arc::clone(&self.vfs),
latest_requests: Arc::clone(&self.latest_requests),
check_watcher: self.check_watcher.shared.clone(),
check_watcher: self.check_watcher.state.clone(),
}
}