session.rs (72964B)
1 use std::collections::{HashMap, HashSet}; 2 use std::path::PathBuf; 3 use std::sync::mpsc::Receiver; 4 use std::time::Instant; 5 6 use crate::agent_status::AgentStatus; 7 use crate::backend::BackendType; 8 use crate::config::AiMode; 9 use crate::focus_queue::FocusPriority; 10 use crate::git_status::GitStatusCache; 11 use crate::messages::{ 12 AnswerSummary, CompactionInfo, ExecutedTool, PermissionResponse, PermissionResponseType, 13 QuestionAnswer, SessionInfo, SubagentStatus, 14 }; 15 use crate::session_events::ThreadingState; 16 use crate::{DaveApiResponse, Message}; 17 use claude_agent_sdk_rs::PermissionMode; 18 use tokio::sync::oneshot; 19 use uuid::Uuid; 20 21 pub type SessionId = u32; 22 23 /// Convert PermissionMode to a stable string for nostr tags. 24 pub fn permission_mode_to_str(mode: PermissionMode) -> &'static str { 25 match mode { 26 PermissionMode::Default => "default", 27 PermissionMode::Plan => "plan", 28 PermissionMode::AcceptEdits => "accept_edits", 29 PermissionMode::BypassPermissions => "bypass", 30 } 31 } 32 33 /// Parse PermissionMode from a nostr tag string. 34 pub fn permission_mode_from_str(s: &str) -> PermissionMode { 35 match s { 36 "plan" => PermissionMode::Plan, 37 "accept_edits" => PermissionMode::AcceptEdits, 38 "bypass" => PermissionMode::BypassPermissions, 39 _ => PermissionMode::Default, 40 } 41 } 42 43 /// Whether this session runs locally or is observed remotely via relays. 44 #[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] 45 pub enum SessionSource { 46 /// Local Claude process running on this machine. 47 #[default] 48 Local, 49 /// Remote session observed via relay events (no local process). 50 Remote, 51 } 52 53 /// Session metadata for display in chat headers 54 pub struct SessionDetails { 55 pub title: String, 56 /// User-set title that takes precedence over the auto-generated one. 57 pub custom_title: Option<String>, 58 pub hostname: String, 59 pub cwd: Option<PathBuf>, 60 /// Home directory of the machine where this session originated. 61 /// Used to abbreviate cwd paths for remote sessions. 62 pub home_dir: String, 63 } 64 65 impl SessionDetails { 66 /// Returns custom_title if set, otherwise the auto-generated title. 67 pub fn display_title(&self) -> &str { 68 self.custom_title.as_deref().unwrap_or(&self.title) 69 } 70 } 71 72 /// Tracks the "Compact & Approve" lifecycle. 73 /// 74 /// Button click → `WaitingForCompaction` (intent recorded). 75 /// CompactionComplete → `ReadyToProceed` (compaction finished, safe to send). 76 /// Stream-end (local) or compaction_complete event (remote) → consume and fire. 77 #[derive(Default, Clone, Copy, PartialEq)] 78 pub enum CompactAndProceedState { 79 /// No compact-and-proceed in progress. 80 #[default] 81 Idle, 82 /// User clicked "Compact & Approve"; waiting for compaction to finish. 83 WaitingForCompaction, 84 /// Compaction finished; send "Proceed" on the next safe opportunity 85 /// (stream-end for local, immediately for remote). 86 ReadyToProceed, 87 } 88 89 /// State for permission response with message 90 #[derive(Default, Clone, Copy, PartialEq)] 91 pub enum PermissionMessageState { 92 #[default] 93 None, 94 /// User pressed Shift+1, waiting for message then will Allow 95 TentativeAccept, 96 /// User pressed Shift+2, waiting for message then will Deny 97 TentativeDeny, 98 } 99 100 /// Consolidated permission tracking for a session. 101 /// 102 /// Bundles the local oneshot channels (for local sessions), the note-ID 103 /// mapping (for linking relay responses), and the already-responded set 104 /// (for remote sessions) into a single struct. 105 pub struct PermissionTracker { 106 /// Local oneshot senders waiting for the user to allow/deny. 107 pub pending: HashMap<Uuid, oneshot::Sender<PermissionResponse>>, 108 /// Maps permission-request UUID → nostr note ID of the published request. 109 pub request_note_ids: HashMap<Uuid, [u8; 32]>, 110 /// Permission UUIDs that have already been responded to. 111 pub responded: HashSet<Uuid>, 112 } 113 114 impl PermissionTracker { 115 pub fn new() -> Self { 116 Self { 117 pending: HashMap::new(), 118 request_note_ids: HashMap::new(), 119 responded: HashSet::new(), 120 } 121 } 122 123 /// Whether there are unresolved local permission requests. 124 pub fn has_pending(&self) -> bool { 125 !self.pending.is_empty() 126 } 127 128 /// Resolve a permission request. This is the ONLY place resolution state 129 /// is updated — both `handle_permission_response` and 130 /// `handle_question_response` funnel through here. 131 pub fn resolve( 132 &mut self, 133 chat: &mut [Message], 134 request_id: Uuid, 135 response_type: PermissionResponseType, 136 answer_summary: Option<AnswerSummary>, 137 is_remote: bool, 138 oneshot_response: Option<PermissionResponse>, 139 ) { 140 // 1. Update the PermissionRequest message in chat 141 for msg in chat.iter_mut() { 142 if let Message::PermissionRequest(req) = msg { 143 if req.id == request_id { 144 req.response = Some(response_type); 145 if answer_summary.is_some() { 146 req.answer_summary = answer_summary; 147 } 148 break; 149 } 150 } 151 } 152 153 // 2. Update PermissionTracker state 154 if is_remote { 155 self.responded.insert(request_id); 156 } else if let Some(response) = oneshot_response { 157 if let Some(sender) = self.pending.remove(&request_id) { 158 if sender.send(response).is_err() { 159 tracing::error!( 160 "failed to send permission response for request {}", 161 request_id 162 ); 163 } 164 } else { 165 tracing::warn!("no pending permission found for request {}", request_id); 166 } 167 } 168 } 169 170 /// Merge loaded permission state from restored events. 171 pub fn merge_loaded( 172 &mut self, 173 responded: HashSet<Uuid>, 174 request_note_ids: HashMap<Uuid, [u8; 32]>, 175 ) { 176 self.responded = responded; 177 self.request_note_ids.extend(request_note_ids); 178 } 179 } 180 181 impl Default for PermissionTracker { 182 fn default() -> Self { 183 Self::new() 184 } 185 } 186 187 /// Agentic-mode specific session data (Claude backend only) 188 pub struct AgenticSessionData { 189 /// Permission state (pending channels, note IDs, responded set) 190 pub permissions: PermissionTracker, 191 /// Position in the RTS scene (in scene coordinates) 192 pub scene_position: egui::Vec2, 193 /// Permission mode for Claude (Default or Plan) 194 pub permission_mode: PermissionMode, 195 /// State for permission response message (tentative accept/deny) 196 pub permission_message_state: PermissionMessageState, 197 /// State for pending AskUserQuestion responses (keyed by request UUID) 198 pub question_answers: HashMap<Uuid, Vec<QuestionAnswer>>, 199 /// Current question index for multi-question AskUserQuestion (keyed by request UUID) 200 pub question_index: HashMap<Uuid, usize>, 201 /// Working directory for claude-code subprocess 202 pub cwd: PathBuf, 203 /// Session info from Claude Code CLI (tools, model, agents, etc.) 204 pub session_info: Option<SessionInfo>, 205 /// Indices of subagent messages in chat (keyed by task_id) 206 pub subagent_indices: HashMap<String, usize>, 207 /// Whether conversation compaction is in progress 208 pub is_compacting: bool, 209 /// Info from the last completed compaction (for display) 210 pub last_compaction: Option<CompactionInfo>, 211 /// Claude session ID to resume (UUID from Claude CLI's session storage) 212 /// When set, the backend will use --resume to continue this session 213 pub resume_session_id: Option<String>, 214 /// Git status cache for this session's working directory 215 pub git_status: GitStatusCache, 216 /// Threading state for live kind-1988 event generation. 217 pub live_threading: ThreadingState, 218 /// Subscription for remote kind-1988 events (permission responses, commands). 219 /// Set up once when the session's claude_session_id becomes known. 220 pub conversation_action_sub: Option<nostrdb::Subscription>, 221 /// Status as reported by the remote desktop's kind-31988 event. 222 /// Only meaningful when session source is Remote. 223 pub remote_status: Option<AgentStatus>, 224 /// Timestamp of the kind-31988 event that last set `remote_status`. 225 /// Used to ignore older replaceable event revisions that arrive out of order. 226 pub remote_status_ts: u64, 227 /// Subscription for live kind-1988 conversation events from relays. 228 /// Used by remote sessions to receive new messages in real-time. 229 pub live_conversation_sub: Option<nostrdb::Subscription>, 230 /// Note IDs we've already processed from live conversation polling. 231 /// Prevents duplicate messages when events are loaded during restore 232 /// and then appear again via the subscription. 233 pub seen_note_ids: HashSet<[u8; 32]>, 234 /// Tracks the "Compact & Approve" lifecycle. 235 pub compact_and_proceed: CompactAndProceedState, 236 /// Accumulated usage metrics across queries in this session. 237 pub usage: crate::messages::UsageInfo, 238 /// Runtime allowlist for auto-accepting permissions this session. 239 /// For Bash: stores binary names (first word of command). 240 /// For other tools: stores the tool name. 241 pub runtime_allows: HashSet<String>, 242 /// Stable Nostr event identity for this session (d-tag for kind-31988 243 /// and kind-1988 events). Generated at creation, never changes. 244 /// Separate from the Claude CLI session ID used for `--resume`. 245 pub event_id: String, 246 } 247 248 impl AgenticSessionData { 249 pub fn new(id: SessionId, cwd: PathBuf) -> Self { 250 // Arrange sessions in a grid pattern 251 let col = (id as i32 - 1) % 4; 252 let row = (id as i32 - 1) / 4; 253 let x = col as f32 * 150.0 - 225.0; // Center around origin 254 let y = row as f32 * 150.0 - 75.0; 255 256 let git_status = GitStatusCache::new(cwd.clone()); 257 258 AgenticSessionData { 259 permissions: PermissionTracker::new(), 260 scene_position: egui::Vec2::new(x, y), 261 permission_mode: PermissionMode::Default, 262 permission_message_state: PermissionMessageState::None, 263 question_answers: HashMap::new(), 264 question_index: HashMap::new(), 265 cwd, 266 session_info: None, 267 subagent_indices: HashMap::new(), 268 is_compacting: false, 269 last_compaction: None, 270 resume_session_id: None, 271 git_status, 272 live_threading: ThreadingState::new(), 273 conversation_action_sub: None, 274 remote_status: None, 275 remote_status_ts: 0, 276 live_conversation_sub: None, 277 seen_note_ids: HashSet::new(), 278 compact_and_proceed: CompactAndProceedState::Idle, 279 usage: Default::default(), 280 runtime_allows: HashSet::new(), 281 event_id: uuid::Uuid::new_v4().to_string(), 282 } 283 } 284 285 /// Extract the runtime allow key from a permission request. 286 /// For Bash: first word of the command (binary name). 287 /// For other tools: the tool name itself. 288 fn runtime_allow_key(tool_name: &str, tool_input: &serde_json::Value) -> Option<String> { 289 if tool_name == "Bash" { 290 tool_input 291 .get("command") 292 .and_then(|v| v.as_str()) 293 .and_then(|cmd| cmd.split_whitespace().next()) 294 .map(|s| s.to_string()) 295 } else { 296 Some(tool_name.to_string()) 297 } 298 } 299 300 /// Check if a permission request matches the runtime allowlist. 301 pub fn should_runtime_allow(&self, tool_name: &str, tool_input: &serde_json::Value) -> bool { 302 if let Some(key) = Self::runtime_allow_key(tool_name, tool_input) { 303 self.runtime_allows.contains(&key) 304 } else { 305 false 306 } 307 } 308 309 /// Add a runtime allow rule from a permission request. 310 /// Returns the key that was added (for logging). 311 pub fn add_runtime_allow( 312 &mut self, 313 tool_name: &str, 314 tool_input: &serde_json::Value, 315 ) -> Option<String> { 316 let key = Self::runtime_allow_key(tool_name, tool_input)?; 317 self.runtime_allows.insert(key.clone()); 318 Some(key) 319 } 320 321 /// Stable Nostr event identity (d-tag for kind-1988 / kind-31988). 322 /// 323 /// This is always available — every session gets a UUID at creation. 324 /// It is independent of the Claude CLI session ID. 325 pub fn event_session_id(&self) -> &str { 326 &self.event_id 327 } 328 329 /// Get the CLI session ID for backend `--resume`. 330 /// 331 /// Returns the real Claude CLI session ID. `None` means the backend 332 /// hasn't started yet (no session to resume). 333 pub fn cli_resume_id(&self) -> Option<&str> { 334 self.session_info 335 .as_ref() 336 .and_then(|i| i.claude_session_id.as_deref()) 337 .or(self.resume_session_id.as_deref()) 338 } 339 340 /// Update a subagent's output (appending new content, keeping only the tail) 341 pub fn update_subagent_output( 342 &mut self, 343 chat: &mut [Message], 344 task_id: &str, 345 new_output: &str, 346 ) { 347 if let Some(&idx) = self.subagent_indices.get(task_id) { 348 if let Some(Message::Subagent(subagent)) = chat.get_mut(idx) { 349 subagent.output.push_str(new_output); 350 // Keep only the most recent content up to max_output_size 351 if subagent.output.len() > subagent.max_output_size { 352 let keep_from = subagent.output.len() - subagent.max_output_size; 353 subagent.output = subagent.output[keep_from..].to_string(); 354 } 355 } 356 } 357 } 358 359 /// Mark a subagent as completed 360 pub fn complete_subagent(&mut self, chat: &mut [Message], task_id: &str, result: &str) { 361 if let Some(&idx) = self.subagent_indices.get(task_id) { 362 if let Some(Message::Subagent(subagent)) = chat.get_mut(idx) { 363 subagent.status = SubagentStatus::Completed; 364 subagent.output = result.to_string(); 365 } 366 } 367 } 368 369 /// Try to fold a tool result into its parent subagent. 370 /// Returns None if folded, Some(result) if it couldn't be folded. 371 pub fn fold_tool_result( 372 &self, 373 chat: &mut [Message], 374 result: ExecutedTool, 375 ) -> Option<ExecutedTool> { 376 let Some(parent_id) = result.parent_task_id.as_ref() else { 377 return Some(result); 378 }; 379 let Some(&idx) = self.subagent_indices.get(parent_id) else { 380 return Some(result); 381 }; 382 if let Some(Message::Subagent(subagent)) = chat.get_mut(idx) { 383 subagent.tool_results.push(result); 384 None 385 } else { 386 Some(result) 387 } 388 } 389 } 390 391 /// Tracks the lifecycle of a dispatch to the AI backend. 392 /// 393 /// Transitions: 394 /// - `Idle → AwaitingResponse` when `send_user_message_for()` dispatches 395 /// - `AwaitingResponse → Streaming` when the backend produces content 396 /// - `Streaming | AwaitingResponse → Idle` at stream end 397 #[derive(Clone, Copy, Debug, Default, PartialEq)] 398 pub enum DispatchState { 399 /// No active dispatch. 400 #[default] 401 Idle, 402 /// Dispatched `count` trailing user messages; backend hasn't 403 /// produced visible content yet. 404 AwaitingResponse { count: usize }, 405 /// Backend is actively producing content for this dispatch. 406 Streaming { dispatched_count: usize }, 407 } 408 409 impl DispatchState { 410 /// Number of user messages that were dispatched in the current batch. 411 /// Used by `append_token` for insert position and UI for queued indicator. 412 pub fn dispatched_count(&self) -> usize { 413 match self { 414 DispatchState::Idle => 0, 415 DispatchState::AwaitingResponse { count } => *count, 416 DispatchState::Streaming { dispatched_count } => *dispatched_count, 417 } 418 } 419 420 /// Transition: backend produced content. 421 /// `AwaitingResponse → Streaming`; other states unchanged. 422 pub fn backend_responded(&mut self) { 423 if let DispatchState::AwaitingResponse { count } = *self { 424 *self = DispatchState::Streaming { 425 dispatched_count: count, 426 }; 427 } 428 } 429 430 /// Transition: stream ended. Resets to `Idle`. 431 pub fn stream_ended(&mut self) { 432 *self = DispatchState::Idle; 433 } 434 } 435 436 /// A single chat session with Dave 437 pub struct ChatSession { 438 pub id: SessionId, 439 pub chat: Vec<Message>, 440 pub input: String, 441 pub incoming_tokens: Option<Receiver<DaveApiResponse>>, 442 /// Handle to the background task processing this session's AI requests. 443 /// Aborted on drop to clean up the subprocess. 444 pub task_handle: Option<tokio::task::JoinHandle<()>>, 445 /// Tracks the dispatch lifecycle for redispatch and insert-position logic. 446 pub dispatch_state: DispatchState, 447 /// Cached status for the agent (derived from session state) 448 cached_status: AgentStatus, 449 /// Set when cached_status changes, cleared after publishing state event 450 pub state_dirty: bool, 451 /// Whether this session's input should be focused on the next frame 452 pub focus_requested: bool, 453 /// AI interaction mode for this session (Chat vs Agentic) 454 pub ai_mode: AiMode, 455 /// Agentic-mode specific data (None in Chat mode) 456 pub agentic: Option<AgenticSessionData>, 457 /// Whether this session is local (has a Claude process) or remote (relay-only). 458 pub source: SessionSource, 459 /// Session metadata for display (title, hostname, cwd) 460 pub details: SessionDetails, 461 /// Which backend this session uses (Claude, Codex, etc.) 462 pub backend_type: BackendType, 463 /// When the last AI response token was received (for "5m ago" display) 464 pub last_activity: Option<Instant>, 465 /// Focus indicator dot state (persisted in kind-31988 note). 466 /// Set on status transitions, cleared when user dismisses it. 467 pub indicator: Option<FocusPriority>, 468 } 469 470 impl Drop for ChatSession { 471 fn drop(&mut self) { 472 if let Some(handle) = self.task_handle.take() { 473 handle.abort(); 474 } 475 } 476 } 477 478 impl ChatSession { 479 pub fn new(id: SessionId, cwd: PathBuf, ai_mode: AiMode, backend_type: BackendType) -> Self { 480 let details_cwd = if ai_mode == AiMode::Agentic { 481 Some(cwd.clone()) 482 } else { 483 None 484 }; 485 let agentic = match ai_mode { 486 AiMode::Agentic => Some(AgenticSessionData::new(id, cwd)), 487 AiMode::Chat => None, 488 }; 489 490 ChatSession { 491 id, 492 chat: vec![], 493 input: String::new(), 494 incoming_tokens: None, 495 task_handle: None, 496 dispatch_state: DispatchState::Idle, 497 cached_status: AgentStatus::Idle, 498 state_dirty: true, 499 focus_requested: false, 500 ai_mode, 501 agentic, 502 source: SessionSource::Local, 503 details: SessionDetails { 504 title: "New Chat".to_string(), 505 custom_title: None, 506 hostname: String::new(), 507 cwd: details_cwd, 508 home_dir: dirs::home_dir() 509 .map(|h| h.to_string_lossy().to_string()) 510 .unwrap_or_default(), 511 }, 512 backend_type, 513 last_activity: None, 514 indicator: None, 515 } 516 } 517 518 /// Create a new session that resumes an existing Claude conversation 519 pub fn new_resumed( 520 id: SessionId, 521 cwd: PathBuf, 522 resume_session_id: String, 523 title: String, 524 ai_mode: AiMode, 525 backend_type: BackendType, 526 ) -> Self { 527 let mut session = Self::new(id, cwd, ai_mode, backend_type); 528 if let Some(ref mut agentic) = session.agentic { 529 if !resume_session_id.is_empty() { 530 agentic.resume_session_id = Some(resume_session_id); 531 } 532 } 533 session.details.title = title; 534 session 535 } 536 537 // === Helper methods for accessing agentic data === 538 539 /// Get agentic data, panics if not in agentic mode (use in agentic-only code paths) 540 pub fn agentic(&self) -> &AgenticSessionData { 541 self.agentic 542 .as_ref() 543 .expect("agentic data only available in Agentic mode") 544 } 545 546 /// Get mutable agentic data 547 pub fn agentic_mut(&mut self) -> &mut AgenticSessionData { 548 self.agentic 549 .as_mut() 550 .expect("agentic data only available in Agentic mode") 551 } 552 553 /// Check if session has agentic capabilities 554 pub fn is_agentic(&self) -> bool { 555 self.agentic.is_some() 556 } 557 558 /// Check if this is a remote session (observed via relay, no local process) 559 pub fn is_remote(&self) -> bool { 560 self.source == SessionSource::Remote 561 } 562 563 /// Check if session has pending permission requests 564 pub fn has_pending_permissions(&self) -> bool { 565 if self.is_remote() { 566 // Remote: check for unresponded PermissionRequest messages in chat 567 let responded = self.agentic.as_ref().map(|a| &a.permissions.responded); 568 return self.chat.iter().any(|msg| { 569 if let Message::PermissionRequest(req) = msg { 570 req.response.is_none() && responded.is_none_or(|ids| !ids.contains(&req.id)) 571 } else { 572 false 573 } 574 }); 575 } 576 // Local: check oneshot senders 577 self.agentic 578 .as_ref() 579 .is_some_and(|a| a.permissions.has_pending()) 580 } 581 582 /// Check if session is in plan mode 583 pub fn is_plan_mode(&self) -> bool { 584 self.agentic 585 .as_ref() 586 .is_some_and(|a| a.permission_mode == PermissionMode::Plan) 587 } 588 589 /// Get the current permission mode (defaults to Default for non-agentic) 590 pub fn permission_mode(&self) -> PermissionMode { 591 self.agentic 592 .as_ref() 593 .map(|a| a.permission_mode) 594 .unwrap_or(PermissionMode::Default) 595 } 596 597 /// Get the working directory (agentic only) 598 pub fn cwd(&self) -> Option<&PathBuf> { 599 self.agentic.as_ref().map(|a| &a.cwd) 600 } 601 602 /// Update a subagent's output (appending new content, keeping only the tail) 603 pub fn update_subagent_output(&mut self, task_id: &str, new_output: &str) { 604 if let Some(ref mut agentic) = self.agentic { 605 agentic.update_subagent_output(&mut self.chat, task_id, new_output); 606 } 607 } 608 609 /// Mark a subagent as completed 610 pub fn complete_subagent(&mut self, task_id: &str, result: &str) { 611 if let Some(ref mut agentic) = self.agentic { 612 agentic.complete_subagent(&mut self.chat, task_id, result); 613 } 614 } 615 616 /// Try to fold a tool result into its parent subagent. 617 /// Returns None if folded, Some(result) if it couldn't be folded. 618 pub fn fold_tool_result(&mut self, result: ExecutedTool) -> Option<ExecutedTool> { 619 if let Some(ref agentic) = self.agentic { 620 agentic.fold_tool_result(&mut self.chat, result) 621 } else { 622 Some(result) 623 } 624 } 625 626 /// Update the session title from the last message (user or assistant) 627 pub fn update_title_from_last_message(&mut self) { 628 for msg in self.chat.iter().rev() { 629 let text: &str = match msg { 630 Message::User(text) => text, 631 Message::Assistant(msg) => msg.text(), 632 _ => continue, 633 }; 634 // Use first ~30 chars of last message as title 635 let title: String = text.chars().take(30).collect(); 636 let new_title = if text.len() > 30 { 637 format!("{}...", title) 638 } else { 639 title 640 }; 641 if new_title != self.details.title { 642 self.details.title = new_title; 643 self.state_dirty = true; 644 } 645 break; 646 } 647 } 648 649 /// Get the current status of this session/agent 650 pub fn status(&self) -> AgentStatus { 651 self.cached_status 652 } 653 654 /// Update the cached status based on current session state. 655 /// Sets `state_dirty` when the status actually changes. 656 /// Also sets the focus indicator when transitioning to a notable state. 657 pub fn update_status(&mut self) { 658 let new_status = self.derive_status(); 659 if new_status != self.cached_status { 660 self.cached_status = new_status; 661 if let Some(priority) = FocusPriority::from_status(new_status) { 662 // Set indicator when entering a notable state 663 self.indicator = Some(priority); 664 } else if self.indicator.is_some() { 665 // Clear stale indicator when agent resumes work 666 self.indicator = None; 667 } 668 self.state_dirty = true; 669 } 670 } 671 672 /// Derive status from the current session state 673 fn derive_status(&self) -> AgentStatus { 674 // Remote sessions derive status from the kind-31988 state event, 675 // but override to NeedsInput if there are unresponded permission requests. 676 if self.is_remote() { 677 if self.has_pending_permissions() { 678 return AgentStatus::NeedsInput; 679 } 680 return self 681 .agentic 682 .as_ref() 683 .and_then(|a| a.remote_status) 684 .unwrap_or(AgentStatus::Idle); 685 } 686 687 // Check for pending permission requests (needs input) - agentic only 688 if self.has_pending_permissions() { 689 return AgentStatus::NeedsInput; 690 } 691 692 // Check for error in last message 693 if let Some(Message::Error(_)) = self.chat.last() { 694 return AgentStatus::Error; 695 } 696 697 // Check if actively working (has task handle and receiving tokens) 698 if self.task_handle.is_some() && self.incoming_tokens.is_some() { 699 return AgentStatus::Working; 700 } 701 702 // Check if done (has messages and no active task) 703 if !self.chat.is_empty() && self.task_handle.is_none() { 704 // Check if the last meaningful message was from assistant 705 for msg in self.chat.iter().rev() { 706 match msg { 707 Message::Assistant(_) | Message::CompactionComplete(_) => { 708 return AgentStatus::Done; 709 } 710 Message::User(_) => return AgentStatus::Idle, // Waiting for response 711 Message::Error(_) => return AgentStatus::Error, 712 _ => continue, 713 } 714 } 715 } 716 717 AgentStatus::Idle 718 } 719 } 720 721 /// Tracks a pending external editor process 722 pub struct EditorJob { 723 /// The spawned editor process 724 pub child: std::process::Child, 725 /// Path to the temp file being edited 726 pub temp_path: PathBuf, 727 /// Session ID that initiated the editor 728 pub session_id: SessionId, 729 } 730 731 /// Manages multiple chat sessions 732 pub struct SessionManager { 733 sessions: HashMap<SessionId, ChatSession>, 734 order: Vec<SessionId>, // Sorted by recency (most recent first) 735 active: Option<SessionId>, 736 next_id: SessionId, 737 /// Pending external editor job (only one at a time) 738 pub pending_editor: Option<EditorJob>, 739 /// Cached agent grouping by hostname. Each entry is (hostname, session IDs 740 /// in recency order). Rebuilt via `rebuild_host_groups()` when sessions or 741 /// hostnames change. 742 host_groups: Vec<(String, Vec<SessionId>)>, 743 /// Cached chat session IDs in recency order. Rebuilt alongside host_groups. 744 chat_ids: Vec<SessionId>, 745 } 746 747 impl Default for SessionManager { 748 fn default() -> Self { 749 Self::new() 750 } 751 } 752 753 impl SessionManager { 754 pub fn new() -> Self { 755 SessionManager { 756 sessions: HashMap::new(), 757 order: Vec::new(), 758 active: None, 759 next_id: 1, 760 pending_editor: None, 761 host_groups: Vec::new(), 762 chat_ids: Vec::new(), 763 } 764 } 765 766 /// Create a new session with the given cwd and make it active 767 pub fn new_session( 768 &mut self, 769 cwd: PathBuf, 770 ai_mode: AiMode, 771 backend_type: BackendType, 772 ) -> SessionId { 773 let id = self.next_id; 774 self.next_id += 1; 775 776 let session = ChatSession::new(id, cwd, ai_mode, backend_type); 777 self.sessions.insert(id, session); 778 self.order.insert(0, id); // Most recent first 779 self.active = Some(id); 780 self.rebuild_host_groups(); 781 782 id 783 } 784 785 /// Create a new session that resumes an existing Claude conversation 786 pub fn new_resumed_session( 787 &mut self, 788 cwd: PathBuf, 789 resume_session_id: String, 790 title: String, 791 ai_mode: AiMode, 792 backend_type: BackendType, 793 ) -> SessionId { 794 let id = self.next_id; 795 self.next_id += 1; 796 797 let session = 798 ChatSession::new_resumed(id, cwd, resume_session_id, title, ai_mode, backend_type); 799 self.sessions.insert(id, session); 800 self.order.insert(0, id); // Most recent first 801 self.active = Some(id); 802 self.rebuild_host_groups(); 803 804 id 805 } 806 807 /// Get a reference to the active session 808 pub fn get_active(&self) -> Option<&ChatSession> { 809 self.active.and_then(|id| self.sessions.get(&id)) 810 } 811 812 /// Get a mutable reference to the active session 813 pub fn get_active_mut(&mut self) -> Option<&mut ChatSession> { 814 self.active.and_then(|id| self.sessions.get_mut(&id)) 815 } 816 817 /// Get the active session ID 818 pub fn active_id(&self) -> Option<SessionId> { 819 self.active 820 } 821 822 /// Switch to a different session 823 pub fn switch_to(&mut self, id: SessionId) -> bool { 824 if self.sessions.contains_key(&id) { 825 self.active = Some(id); 826 true 827 } else { 828 false 829 } 830 } 831 832 /// Delete a session 833 /// Returns true if the session was deleted, false if it didn't exist. 834 /// If the last session is deleted, active will be None and the caller 835 /// should open the directory picker to create a new session. 836 pub fn delete_session(&mut self, id: SessionId) -> bool { 837 if self.sessions.remove(&id).is_some() { 838 self.order.retain(|&x| x != id); 839 840 // If we deleted the active session, switch to another 841 if self.active == Some(id) { 842 self.active = self.order.first().copied(); 843 } 844 self.rebuild_host_groups(); 845 true 846 } else { 847 false 848 } 849 } 850 851 /// Get sessions in order of recency (most recent first) 852 pub fn sessions_ordered(&self) -> Vec<&ChatSession> { 853 self.order 854 .iter() 855 .filter_map(|id| self.sessions.get(id)) 856 .collect() 857 } 858 859 /// Update the recency of a session (move to front of order) 860 pub fn touch(&mut self, id: SessionId) { 861 if self.sessions.contains_key(&id) { 862 self.order.retain(|&x| x != id); 863 self.order.insert(0, id); 864 } 865 } 866 867 /// Get the number of sessions 868 pub fn len(&self) -> usize { 869 self.sessions.len() 870 } 871 872 /// Check if there are no sessions 873 pub fn is_empty(&self) -> bool { 874 self.sessions.is_empty() 875 } 876 877 /// Get a reference to a session by ID 878 pub fn get(&self, id: SessionId) -> Option<&ChatSession> { 879 self.sessions.get(&id) 880 } 881 882 /// Get a mutable reference to a session by ID 883 pub fn get_mut(&mut self, id: SessionId) -> Option<&mut ChatSession> { 884 self.sessions.get_mut(&id) 885 } 886 887 /// Iterate over all sessions 888 pub fn iter(&self) -> impl Iterator<Item = &ChatSession> { 889 self.sessions.values() 890 } 891 892 /// Iterate over all sessions mutably 893 pub fn iter_mut(&mut self) -> impl Iterator<Item = &mut ChatSession> { 894 self.sessions.values_mut() 895 } 896 897 /// Update status for all sessions 898 pub fn update_all_statuses(&mut self) { 899 for session in self.sessions.values_mut() { 900 session.update_status(); 901 } 902 } 903 904 /// Get the first session that needs attention (NeedsInput status) 905 pub fn find_needs_attention(&self) -> Option<SessionId> { 906 for session in self.sessions.values() { 907 if session.status() == AgentStatus::NeedsInput { 908 return Some(session.id); 909 } 910 } 911 None 912 } 913 914 /// Get all session IDs 915 pub fn session_ids(&self) -> Vec<SessionId> { 916 self.order.clone() 917 } 918 919 /// Get cached agent session groups by hostname. 920 /// Each entry is (hostname, session IDs in recency order). 921 pub fn host_groups(&self) -> &[(String, Vec<SessionId>)] { 922 &self.host_groups 923 } 924 925 /// Get cached chat session IDs in recency order. 926 pub fn chat_ids(&self) -> &[SessionId] { 927 &self.chat_ids 928 } 929 930 /// Session IDs in visual/display order (host groups then chats). 931 /// Keybinding numbers (Ctrl+1-9) map to this order. 932 pub fn visual_order(&self) -> Vec<SessionId> { 933 let mut ids = Vec::new(); 934 for (_, group_ids) in &self.host_groups { 935 ids.extend_from_slice(group_ids); 936 } 937 ids.extend_from_slice(&self.chat_ids); 938 ids 939 } 940 941 /// Get a session's index in the recency-ordered list (for keyboard shortcuts). 942 pub fn session_index(&self, id: SessionId) -> Option<usize> { 943 self.order.iter().position(|&oid| oid == id) 944 } 945 946 /// Rebuild the cached hostname groups from current sessions and order. 947 /// Call after adding/removing sessions or changing a session's hostname. 948 pub fn rebuild_host_groups(&mut self) { 949 self.host_groups.clear(); 950 self.chat_ids.clear(); 951 952 for &id in &self.order { 953 if let Some(session) = self.sessions.get(&id) { 954 if session.ai_mode != AiMode::Agentic { 955 if session.ai_mode == AiMode::Chat { 956 self.chat_ids.push(id); 957 } 958 continue; 959 } 960 let hostname = &session.details.hostname; 961 if let Some(group) = self.host_groups.iter_mut().find(|(h, _)| h == hostname) { 962 group.1.push(id); 963 } else { 964 self.host_groups.push((hostname.clone(), vec![id])); 965 } 966 } 967 } 968 969 // Sort groups by hostname for stable ordering 970 self.host_groups.sort_by(|a, b| a.0.cmp(&b.0)); 971 } 972 } 973 974 impl ChatSession { 975 /// Whether the session is actively streaming a response from the backend. 976 pub fn is_streaming(&self) -> bool { 977 self.incoming_tokens.is_some() 978 } 979 980 /// Whether a dispatch is active (message sent to backend, waiting for 981 /// or receiving response). This is more reliable than `is_streaming()` 982 /// because it covers the window between dispatch and first token arrival. 983 pub fn is_dispatched(&self) -> bool { 984 !matches!(self.dispatch_state, DispatchState::Idle) 985 } 986 987 /// Append a streaming token to the current assistant message. 988 /// 989 /// If the last message is an Assistant, append there. Otherwise 990 /// search backwards through only trailing User messages (queued 991 /// ones) for a still-streaming Assistant. If none is found, 992 /// create a new Assistant — inserted after the dispatched user 993 /// message but before any queued ones. 994 /// 995 /// We intentionally do NOT search past ToolCalls, ToolResponse, 996 /// or other non-User messages. When Claude sends text → tool 997 /// call → more text, the post-tool tokens must go into a NEW 998 /// Assistant so the tool call appears between the two text blocks. 999 pub fn append_token(&mut self, token: &str) { 1000 // Content arrived — transition AwaitingResponse → Streaming. 1001 self.dispatch_state.backend_responded(); 1002 self.last_activity = Some(Instant::now()); 1003 1004 // Fast path: last message is the active assistant response 1005 if let Some(Message::Assistant(msg)) = self.chat.last_mut() { 1006 msg.push_token(token); 1007 return; 1008 } 1009 1010 // Slow path: look backwards through only trailing User messages. 1011 // If we find a streaming Assistant just before them, append there. 1012 let mut appended = false; 1013 for m in self.chat.iter_mut().rev() { 1014 match m { 1015 Message::User(_) => continue, // skip queued user messages 1016 Message::Assistant(msg) if msg.is_streaming() => { 1017 msg.push_token(token); 1018 appended = true; 1019 break; 1020 } 1021 _ => break, // stop at ToolCalls, ToolResponse, finalized Assistant, etc. 1022 } 1023 } 1024 1025 if !appended { 1026 // No streaming assistant reachable — start a new one. 1027 // Insert after the dispatched user messages but before 1028 // any newly queued ones so the response appears in the 1029 // right order and queued messages trigger redispatch. 1030 let mut msg = crate::messages::AssistantMessage::new(); 1031 msg.push_token(token); 1032 1033 let trailing_start = self 1034 .chat 1035 .iter() 1036 .rposition(|m| !matches!(m, Message::User(_))) 1037 .map(|i| i + 1) 1038 .unwrap_or(0); 1039 1040 // Skip past the dispatched user messages (default 1 for 1041 // single dispatch, more for batch redispatch) 1042 let skip = self.dispatch_state.dispatched_count().max(1); 1043 let insert_pos = (trailing_start + skip).min(self.chat.len()); 1044 self.chat.insert(insert_pos, Message::Assistant(msg)); 1045 } 1046 } 1047 1048 /// Finalize the last assistant message (cache parsed markdown, etc). 1049 /// 1050 /// Searches backwards because queued user messages may appear after 1051 /// the assistant response in the chat. 1052 pub fn finalize_last_assistant(&mut self) { 1053 for msg in self.chat.iter_mut().rev() { 1054 if let Message::Assistant(assistant) = msg { 1055 assistant.finalize(); 1056 return; 1057 } 1058 } 1059 } 1060 1061 /// Get the text of the last assistant message. 1062 /// 1063 /// Searches backwards because queued user messages may appear after 1064 /// the assistant response in the chat. 1065 pub fn last_assistant_text(&self) -> Option<String> { 1066 self.chat.iter().rev().find_map(|m| match m { 1067 Message::Assistant(msg) => { 1068 let text = msg.text().to_string(); 1069 if text.is_empty() { 1070 None 1071 } else { 1072 Some(text) 1073 } 1074 } 1075 _ => None, 1076 }) 1077 } 1078 1079 /// Whether the session has an unanswered user message at the end of the 1080 /// chat that needs to be dispatched to the backend. 1081 pub fn has_pending_user_message(&self) -> bool { 1082 matches!(self.chat.last(), Some(Message::User(_))) 1083 } 1084 1085 /// Whether a newly arrived remote user message should be dispatched to 1086 /// the backend right now. Returns false if a dispatch is already 1087 /// active — the message is already in chat and will be picked up 1088 /// when the current stream finishes. 1089 pub fn should_dispatch_remote_message(&self) -> bool { 1090 !self.is_dispatched() && self.has_pending_user_message() 1091 } 1092 1093 /// Mark the current trailing user messages as dispatched to the backend. 1094 /// Call this when starting a new stream for this session. 1095 pub fn mark_dispatched(&mut self) { 1096 let count = self.trailing_user_count(); 1097 self.dispatch_state = DispatchState::AwaitingResponse { count }; 1098 } 1099 1100 /// Count trailing user messages at the end of the chat. 1101 pub fn trailing_user_count(&self) -> usize { 1102 self.chat 1103 .iter() 1104 .rev() 1105 .take_while(|m| matches!(m, Message::User(_))) 1106 .count() 1107 } 1108 1109 /// Whether the session needs a re-dispatch after a stream ends. 1110 /// This catches user messages that arrived while we were streaming. 1111 /// 1112 /// Uses `dispatch_state` to distinguish genuinely new messages from 1113 /// messages that were already dispatched: 1114 /// 1115 /// - `Streaming`: backend responded, so any trailing user messages 1116 /// are genuinely new (queued during the response). 1117 /// - `AwaitingResponse`: backend returned empty. Only redispatch if 1118 /// NEW messages arrived beyond what was dispatched (prevents the 1119 /// infinite loop on empty responses). 1120 /// - `Idle`: nothing to redispatch. 1121 pub fn needs_redispatch_after_stream_end(&self) -> bool { 1122 match self.dispatch_state { 1123 DispatchState::Streaming { .. } => self.has_pending_user_message(), 1124 DispatchState::AwaitingResponse { count } => self.trailing_user_count() > count, 1125 DispatchState::Idle => false, 1126 } 1127 } 1128 1129 /// If "Compact & Approve" has reached ReadyToProceed, consume the state, 1130 /// push a "Proceed" user message, and return true. 1131 /// 1132 /// Called from: 1133 /// - Local sessions: at stream-end in process_events() 1134 /// - Remote sessions: on compaction_complete in poll_remote_conversation_events() 1135 pub fn take_compact_and_proceed(&mut self) -> bool { 1136 let dominated = self 1137 .agentic 1138 .as_ref() 1139 .is_none_or(|a| a.compact_and_proceed != CompactAndProceedState::ReadyToProceed); 1140 1141 if dominated { 1142 return false; 1143 } 1144 1145 self.agentic.as_mut().unwrap().compact_and_proceed = CompactAndProceedState::Idle; 1146 self.chat 1147 .push(Message::User("Proceed with implementing the plan.".into())); 1148 true 1149 } 1150 } 1151 1152 #[cfg(test)] 1153 mod tests { 1154 use super::*; 1155 use crate::config::AiMode; 1156 use crate::messages::AssistantMessage; 1157 use std::sync::mpsc; 1158 1159 fn test_session() -> ChatSession { 1160 ChatSession::new( 1161 1, 1162 PathBuf::from("/tmp"), 1163 AiMode::Agentic, 1164 BackendType::Claude, 1165 ) 1166 } 1167 1168 #[test] 1169 fn dispatch_when_idle_with_user_message() { 1170 let mut session = test_session(); 1171 session.chat.push(Message::User("hello".into())); 1172 assert!(session.should_dispatch_remote_message()); 1173 } 1174 1175 #[test] 1176 fn no_dispatch_while_streaming() { 1177 let mut session = test_session(); 1178 session.chat.push(Message::User("hello".into())); 1179 1180 // Dispatch and start streaming 1181 let _tx = make_streaming(&mut session); 1182 1183 // New user message arrives while streaming 1184 session.chat.push(Message::User("another".into())); 1185 assert!(!session.should_dispatch_remote_message()); 1186 } 1187 1188 #[test] 1189 fn redispatch_after_stream_ends_with_pending_user_message() { 1190 let mut session = test_session(); 1191 session.chat.push(Message::User("msg1".into())); 1192 1193 // Dispatch and start streaming 1194 let tx = make_streaming(&mut session); 1195 1196 // Assistant responds via append_token (transitions to Streaming) 1197 session.append_token("response"); 1198 session.finalize_last_assistant(); 1199 1200 // New user message arrives while stream is still open 1201 session.chat.push(Message::User("msg2".into())); 1202 1203 // Stream ends 1204 drop(tx); 1205 session.incoming_tokens = None; 1206 1207 assert!(session.needs_redispatch_after_stream_end()); 1208 } 1209 1210 #[test] 1211 fn no_redispatch_when_assistant_is_last() { 1212 let mut session = test_session(); 1213 session.chat.push(Message::User("hello".into())); 1214 1215 // Dispatch and start streaming 1216 let tx = make_streaming(&mut session); 1217 1218 // Backend responds 1219 session.append_token("done"); 1220 session.finalize_last_assistant(); 1221 1222 drop(tx); 1223 session.incoming_tokens = None; 1224 1225 assert!(!session.needs_redispatch_after_stream_end()); 1226 } 1227 1228 /// The key bug scenario: multiple remote messages arrive across frames 1229 /// while streaming. None should trigger dispatch. After stream ends, 1230 /// the last pending message should trigger redispatch. 1231 #[test] 1232 fn multiple_remote_messages_while_streaming() { 1233 let mut session = test_session(); 1234 1235 // First message — dispatched normally 1236 session.chat.push(Message::User("msg1".into())); 1237 assert!(session.should_dispatch_remote_message()); 1238 1239 // Dispatch and start streaming 1240 let tx = make_streaming(&mut session); 1241 1242 // Messages arrive one per frame while streaming 1243 session.chat.push(Message::User("msg2".into())); 1244 assert!(!session.should_dispatch_remote_message()); 1245 1246 session.chat.push(Message::User("msg3".into())); 1247 assert!(!session.should_dispatch_remote_message()); 1248 1249 // Stream ends (backend didn't produce content — e.g. connection dropped) 1250 drop(tx); 1251 session.incoming_tokens = None; 1252 1253 // Should redispatch — new messages arrived beyond what was dispatched 1254 assert!(session.needs_redispatch_after_stream_end()); 1255 } 1256 1257 // ---- append_token tests ---- 1258 1259 #[test] 1260 fn append_token_creates_assistant_when_empty() { 1261 let mut session = test_session(); 1262 session.append_token("hello"); 1263 assert!(matches!(session.chat.last(), Some(Message::Assistant(_)))); 1264 assert_eq!(session.last_assistant_text().unwrap(), "hello"); 1265 } 1266 1267 #[test] 1268 fn append_token_extends_existing_assistant() { 1269 let mut session = test_session(); 1270 session.chat.push(Message::User("hi".into())); 1271 session.append_token("hel"); 1272 session.append_token("lo"); 1273 assert_eq!(session.last_assistant_text().unwrap(), "hello"); 1274 assert!(matches!(session.chat.last(), Some(Message::Assistant(_)))); 1275 } 1276 1277 /// The key bug this prevents: tokens arriving after a queued user 1278 /// message must NOT create a new Assistant that buries the queued 1279 /// message. They should append to the existing Assistant before it. 1280 #[test] 1281 fn tokens_after_queued_message_dont_bury_it() { 1282 let mut session = test_session(); 1283 1284 // User sends initial message, dispatched and streaming starts 1285 session.chat.push(Message::User("hello".into())); 1286 let _tx = make_streaming(&mut session); 1287 session.append_token("Sure, "); 1288 session.append_token("I can "); 1289 1290 // User queues a follow-up while streaming 1291 session.chat.push(Message::User("also do this".into())); 1292 1293 // More tokens arrive from the CURRENT stream (not the queued msg) 1294 session.append_token("help!"); 1295 1296 // The queued user message must still be last 1297 assert!( 1298 matches!(session.chat.last(), Some(Message::User(_))), 1299 "queued user message should still be the last message" 1300 ); 1301 assert!(session.has_pending_user_message()); 1302 1303 // Tokens should have been appended to the existing assistant 1304 assert_eq!(session.last_assistant_text().unwrap(), "Sure, I can help!"); 1305 1306 // After stream ends, redispatch should fire 1307 assert!(session.needs_redispatch_after_stream_end()); 1308 } 1309 1310 /// Multiple queued messages: all should remain after the assistant 1311 /// response, and redispatch should still trigger. 1312 #[test] 1313 fn multiple_queued_messages_preserved() { 1314 let mut session = test_session(); 1315 1316 session.chat.push(Message::User("first".into())); 1317 let _tx = make_streaming(&mut session); 1318 session.append_token("response"); 1319 1320 // Queue two messages 1321 session.chat.push(Message::User("second".into())); 1322 session.chat.push(Message::User("third".into())); 1323 1324 // More tokens arrive 1325 session.append_token(" done"); 1326 1327 // Last message should still be the queued user message 1328 assert!(session.has_pending_user_message()); 1329 assert!(session.needs_redispatch_after_stream_end()); 1330 1331 // Assistant text should be the combined response 1332 assert_eq!(session.last_assistant_text().unwrap(), "response done"); 1333 } 1334 1335 /// After a turn is finalized, a new user message is sent and Claude 1336 /// responds. Tokens for the NEW response must create a new Assistant 1337 /// after the user message, not append to the finalized old one. 1338 /// This was the root cause of the infinite redispatch loop. 1339 #[test] 1340 fn tokens_after_finalized_turn_create_new_assistant() { 1341 let mut session = test_session(); 1342 1343 // Complete turn 1 1344 session.chat.push(Message::User("hello".into())); 1345 session.append_token("first response"); 1346 session.finalize_last_assistant(); 1347 1348 // User sends a new message (primary, not queued) 1349 session.chat.push(Message::User("follow up".into())); 1350 1351 // Tokens arrive from Claude's new response 1352 session.append_token("second "); 1353 session.append_token("response"); 1354 1355 // The new tokens must be in a NEW assistant after the user message 1356 assert!( 1357 matches!(session.chat.last(), Some(Message::Assistant(_))), 1358 "new assistant should be the last message" 1359 ); 1360 assert_eq!(session.last_assistant_text().unwrap(), "second response"); 1361 1362 // The old assistant should still have its original text 1363 let first_assistant_text = session 1364 .chat 1365 .iter() 1366 .find_map(|m| match m { 1367 Message::Assistant(msg) => { 1368 let t = msg.text().to_string(); 1369 if t == "first response" { 1370 Some(t) 1371 } else { 1372 None 1373 } 1374 } 1375 _ => None, 1376 }) 1377 .expect("original assistant should still exist"); 1378 assert_eq!(first_assistant_text, "first response"); 1379 1380 // No pending user message — assistant is last 1381 assert!(!session.has_pending_user_message()); 1382 } 1383 1384 /// When a queued message arrives before the first token, the new 1385 /// Assistant must be inserted between the dispatched user message 1386 /// and the queued one, not after the queued one. 1387 #[test] 1388 fn queued_before_first_token_ordering() { 1389 let mut session = test_session(); 1390 1391 // Turn 1 complete 1392 session.chat.push(Message::User("hello".into())); 1393 session.append_token("response 1"); 1394 session.finalize_last_assistant(); 1395 1396 // User sends a new message, dispatched to Claude (single dispatch) 1397 session.chat.push(Message::User("follow up".into())); 1398 session.mark_dispatched(); 1399 1400 // User queues another message BEFORE any tokens arrive 1401 session.chat.push(Message::User("queued msg".into())); 1402 1403 // Now first token arrives from Claude's response to "follow up" 1404 session.append_token("response "); 1405 session.append_token("2"); 1406 1407 // Expected order: User("follow up"), Assistant("response 2"), User("queued msg") 1408 let msgs: Vec<&str> = session 1409 .chat 1410 .iter() 1411 .filter_map(|m| match m { 1412 Message::User(s) if s == "follow up" => Some("U:follow up"), 1413 Message::User(s) if s == "queued msg" => Some("U:queued msg"), 1414 Message::Assistant(a) if a.text() == "response 2" => Some("A:response 2"), 1415 _ => None, 1416 }) 1417 .collect(); 1418 assert_eq!( 1419 msgs, 1420 vec!["U:follow up", "A:response 2", "U:queued msg"], 1421 "assistant response should appear between dispatched and queued messages" 1422 ); 1423 1424 // Queued message should still be last → triggers redispatch 1425 assert!(session.has_pending_user_message()); 1426 } 1427 1428 /// Text → tool call → more text: post-tool tokens must create a 1429 /// new Assistant so the tool call appears between the two text blocks, 1430 /// not get appended to the pre-tool Assistant (which would push the 1431 /// tool call to the bottom). 1432 #[test] 1433 fn tokens_after_tool_call_create_new_assistant() { 1434 let mut session = test_session(); 1435 1436 session.chat.push(Message::User("do something".into())); 1437 session.append_token("Let me read that file."); 1438 1439 // Tool call arrives mid-stream 1440 let tool = crate::tools::ToolCall::invalid( 1441 "call-1".into(), 1442 Some("Read".into()), 1443 None, 1444 "test".into(), 1445 ); 1446 session.chat.push(Message::ToolCalls(vec![tool])); 1447 session 1448 .chat 1449 .push(Message::ToolResponse(crate::tools::ToolResponse::error( 1450 "call-1".into(), 1451 "test result".into(), 1452 ))); 1453 1454 // More tokens arrive after the tool call 1455 session.append_token("Here is what I found."); 1456 1457 // Verify ordering: Assistant, ToolCalls, ToolResponse, Assistant 1458 let labels: Vec<&str> = session 1459 .chat 1460 .iter() 1461 .map(|m| match m { 1462 Message::User(_) => "User", 1463 Message::Assistant(_) => "Assistant", 1464 Message::ToolCalls(_) => "ToolCalls", 1465 Message::ToolResponse(_) => "ToolResponse", 1466 _ => "Other", 1467 }) 1468 .collect(); 1469 assert_eq!( 1470 labels, 1471 vec![ 1472 "User", 1473 "Assistant", 1474 "ToolCalls", 1475 "ToolResponse", 1476 "Assistant" 1477 ], 1478 "post-tool tokens should be in a new assistant, not appended to the first" 1479 ); 1480 1481 // Verify content of each assistant 1482 let assistants: Vec<String> = session 1483 .chat 1484 .iter() 1485 .filter_map(|m| match m { 1486 Message::Assistant(a) => Some(a.text().to_string()), 1487 _ => None, 1488 }) 1489 .collect(); 1490 assert_eq!(assistants[0], "Let me read that file."); 1491 assert_eq!(assistants[1], "Here is what I found."); 1492 } 1493 1494 // ---- finalize_last_assistant tests ---- 1495 1496 #[test] 1497 fn finalize_finds_assistant_before_queued_messages() { 1498 let mut session = test_session(); 1499 1500 session.chat.push(Message::User("hi".into())); 1501 session.append_token("response"); 1502 session.chat.push(Message::User("queued".into())); 1503 1504 // Should finalize without panicking, even though last() is User 1505 session.finalize_last_assistant(); 1506 1507 // Verify the queued message is still there 1508 assert!(session.has_pending_user_message()); 1509 } 1510 1511 // ---- status tests ---- 1512 1513 /// Helper to put a session into "streaming" state. 1514 /// Also calls `mark_dispatched()` to mirror what `send_user_message_for()` 1515 /// does in real code — the trailing user messages are marked as dispatched. 1516 fn make_streaming(session: &mut ChatSession) -> mpsc::Sender<DaveApiResponse> { 1517 session.mark_dispatched(); 1518 let (tx, rx) = mpsc::channel::<DaveApiResponse>(); 1519 session.incoming_tokens = Some(rx); 1520 tx 1521 } 1522 1523 #[test] 1524 fn status_idle_initially() { 1525 let session = test_session(); 1526 assert_eq!(session.status(), AgentStatus::Idle); 1527 } 1528 1529 #[test] 1530 fn status_idle_with_pending_user_message() { 1531 let mut session = test_session(); 1532 session.chat.push(Message::User("hello".into())); 1533 session.update_status(); 1534 // No task handle or incoming tokens → Idle 1535 assert_eq!(session.status(), AgentStatus::Idle); 1536 } 1537 1538 #[test] 1539 fn status_done_when_assistant_is_last() { 1540 let mut session = test_session(); 1541 session.chat.push(Message::User("hello".into())); 1542 session 1543 .chat 1544 .push(Message::Assistant(AssistantMessage::from_text( 1545 "reply".into(), 1546 ))); 1547 session.update_status(); 1548 assert_eq!(session.status(), AgentStatus::Done); 1549 } 1550 1551 // ---- batch redispatch lifecycle tests ---- 1552 1553 /// Simulates the full lifecycle of queued message batch dispatch: 1554 /// 1. User sends message → dispatched 1555 /// 2. While streaming, user queues 3 more messages 1556 /// 3. Stream ends → needs_redispatch is true 1557 /// 4. On redispatch, get_pending_user_messages collects all 3 1558 /// 5. After redispatch, new tokens create response after all queued msgs 1559 #[test] 1560 fn batch_redispatch_full_lifecycle() { 1561 let mut session = test_session(); 1562 use crate::backend::shared; 1563 1564 // Step 1: User sends first message, it gets dispatched (single) 1565 session.chat.push(Message::User("hello".into())); 1566 assert!(session.should_dispatch_remote_message()); 1567 1568 // Backend starts streaming (mark_dispatched called by make_streaming) 1569 let tx = make_streaming(&mut session); 1570 assert!(session.is_streaming()); 1571 assert!(!session.should_dispatch_remote_message()); 1572 1573 // First tokens arrive 1574 session.append_token("Sure, "); 1575 session.append_token("I can help."); 1576 1577 // Step 2: User queues 3 messages while streaming 1578 session.chat.push(Message::User("also".into())); 1579 session.chat.push(Message::User("do this".into())); 1580 session.chat.push(Message::User("and this".into())); 1581 1582 // Should NOT dispatch while streaming 1583 assert!(!session.should_dispatch_remote_message()); 1584 1585 // More tokens arrive — should append to the streaming assistant, 1586 // not create new ones after the queued messages 1587 session.append_token(" Let me "); 1588 session.append_token("check."); 1589 1590 // Verify the assistant text is continuous 1591 assert_eq!( 1592 session.last_assistant_text().unwrap(), 1593 "Sure, I can help. Let me check." 1594 ); 1595 1596 // Queued messages should still be at the end 1597 assert!(session.has_pending_user_message()); 1598 1599 // Step 3: Stream ends 1600 session.finalize_last_assistant(); 1601 drop(tx); 1602 session.incoming_tokens = None; 1603 1604 assert!(!session.is_streaming()); 1605 assert!(session.needs_redispatch_after_stream_end()); 1606 1607 // Step 4: At redispatch time, get_pending_user_messages should 1608 // collect ALL trailing user messages 1609 let prompt = shared::get_pending_user_messages(&session.chat); 1610 assert_eq!(prompt, "also\ndo this\nand this"); 1611 1612 // Step 5: Backend dispatches with the batch prompt (3 messages) 1613 let _tx2 = make_streaming(&mut session); 1614 1615 // New tokens arrive — should create a new assistant after ALL 1616 // dispatched messages (since they were all sent in the batch) 1617 session.append_token("OK, doing all three."); 1618 1619 // Verify chat order: response 2 should come after all 3 1620 // batch-dispatched user messages 1621 let types: Vec<&str> = session 1622 .chat 1623 .iter() 1624 .map(|m| match m { 1625 Message::User(_) => "User", 1626 Message::Assistant(_) => "Assistant", 1627 _ => "?", 1628 }) 1629 .collect(); 1630 assert_eq!( 1631 types, 1632 // Turn 1: User → Assistant 1633 // Turn 2: User, User, User (batch) → Assistant 1634 vec!["User", "Assistant", "User", "User", "User", "Assistant"], 1635 ); 1636 // Verify the second assistant has the right text 1637 assert_eq!( 1638 session.last_assistant_text().unwrap(), 1639 "OK, doing all three." 1640 ); 1641 } 1642 1643 /// When all queued messages are batch-dispatched, no redispatch 1644 /// should be needed after the second stream completes (assuming 1645 /// no new messages arrive). 1646 #[test] 1647 fn no_double_redispatch_after_batch() { 1648 let mut session = test_session(); 1649 1650 // Turn 1: single dispatch 1651 session.chat.push(Message::User("first".into())); 1652 let tx = make_streaming(&mut session); 1653 session.append_token("response 1"); 1654 session.chat.push(Message::User("queued A".into())); 1655 session.chat.push(Message::User("queued B".into())); 1656 session.finalize_last_assistant(); 1657 drop(tx); 1658 session.incoming_tokens = None; 1659 assert!(session.needs_redispatch_after_stream_end()); 1660 1661 // Turn 2: batch redispatch handles both queued messages 1662 let tx2 = make_streaming(&mut session); 1663 session.append_token("response 2"); 1664 session.finalize_last_assistant(); 1665 drop(tx2); 1666 session.incoming_tokens = None; 1667 1668 // No more pending user messages after the assistant response 1669 assert!( 1670 !session.needs_redispatch_after_stream_end(), 1671 "should not need another redispatch when no new messages arrived" 1672 ); 1673 } 1674 1675 /// When a stream ends with an error (no tokens produced), the 1676 /// Error message should prevent infinite redispatch. 1677 #[test] 1678 fn error_prevents_redispatch_loop() { 1679 let mut session = test_session(); 1680 1681 session.chat.push(Message::User("hello".into())); 1682 let tx = make_streaming(&mut session); 1683 1684 // Error arrives (no tokens were sent) 1685 session 1686 .chat 1687 .push(Message::Error("context window exceeded".into())); 1688 1689 // Stream ends 1690 drop(tx); 1691 session.incoming_tokens = None; 1692 1693 assert!( 1694 !session.needs_redispatch_after_stream_end(), 1695 "error should prevent redispatch" 1696 ); 1697 } 1698 1699 /// When the backend returns immediately with no content (e.g. a 1700 /// skill command it can't handle), the dispatched user message is 1701 /// still the last in chat. Without the trailing-count guard this 1702 /// would trigger an infinite redispatch loop. 1703 #[test] 1704 fn empty_response_prevents_redispatch_loop() { 1705 let mut session = test_session(); 1706 1707 session 1708 .chat 1709 .push(Message::User("/refactor something".into())); 1710 let tx = make_streaming(&mut session); 1711 1712 // Backend returns immediately — no tokens, no tools, nothing 1713 session.finalize_last_assistant(); 1714 drop(tx); 1715 session.incoming_tokens = None; 1716 1717 assert!( 1718 !session.needs_redispatch_after_stream_end(), 1719 "should not redispatch already-dispatched messages with empty response" 1720 ); 1721 } 1722 1723 /// Verify chat ordering when queued messages arrive before any 1724 /// tokens, and after tokens, across a full batch lifecycle. 1725 #[test] 1726 fn chat_ordering_with_mixed_timing() { 1727 let mut session = test_session(); 1728 1729 // Turn 1 complete 1730 session.chat.push(Message::User("hello".into())); 1731 session.append_token("hi there"); 1732 session.finalize_last_assistant(); 1733 1734 // User sends new message (single dispatch) 1735 session.chat.push(Message::User("question".into())); 1736 let tx = make_streaming(&mut session); 1737 1738 // Queued BEFORE first token 1739 session.chat.push(Message::User("early queue".into())); 1740 1741 // First token arrives 1742 session.append_token("answer "); 1743 1744 // Queued AFTER first token 1745 session.chat.push(Message::User("late queue".into())); 1746 1747 // More tokens 1748 session.append_token("here"); 1749 1750 // Verify: assistant response should be between dispatched 1751 // user and the queued messages 1752 let types: Vec<String> = session 1753 .chat 1754 .iter() 1755 .map(|m| match m { 1756 Message::User(s) => format!("U:{}", s), 1757 Message::Assistant(a) => format!("A:{}", a.text()), 1758 _ => "?".into(), 1759 }) 1760 .collect(); 1761 1762 // The key constraint: "answer here" must appear after 1763 // "question" and before the queued messages 1764 let answer_pos = types.iter().position(|t| t == "A:answer here").unwrap(); 1765 let question_pos = types.iter().position(|t| t == "U:question").unwrap(); 1766 let early_pos = types.iter().position(|t| t == "U:early queue").unwrap(); 1767 let late_pos = types.iter().position(|t| t == "U:late queue").unwrap(); 1768 1769 assert!( 1770 answer_pos > question_pos, 1771 "answer should come after the dispatched question" 1772 ); 1773 assert!( 1774 early_pos > answer_pos || late_pos > answer_pos, 1775 "at least one queued message should be after the answer" 1776 ); 1777 1778 // Finalize and check redispatch 1779 session.finalize_last_assistant(); 1780 drop(tx); 1781 session.incoming_tokens = None; 1782 assert!(session.needs_redispatch_after_stream_end()); 1783 } 1784 1785 /// Queued indicator detection: helper that mimics what the UI does 1786 /// to find which messages are "queued". 1787 fn find_queued_indices( 1788 chat: &[Message], 1789 is_working: bool, 1790 dispatch_state: DispatchState, 1791 ) -> Vec<usize> { 1792 if !is_working { 1793 return vec![]; 1794 } 1795 let last_non_user = chat.iter().rposition(|m| !matches!(m, Message::User(_))); 1796 let queued_from = match last_non_user { 1797 Some(i) if matches!(chat[i], Message::Assistant(ref m) if m.is_streaming()) => { 1798 let first_trailing = i + 1; 1799 if first_trailing < chat.len() { 1800 Some(first_trailing) 1801 } else { 1802 None 1803 } 1804 } 1805 Some(i) => { 1806 let first_trailing = i + 1; 1807 let skip = dispatch_state.dispatched_count().max(1); 1808 let queued_start = first_trailing + skip; 1809 if queued_start < chat.len() { 1810 Some(queued_start) 1811 } else { 1812 None 1813 } 1814 } 1815 None => None, 1816 }; 1817 match queued_from { 1818 Some(qi) => (qi..chat.len()) 1819 .filter(|&i| matches!(chat[i], Message::User(_))) 1820 .collect(), 1821 None => vec![], 1822 } 1823 } 1824 1825 #[test] 1826 fn queued_indicator_before_first_token() { 1827 // Chat: [...finalized Asst], User("dispatched"), User("queued") 1828 // No streaming assistant yet → dispatched is being processed, 1829 // only "queued" should show the indicator. 1830 let mut session = test_session(); 1831 session.chat.push(Message::User("prev".into())); 1832 session 1833 .chat 1834 .push(Message::Assistant(AssistantMessage::from_text( 1835 "prev reply".into(), 1836 ))); 1837 session.chat.push(Message::User("dispatched".into())); 1838 session.chat.push(Message::User("queued 1".into())); 1839 session.chat.push(Message::User("queued 2".into())); 1840 1841 // Single dispatch 1842 let queued = find_queued_indices( 1843 &session.chat, 1844 true, 1845 DispatchState::AwaitingResponse { count: 1 }, 1846 ); 1847 let queued_texts: Vec<&str> = queued 1848 .iter() 1849 .map(|&i| match &session.chat[i] { 1850 Message::User(s) => s.as_str(), 1851 _ => "?", 1852 }) 1853 .collect(); 1854 assert_eq!( 1855 queued_texts, 1856 vec!["queued 1", "queued 2"], 1857 "dispatched message should not be marked as queued" 1858 ); 1859 } 1860 1861 #[test] 1862 fn queued_indicator_during_streaming() { 1863 // Chat: User("dispatched"), Assistant(streaming), User("queued") 1864 // Streaming assistant separates dispatched from queued. 1865 let mut session = test_session(); 1866 session.chat.push(Message::User("dispatched".into())); 1867 session.append_token("streaming..."); 1868 session.chat.push(Message::User("queued 1".into())); 1869 session.chat.push(Message::User("queued 2".into())); 1870 1871 // Dispatch state doesn't matter here — streaming assistant 1872 // branch doesn't use the dispatched count 1873 let queued = find_queued_indices( 1874 &session.chat, 1875 true, 1876 DispatchState::AwaitingResponse { count: 1 }, 1877 ); 1878 let queued_texts: Vec<&str> = queued 1879 .iter() 1880 .map(|&i| match &session.chat[i] { 1881 Message::User(s) => s.as_str(), 1882 _ => "?", 1883 }) 1884 .collect(); 1885 assert_eq!( 1886 queued_texts, 1887 vec!["queued 1", "queued 2"], 1888 "all user messages after streaming assistant should be queued" 1889 ); 1890 } 1891 1892 #[test] 1893 fn queued_indicator_not_working() { 1894 // When not working, nothing should be marked as queued 1895 let mut session = test_session(); 1896 session.chat.push(Message::User("msg 1".into())); 1897 session.chat.push(Message::User("msg 2".into())); 1898 1899 let queued = find_queued_indices(&session.chat, false, DispatchState::Idle); 1900 assert!( 1901 queued.is_empty(), 1902 "nothing should be queued when not working" 1903 ); 1904 } 1905 1906 #[test] 1907 fn queued_indicator_no_queued_messages() { 1908 // Working but only one user message → nothing queued 1909 let mut session = test_session(); 1910 session 1911 .chat 1912 .push(Message::Assistant(AssistantMessage::from_text( 1913 "prev".into(), 1914 ))); 1915 session.chat.push(Message::User("only one".into())); 1916 1917 let queued = find_queued_indices( 1918 &session.chat, 1919 true, 1920 DispatchState::AwaitingResponse { count: 1 }, 1921 ); 1922 assert!( 1923 queued.is_empty(), 1924 "single dispatched message should not be queued" 1925 ); 1926 } 1927 1928 #[test] 1929 fn queued_indicator_after_tool_call_with_streaming() { 1930 // Chat: User, Asst, ToolCalls, ToolResponse, Asst(streaming), User(queued) 1931 let mut session = test_session(); 1932 session.chat.push(Message::User("do something".into())); 1933 session.append_token("Let me check."); 1934 1935 let tool = 1936 crate::tools::ToolCall::invalid("c1".into(), Some("Read".into()), None, "test".into()); 1937 session.chat.push(Message::ToolCalls(vec![tool])); 1938 session 1939 .chat 1940 .push(Message::ToolResponse(crate::tools::ToolResponse::error( 1941 "c1".into(), 1942 "result".into(), 1943 ))); 1944 1945 // Post-tool tokens create new streaming assistant 1946 session.append_token("Found it."); 1947 session.chat.push(Message::User("queued".into())); 1948 1949 let queued = find_queued_indices( 1950 &session.chat, 1951 true, 1952 DispatchState::AwaitingResponse { count: 1 }, 1953 ); 1954 let queued_texts: Vec<&str> = queued 1955 .iter() 1956 .map(|&i| match &session.chat[i] { 1957 Message::User(s) => s.as_str(), 1958 _ => "?", 1959 }) 1960 .collect(); 1961 assert_eq!(queued_texts, vec!["queued"]); 1962 } 1963 1964 /// Batch dispatch: when 3 messages were dispatched together, 1965 /// none should show "queued" before the first token arrives. 1966 #[test] 1967 fn queued_indicator_batch_dispatch_no_queued() { 1968 let mut session = test_session(); 1969 session 1970 .chat 1971 .push(Message::Assistant(AssistantMessage::from_text( 1972 "prev reply".into(), 1973 ))); 1974 session.chat.push(Message::User("a".into())); 1975 session.chat.push(Message::User("b".into())); 1976 session.chat.push(Message::User("c".into())); 1977 1978 // All 3 were batch-dispatched 1979 let queued = find_queued_indices( 1980 &session.chat, 1981 true, 1982 DispatchState::AwaitingResponse { count: 3 }, 1983 ); 1984 assert!( 1985 queued.is_empty(), 1986 "all 3 messages were dispatched — none should show queued" 1987 ); 1988 } 1989 1990 /// Batch dispatch with new message queued after: 3 dispatched, 1991 /// then 1 more arrives. Only the new one should be "queued". 1992 #[test] 1993 fn queued_indicator_batch_with_new_queued() { 1994 let mut session = test_session(); 1995 session 1996 .chat 1997 .push(Message::Assistant(AssistantMessage::from_text( 1998 "prev reply".into(), 1999 ))); 2000 session.chat.push(Message::User("a".into())); 2001 session.chat.push(Message::User("b".into())); 2002 session.chat.push(Message::User("c".into())); 2003 session.chat.push(Message::User("new queued".into())); 2004 2005 // 3 were dispatched, 1 new arrival 2006 let queued = find_queued_indices( 2007 &session.chat, 2008 true, 2009 DispatchState::AwaitingResponse { count: 3 }, 2010 ); 2011 let queued_texts: Vec<&str> = queued 2012 .iter() 2013 .map(|&i| match &session.chat[i] { 2014 Message::User(s) => s.as_str(), 2015 _ => "?", 2016 }) 2017 .collect(); 2018 assert_eq!( 2019 queued_texts, 2020 vec!["new queued"], 2021 "only the message after the batch should be queued" 2022 ); 2023 } 2024 }