From 797253e9ebe036dfef77befc82194e95649f576f Mon Sep 17 00:00:00 2001 From: Karthik Nadig Date: Mon, 6 Apr 2026 11:51:10 -0700 Subject: [PATCH 1/2] fix: prevent refresh coordinator deadlock and reset missing-env reporting on reconfigure (Fixes #396, Fixes #395) --- crates/pet/src/jsonrpc.rs | 200 +++++++++++++++++++++++++++++++++++++- 1 file changed, 199 insertions(+), 1 deletion(-) diff --git a/crates/pet/src/jsonrpc.rs b/crates/pet/src/jsonrpc.rs index 9ba97787..c5bc16b6 100644 --- a/crates/pet/src/jsonrpc.rs +++ b/crates/pet/src/jsonrpc.rs @@ -217,8 +217,56 @@ impl RefreshCoordinator { *state = RefreshCoordinatorState::Idle; self.changed.notify_all(); } + RefreshCoordinatorState::Running(active) if active.key == *key => { + // Recovery path: if begin_completion() panicked, the state was + // restored to Running before the unwind. Transition to Idle so + // waiters are not stuck forever. + *state = RefreshCoordinatorState::Idle; + self.changed.notify_all(); + } RefreshCoordinatorState::Idle => {} - _ => {} + _ => { + // Mismatched key — another refresh owns this state. Log and + // leave it alone; the owning refresh will clean up. + error!( + "force_complete_request called with mismatched key; current state not owned by caller" + ); + } + } + } +} + +/// Safety guard created when a refresh thread takes ownership of the `Running` +/// state. If the thread exits the `Start` arm without ever constructing a +/// `RefreshCompletionGuard` (e.g., because `begin_completion` panics), this +/// guard calls `force_complete_request` to transition the coordinator back to +/// `Idle`, preventing a permanent deadlock. +struct RefreshSafetyGuard<'a> { + coordinator: &'a RefreshCoordinator, + key: RefreshKey, + disarmed: bool, +} + +impl<'a> RefreshSafetyGuard<'a> { + fn new(coordinator: &'a RefreshCoordinator, key: RefreshKey) -> Self { + Self { + coordinator, + key, + disarmed: false, + } + } + + /// Disarm the safety guard once a `RefreshCompletionGuard` takes over + /// responsibility for the state transition. + fn disarm(&mut self) { + self.disarmed = true; + } +} + +impl Drop for RefreshSafetyGuard<'_> { + fn drop(&mut self) { + if !self.disarmed { + self.coordinator.force_complete_request(&self.key); } } } @@ -530,6 +578,11 @@ pub fn handle_configure(context: Arc, id: u32, params: Value) { state.config.cache_directory = Some(cache_directory); } state.generation += 1; + // Reset missing-env reporting so that the next refresh + // after reconfiguration can trigger it again (Fixes #395). + // Done inside the write lock to avoid a TOCTOU window with + // concurrent refresh threads reading the generation. + MISSING_ENVS_REPORTING_STATE.store(MISSING_ENVS_AVAILABLE, Ordering::Release); trace!( "Configuring locators with generation {}: {:?}", state.generation, @@ -886,6 +939,15 @@ pub fn handle_refresh(context: Arc, id: u32, params: Value) { context.refresh_coordinator.wait_until_idle(); } RefreshRegistration::Start => { + // Safety guard: if anything in this arm panics + // (including begin_completion), force the + // coordinator back to Idle so waiters are not + // stuck forever. + let mut safety_guard = RefreshSafetyGuard::new( + &context.refresh_coordinator, + refresh_key.clone(), + ); + let refresh_result = panic::catch_unwind(AssertUnwindSafe(|| { execute_refresh( context.as_ref(), @@ -901,6 +963,7 @@ pub fn handle_refresh(context: Arc, id: u32, params: Value) { &context.refresh_coordinator, &refresh_key, ); + safety_guard.disarm(); finish_refresh_replies(&mut completion_guard, &refresh_result); report_refresh_follow_up(execution); } @@ -913,6 +976,7 @@ pub fn handle_refresh(context: Arc, id: u32, params: Value) { &context.refresh_coordinator, &refresh_key, ); + safety_guard.disarm(); finish_refresh_errors( &mut completion_guard, "Refresh failed unexpectedly", @@ -1898,4 +1962,138 @@ mod tests { assert_eq!(result_config.executables, Some(vec![executable])); assert!(matches!(search_scope, Some(SearchScope::Workspace))); } + + /// Test for #396: force_complete_request recovers from Running state. + /// When begin_completion() cannot be reached (e.g., the thread panics before + /// constructing a RefreshCompletionGuard), force_complete_request must still + /// transition Running → Idle to unblock waiters. + #[test] + fn test_force_complete_request_recovers_from_running_state() { + let coordinator = RefreshCoordinator::default(); + let key = make_refresh_key(1, RefreshOptions::default()); + + // State → Running(key) + assert!(matches!( + coordinator.register_request(1, key.clone()), + RefreshRegistration::Start + )); + + // Simulate recovery: force_complete_request from Running state. + coordinator.force_complete_request(&key); + + // Verify we're back to Idle and can start a new refresh. + assert!(matches!( + coordinator.register_request(2, key.clone()), + RefreshRegistration::Start + )); + } + + /// Test for #396: RefreshSafetyGuard transitions Running → Idle on drop + /// when begin_completion is never reached. + #[test] + fn test_safety_guard_recovers_running_state_on_drop() { + let coordinator = Arc::new(RefreshCoordinator::default()); + let key = make_refresh_key(1, RefreshOptions::default()); + let other_key = make_refresh_key( + 1, + RefreshOptions { + search_kind: Some(PythonEnvironmentKind::Venv), + search_paths: None, + }, + ); + + assert!(matches!( + coordinator.register_request(1, key.clone()), + RefreshRegistration::Start + )); + + let (state_tx, state_rx) = mpsc::channel(); + let waiter = { + let coordinator = coordinator.clone(); + let other_key = other_key.clone(); + thread::spawn(move || { + // Different key → returns Wait (not Joined). + assert!(matches!( + coordinator.register_request(2, other_key.clone()), + RefreshRegistration::Wait + )); + state_tx.send("waiting").unwrap(); + coordinator.wait_until_idle(); + state_tx.send("idle").unwrap(); + }) + }; + + assert_eq!(state_rx.recv().unwrap(), "waiting"); + + // Create and immediately drop the safety guard without disarming it. + // This simulates the thread dying before begin_completion. + { + let _guard = RefreshSafetyGuard::new(&coordinator, key.clone()); + } + + // Waiter should be unblocked. + assert_eq!(state_rx.recv().unwrap(), "idle"); + waiter.join().unwrap(); + } + + /// Test for #396: RefreshSafetyGuard does NOT interfere when disarmed + /// (normal path where RefreshCompletionGuard takes over). + #[test] + fn test_safety_guard_disarmed_does_not_interfere() { + let coordinator = RefreshCoordinator::default(); + let key = make_refresh_key(1, RefreshOptions::default()); + + assert!(matches!( + coordinator.register_request(1, key.clone()), + RefreshRegistration::Start + )); + + { + let mut safety_guard = RefreshSafetyGuard::new(&coordinator, key.clone()); + let mut completion_guard = RefreshCompletionGuard::begin(&coordinator, &key); + safety_guard.disarm(); + let ids = completion_guard.drain_request_ids(); + assert_eq!(ids, vec![1]); + assert!(completion_guard.finish_if_no_pending()); + } + + // Should be Idle — can start a new refresh. + assert!(matches!( + coordinator.register_request(2, key.clone()), + RefreshRegistration::Start + )); + } + + /// Test for #395: configure resets MISSING_ENVS_REPORTING_STATE so that + /// subsequent refreshes can trigger missing-env reporting again. + #[test] + fn test_configure_resets_completed_missing_env_reporting() { + let _guard = MISSING_ENVS_TEST_LOCK.lock().unwrap(); + + let configuration = Arc::new(RwLock::new(ConfigurationState { + generation: 1, + config: Configuration::default(), + })); + + // Simulate a completed first refresh. + MISSING_ENVS_REPORTING_STATE.store(MISSING_ENVS_AVAILABLE, Ordering::Release); + assert!(try_begin_missing_env_reporting(configuration.as_ref(), 1)); + complete_missing_env_reporting(1); + + // Missing-env reporting is now exhausted. + assert!(!try_begin_missing_env_reporting(configuration.as_ref(), 1)); + + // Simulate what handle_configure does: bump generation and reset. + { + let mut state = configuration.write().unwrap(); + state.generation = 2; + MISSING_ENVS_REPORTING_STATE.store(MISSING_ENVS_AVAILABLE, Ordering::Release); + } + + // Missing-env reporting should work again for the new generation. + assert!(try_begin_missing_env_reporting(configuration.as_ref(), 2)); + + // Cleanup. + MISSING_ENVS_REPORTING_STATE.store(MISSING_ENVS_AVAILABLE, Ordering::Release); + } } From 9c60b2e30fbbc4a7b8b8410d4c0de5946ea73a2e Mon Sep 17 00:00:00 2001 From: Karthik Nadig Date: Mon, 6 Apr 2026 12:57:52 -0700 Subject: [PATCH 2/2] fix: address review feedback - detailed mismatch logging and avoid extra key clone (PR #409) --- crates/pet/src/jsonrpc.rs | 27 +++++++++++++++++++-------- 1 file changed, 19 insertions(+), 8 deletions(-) diff --git a/crates/pet/src/jsonrpc.rs b/crates/pet/src/jsonrpc.rs index c5bc16b6..4efe3a00 100644 --- a/crates/pet/src/jsonrpc.rs +++ b/crates/pet/src/jsonrpc.rs @@ -225,11 +225,22 @@ impl RefreshCoordinator { self.changed.notify_all(); } RefreshCoordinatorState::Idle => {} - _ => { + RefreshCoordinatorState::Completing(active) => { // Mismatched key — another refresh owns this state. Log and // leave it alone; the owning refresh will clean up. error!( - "force_complete_request called with mismatched key; current state not owned by caller" + "force_complete_request called with mismatched key while coordinator was Completing; caller key: {:?}, active key: {:?}", + key, + active.key + ); + } + RefreshCoordinatorState::Running(active) => { + // Mismatched key — another refresh owns this state. Log and + // leave it alone; the owning refresh will clean up. + error!( + "force_complete_request called with mismatched key while coordinator was Running; caller key: {:?}, active key: {:?}", + key, + active.key ); } } @@ -943,10 +954,10 @@ pub fn handle_refresh(context: Arc, id: u32, params: Value) { // (including begin_completion), force the // coordinator back to Idle so waiters are not // stuck forever. - let mut safety_guard = RefreshSafetyGuard::new( - &context.refresh_coordinator, - refresh_key.clone(), - ); + // Move refresh_key into the guard to avoid an + // extra clone of potentially large search_paths. + let mut safety_guard = + RefreshSafetyGuard::new(&context.refresh_coordinator, refresh_key); let refresh_result = panic::catch_unwind(AssertUnwindSafe(|| { execute_refresh( @@ -961,7 +972,7 @@ pub fn handle_refresh(context: Arc, id: u32, params: Value) { let refresh_result = execution.result.clone(); let mut completion_guard = RefreshCompletionGuard::begin( &context.refresh_coordinator, - &refresh_key, + &safety_guard.key, ); safety_guard.disarm(); finish_refresh_replies(&mut completion_guard, &refresh_result); @@ -974,7 +985,7 @@ pub fn handle_refresh(context: Arc, id: u32, params: Value) { ); let mut completion_guard = RefreshCompletionGuard::begin( &context.refresh_coordinator, - &refresh_key, + &safety_guard.key, ); safety_guard.disarm(); finish_refresh_errors(