lib.rs (137012B)
1 mod agent_status; 2 mod auto_accept; 3 mod avatar; 4 mod backend; 5 pub mod config; 6 pub mod events; 7 pub mod file_update; 8 mod focus_queue; 9 pub(crate) mod git_status; 10 pub mod ipc; 11 pub(crate) mod mesh; 12 mod messages; 13 mod path_normalize; 14 pub(crate) mod path_utils; 15 mod quaternion; 16 pub mod session; 17 pub mod session_converter; 18 pub mod session_discovery; 19 pub mod session_events; 20 pub mod session_jsonl; 21 pub mod session_loader; 22 pub mod session_reconstructor; 23 mod tools; 24 mod ui; 25 mod update; 26 mod vec3; 27 28 use agent_status::AgentStatus; 29 use backend::{ 30 AiBackend, BackendType, ClaudeBackend, CodexBackend, OpenAiBackend, RemoteOnlyBackend, 31 }; 32 use chrono::{Duration, Local}; 33 use egui_wgpu::RenderState; 34 use enostr::{KeypairUnowned, RelayPool}; 35 use focus_queue::FocusQueue; 36 use nostrdb::{Subscription, Transaction}; 37 use notedeck::{ 38 timed_serializer::TimedSerializer, ui::is_narrow, AppAction, AppContext, AppResponse, DataPath, 39 DataPathType, 40 }; 41 use std::collections::{HashMap, HashSet}; 42 use std::path::{Path, PathBuf}; 43 use std::string::ToString; 44 use std::sync::Arc; 45 use std::time::Instant; 46 47 pub use avatar::DaveAvatar; 48 pub use config::{AiMode, AiProvider, DaveSettings, ModelConfig}; 49 pub use messages::{ 50 AskUserQuestionInput, AssistantMessage, DaveApiResponse, ExecutedTool, Message, 51 PermissionResponse, PermissionResponseType, QuestionAnswer, SessionInfo, SubagentInfo, 52 SubagentStatus, 53 }; 54 pub use quaternion::Quaternion; 55 pub use session::{ChatSession, SessionId, SessionManager}; 56 pub use session_discovery::{discover_sessions, format_relative_time, ResumableSession}; 57 pub use tools::{ 58 PartialToolCall, QueryCall, QueryResponse, Tool, ToolCall, ToolCalls, ToolResponse, 59 ToolResponses, 60 }; 61 pub use ui::{ 62 check_keybindings, AgentScene, DaveAction, DaveResponse, DaveSettingsPanel, DaveUi, 63 DirectoryPicker, DirectoryPickerAction, KeyAction, KeyActionResult, OverlayResult, SceneAction, 64 SceneResponse, SceneViewAction, SendActionResult, SessionListAction, SessionListUi, 65 SessionPicker, SessionPickerAction, SettingsPanelAction, UiActionResult, 66 }; 67 pub use vec3::Vec3; 68 69 /// Default relay URL used for PNS event publishing and subscription. 70 const DEFAULT_PNS_RELAY: &str = "ws://relay.jb55.com/"; 71 72 /// Maximum consecutive negentropy sync rounds before stopping. 73 /// Each round pulls up to the relay's limit (typically 500 events), 74 /// so 20 rounds fetches up to ~10000 recent events. 75 const MAX_NEG_SYNC_ROUNDS: u8 = 20; 76 77 /// Normalize a relay URL to always have a trailing slash. 78 fn normalize_relay_url(url: String) -> String { 79 if url.ends_with('/') { 80 url 81 } else { 82 url + "/" 83 } 84 } 85 86 /// Extract a 32-byte secret key from a keypair. 87 fn secret_key_bytes(keypair: KeypairUnowned<'_>) -> Option<[u8; 32]> { 88 keypair.secret_key.map(|sk| { 89 sk.as_secret_bytes() 90 .try_into() 91 .expect("secret key is 32 bytes") 92 }) 93 } 94 95 /// A pending spawn command waiting to be built and published. 96 struct PendingSpawnCommand { 97 target_host: String, 98 cwd: PathBuf, 99 backend: BackendType, 100 } 101 102 /// Represents which full-screen overlay (if any) is currently active. 103 /// Data-carrying variants hold the state needed for that step in the 104 /// session-creation flow, replacing scattered `pending_*` fields. 105 #[derive(Debug, Clone, PartialEq, Eq, Default)] 106 pub enum DaveOverlay { 107 #[default] 108 None, 109 Settings, 110 HostPicker, 111 DirectoryPicker, 112 /// Backend has been chosen; showing resumable-session list. 113 SessionPicker { 114 backend: BackendType, 115 }, 116 /// Directory chosen; waiting for user to pick a backend. 117 BackendPicker { 118 cwd: PathBuf, 119 }, 120 } 121 122 pub struct Dave { 123 pool: RelayPool, 124 /// AI interaction mode (Chat vs Agentic) 125 ai_mode: AiMode, 126 /// Manages multiple chat sessions 127 session_manager: SessionManager, 128 /// A 3d representation of dave. 129 avatar: Option<DaveAvatar>, 130 /// Shared tools available to all sessions 131 tools: Arc<HashMap<String, Tool>>, 132 /// AI backends keyed by type — multiple may be available simultaneously 133 backends: HashMap<BackendType, Box<dyn AiBackend>>, 134 /// Which agentic backends are available (detected from PATH at startup) 135 available_backends: Vec<BackendType>, 136 /// Model configuration 137 model_config: ModelConfig, 138 /// Whether to show session list on mobile 139 show_session_list: bool, 140 /// User settings 141 settings: DaveSettings, 142 /// Settings panel UI state 143 settings_panel: DaveSettingsPanel, 144 /// RTS-style scene view 145 scene: AgentScene, 146 /// Whether to show scene view (vs classic chat view) 147 show_scene: bool, 148 /// Tracks when first Escape was pressed for interrupt confirmation 149 interrupt_pending_since: Option<Instant>, 150 /// Focus queue for agents needing attention 151 focus_queue: FocusQueue, 152 /// Auto-steal focus state: Disabled, Idle (enabled, nothing pending), 153 /// or Pending (enabled, waiting to fire / retrying). 154 auto_steal: focus_queue::AutoStealState, 155 /// The session ID to return to after processing all NeedsInput items 156 home_session: Option<SessionId>, 157 /// Directory picker for selecting working directory when creating sessions 158 directory_picker: DirectoryPicker, 159 /// Session picker for resuming existing Claude sessions 160 session_picker: SessionPicker, 161 /// Current overlay taking over the UI (if any) 162 active_overlay: DaveOverlay, 163 /// IPC listener for external spawn-agent commands 164 ipc_listener: Option<ipc::IpcListener>, 165 /// Pending archive conversion: (jsonl_path, dave_session_id, claude_session_id). 166 /// Set when resuming a session; processed in update() where AppContext is available. 167 pending_archive_convert: Option<(std::path::PathBuf, SessionId, String)>, 168 /// Waiting for ndb to finish indexing 1988 events so we can load messages. 169 pending_message_load: Option<PendingMessageLoad>, 170 /// Events waiting to be published to relays (queued from non-pool contexts). 171 pending_relay_events: Vec<session_events::BuiltEvent>, 172 /// Whether sessions have been restored from ndb on startup. 173 sessions_restored: bool, 174 /// Remote relay subscription ID for PNS events (kind-1080). 175 /// Used to discover session state events from other devices. 176 pns_relay_sub: Option<String>, 177 /// Local ndb subscription for kind-31988 session state events. 178 /// Fires when new session states are unwrapped from PNS events. 179 session_state_sub: Option<nostrdb::Subscription>, 180 /// Local ndb subscription for kind-31989 session command events. 181 session_command_sub: Option<nostrdb::Subscription>, 182 /// Command UUIDs already processed (dedup for spawn commands). 183 processed_commands: std::collections::HashSet<String>, 184 /// Spawn commands waiting to be built+published in update() where secret key is available. 185 pending_spawn_commands: Vec<PendingSpawnCommand>, 186 /// Permission responses queued for relay publishing (from remote sessions). 187 /// Built and published in the update loop where AppContext is available. 188 pending_perm_responses: Vec<PermissionPublish>, 189 /// Permission mode commands queued for relay publishing (observer → host). 190 pending_mode_commands: Vec<update::ModeCommandPublish>, 191 /// Sessions pending deletion state event publication. 192 /// Populated in delete_session(), drained in the update loop where AppContext is available. 193 pending_deletions: Vec<DeletedSessionInfo>, 194 /// Thread summaries pending processing. Queued by summarize_thread(), 195 /// resolved in update() where AppContext (ndb) is available. 196 pending_summaries: Vec<enostr::NoteId>, 197 /// Local machine hostname, included in session state events. 198 hostname: String, 199 /// PNS relay URL (configurable via DAVE_RELAY env or settings UI). 200 pns_relay_url: String, 201 /// Negentropy sync state for PNS event reconciliation. 202 neg_sync: enostr::negentropy::NegentropySync, 203 /// How many consecutive negentropy sync rounds have completed. 204 /// Reset on startup/reconnect, incremented each time events are found. 205 /// Caps at [`MAX_NEG_SYNC_ROUNDS`] to avoid pulling the entire history. 206 neg_sync_round: u8, 207 /// Persists DaveSettings to dave_settings.json 208 settings_serializer: TimedSerializer<DaveSettings>, 209 } 210 211 use update::PermissionPublish; 212 213 use crate::events::try_process_events_core; 214 215 /// Info captured from a session before deletion, for publishing a "deleted" state event. 216 struct DeletedSessionInfo { 217 claude_session_id: String, 218 title: String, 219 cwd: String, 220 home_dir: String, 221 backend: BackendType, 222 } 223 224 /// Subscription waiting for ndb to index 1988 conversation events. 225 struct PendingMessageLoad { 226 /// ndb subscription for kind-1988 events matching the session 227 sub: Subscription, 228 /// Dave's internal session ID 229 dave_session_id: SessionId, 230 /// Claude session ID (the `d` tag value) 231 claude_session_id: String, 232 } 233 234 /// PNS-wrap an event and ingest the 1080 wrapper into ndb. 235 /// 236 /// ndb's `process_pns` will unwrap it internally, making the inner 237 /// event queryable. This ensures 1080 events exist in ndb for relay sync. 238 fn pns_ingest(ndb: &nostrdb::Ndb, event_json: &str, secret_key: &[u8; 32]) { 239 let pns_keys = enostr::pns::derive_pns_keys(secret_key); 240 match session_events::wrap_pns(event_json, &pns_keys) { 241 Ok(pns_json) => { 242 // wrap_pns returns bare {…} JSON; use relay format 243 // ["EVENT", "subid", {…}] so ndb triggers PNS unwrapping 244 let wrapped = format!("[\"EVENT\", \"_pns\", {}]", pns_json); 245 if let Err(e) = ndb.process_event(&wrapped) { 246 tracing::warn!("failed to ingest PNS event: {:?}", e); 247 } 248 } 249 Err(e) => { 250 tracing::warn!("failed to PNS-wrap for local ingest: {}", e); 251 } 252 } 253 } 254 255 /// Ingest a freshly-built event: PNS-wrap into local ndb and push to the 256 /// relay publish queue. Logs on success with `event_desc` and on failure. 257 /// Returns `true` if the event was queued successfully. 258 fn queue_built_event( 259 result: Result<session_events::BuiltEvent, session_events::EventBuildError>, 260 event_desc: &str, 261 ndb: &nostrdb::Ndb, 262 sk: &[u8; 32], 263 queue: &mut Vec<session_events::BuiltEvent>, 264 ) -> bool { 265 match result { 266 Ok(evt) => { 267 tracing::info!("{}", event_desc); 268 pns_ingest(ndb, &evt.note_json, sk); 269 queue.push(evt); 270 true 271 } 272 Err(e) => { 273 tracing::error!("failed to build event ({}): {}", event_desc, e); 274 false 275 } 276 } 277 } 278 279 /// Build and ingest a live kind-1988 event into ndb (via PNS wrapping). 280 /// 281 /// Extracts cwd and session ID from the session's agentic data, 282 /// builds the event, PNS-wraps and ingests it, and returns the event 283 /// for relay publishing. 284 fn ingest_live_event( 285 session: &mut ChatSession, 286 ndb: &nostrdb::Ndb, 287 secret_key: &[u8; 32], 288 content: &str, 289 role: &str, 290 tool_id: Option<&str>, 291 tool_name: Option<&str>, 292 ) -> Option<session_events::BuiltEvent> { 293 let agentic = session.agentic.as_mut()?; 294 let session_id = agentic.event_session_id().to_string(); 295 let cwd = agentic.cwd.to_str(); 296 297 match session_events::build_live_event( 298 content, 299 role, 300 &session_id, 301 cwd, 302 tool_id, 303 tool_name, 304 &mut agentic.live_threading, 305 secret_key, 306 ) { 307 Ok(event) => { 308 // Mark as seen so we don't double-process when it echoes back from the relay 309 agentic.seen_note_ids.insert(event.note_id); 310 pns_ingest(ndb, &event.note_json, secret_key); 311 Some(event) 312 } 313 Err(e) => { 314 tracing::warn!("failed to build live event: {}", e); 315 None 316 } 317 } 318 } 319 320 /// Calculate an anonymous user_id from a keypair 321 /// Look up a backend by type from the map, falling back to Remote. 322 fn get_backend( 323 backends: &HashMap<BackendType, Box<dyn AiBackend>>, 324 bt: BackendType, 325 ) -> &dyn AiBackend { 326 backends 327 .get(&bt) 328 .or_else(|| backends.get(&BackendType::Remote)) 329 .unwrap() 330 .as_ref() 331 } 332 333 fn calculate_user_id(keypair: KeypairUnowned) -> String { 334 use sha2::{Digest, Sha256}; 335 // pubkeys have degraded privacy, don't do that 336 let key_input = keypair 337 .secret_key 338 .map(|sk| sk.as_secret_bytes()) 339 .unwrap_or(keypair.pubkey.bytes()); 340 let hex_key = hex::encode(key_input); 341 let input = format!("{hex_key}notedeck_dave_user_id"); 342 hex::encode(Sha256::digest(input)) 343 } 344 345 impl Dave { 346 pub fn avatar_mut(&mut self) -> Option<&mut DaveAvatar> { 347 self.avatar.as_mut() 348 } 349 350 fn _system_prompt() -> Message { 351 let now = Local::now(); 352 let yesterday = now - Duration::hours(24); 353 let date = now.format("%Y-%m-%d %H:%M:%S"); 354 let timestamp = now.timestamp(); 355 let yesterday_timestamp = yesterday.timestamp(); 356 357 Message::System(format!( 358 r#" 359 You are an AI agent for the nostr protocol called Dave, created by Damus. nostr is a decentralized social media and internet communications protocol. You are embedded in a nostr browser called 'Damus Notedeck'. 360 361 - The current date is {date} ({timestamp} unix timestamp if needed for queries). 362 363 - Yesterday (-24hrs) was {yesterday_timestamp}. You can use this in combination with `since` queries for pulling notes for summarizing notes the user might have missed while they were away. 364 365 # Response Guidelines 366 367 - You *MUST* call the present_notes tool with a list of comma-separated note id references when referring to notes so that the UI can display them. Do *NOT* include note id references in the text response, but you *SHOULD* use ^1, ^2, etc to reference note indices passed to present_notes. 368 - When a user asks for a digest instead of specific query terms, make sure to include both since and until to pull notes for the correct range. 369 - When tasked with open-ended queries such as looking for interesting notes or summarizing the day, make sure to add enough notes to the context (limit: 100-200) so that it returns enough data for summarization. 370 "# 371 )) 372 } 373 374 pub fn new( 375 render_state: Option<&RenderState>, 376 ndb: nostrdb::Ndb, 377 ctx: egui::Context, 378 path: &DataPath, 379 ) -> Self { 380 let settings_serializer = 381 TimedSerializer::new(path, DataPathType::Setting, "dave_settings.json".to_owned()); 382 383 // Load saved settings, falling back to env-var-based defaults 384 let (model_config, settings) = if let Some(saved_settings) = settings_serializer.get_item() 385 { 386 let config = ModelConfig::from_settings(&saved_settings); 387 (config, saved_settings) 388 } else { 389 let config = ModelConfig::default(); 390 let settings = DaveSettings::from_model_config(&config); 391 (config, settings) 392 }; 393 394 // Determine AI mode from backend type 395 let ai_mode = model_config.ai_mode(); 396 397 // Detect available agentic backends from PATH 398 let available_backends = config::available_agentic_backends(); 399 tracing::info!( 400 "detected {} agentic backends: {:?}", 401 available_backends.len(), 402 available_backends 403 ); 404 405 // Create backends for all available agentic CLIs + the configured primary 406 let mut backends: HashMap<BackendType, Box<dyn AiBackend>> = HashMap::new(); 407 408 for &bt in &available_backends { 409 match bt { 410 BackendType::Claude => { 411 backends.insert(BackendType::Claude, Box::new(ClaudeBackend::new())); 412 } 413 BackendType::Codex => { 414 backends.insert( 415 BackendType::Codex, 416 Box::new(CodexBackend::new( 417 std::env::var("CODEX_BINARY").unwrap_or_else(|_| "codex".to_string()), 418 )), 419 ); 420 } 421 _ => {} 422 } 423 } 424 425 // If the configured backend is OpenAI and not yet created, add it 426 if model_config.backend == BackendType::OpenAI { 427 use async_openai::Client; 428 let client = Client::with_config(model_config.to_api()); 429 backends.insert( 430 BackendType::OpenAI, 431 Box::new(OpenAiBackend::new(client, ndb.clone())), 432 ); 433 } 434 435 // Remote backend is always available for discovered sessions 436 backends.insert(BackendType::Remote, Box::new(RemoteOnlyBackend)); 437 438 let avatar = render_state.map(DaveAvatar::new); 439 let mut tools: HashMap<String, Tool> = HashMap::new(); 440 for tool in tools::dave_tools() { 441 tools.insert(tool.name().to_string(), tool); 442 } 443 444 let pns_relay_url = normalize_relay_url( 445 model_config 446 .pns_relay 447 .clone() 448 .unwrap_or_else(|| DEFAULT_PNS_RELAY.to_string()), 449 ); 450 451 let directory_picker = DirectoryPicker::new(); 452 453 // Create IPC listener for external spawn-agent commands 454 let ipc_listener = ipc::create_listener(ctx); 455 456 let hostname = gethostname::gethostname().to_string_lossy().into_owned(); 457 458 // In Chat mode, create a default session immediately and skip directory picker 459 // In Agentic mode, show directory picker on startup 460 let (session_manager, active_overlay) = match ai_mode { 461 AiMode::Chat => { 462 let mut manager = SessionManager::new(); 463 // Create a default session with current directory 464 let sid = manager.new_session( 465 std::env::current_dir().unwrap_or_default(), 466 ai_mode, 467 model_config.backend, 468 ); 469 if let Some(session) = manager.get_mut(sid) { 470 session.details.hostname = hostname.clone(); 471 } 472 manager.rebuild_host_groups(); 473 (manager, DaveOverlay::None) 474 } 475 AiMode::Agentic => (SessionManager::new(), DaveOverlay::DirectoryPicker), 476 }; 477 478 let pool = RelayPool::new(); 479 480 Dave { 481 pool, 482 ai_mode, 483 backends, 484 available_backends, 485 avatar, 486 session_manager, 487 tools: Arc::new(tools), 488 model_config, 489 show_session_list: false, 490 settings, 491 settings_panel: DaveSettingsPanel::new(), 492 scene: AgentScene::new(), 493 show_scene: false, // Default to list view 494 interrupt_pending_since: None, 495 focus_queue: FocusQueue::new(), 496 auto_steal: focus_queue::AutoStealState::Disabled, 497 home_session: None, 498 directory_picker, 499 session_picker: SessionPicker::new(), 500 active_overlay, 501 ipc_listener, 502 pending_archive_convert: None, 503 pending_message_load: None, 504 pending_relay_events: Vec::new(), 505 sessions_restored: false, 506 pns_relay_sub: None, 507 session_state_sub: None, 508 session_command_sub: None, 509 processed_commands: std::collections::HashSet::new(), 510 pending_spawn_commands: Vec::new(), 511 pending_perm_responses: Vec::new(), 512 pending_mode_commands: Vec::new(), 513 pending_deletions: Vec::new(), 514 pending_summaries: Vec::new(), 515 hostname, 516 pns_relay_url, 517 neg_sync: enostr::negentropy::NegentropySync::new(), 518 neg_sync_round: 0, 519 settings_serializer, 520 } 521 } 522 523 /// Get current settings for persistence 524 pub fn settings(&self) -> &DaveSettings { 525 &self.settings 526 } 527 528 /// Apply new settings and persist to disk. 529 /// Note: Provider changes require app restart to take effect. 530 pub fn apply_settings(&mut self, settings: DaveSettings) { 531 self.model_config = ModelConfig::from_settings(&settings); 532 self.pns_relay_url = normalize_relay_url( 533 settings 534 .pns_relay 535 .clone() 536 .unwrap_or_else(|| DEFAULT_PNS_RELAY.to_string()), 537 ); 538 self.settings_serializer.try_save(settings.clone()); 539 self.settings = settings; 540 } 541 542 /// Queue a thread summary request. The thread is fetched and formatted 543 /// in update() where AppContext (ndb) is available. 544 pub fn summarize_thread(&mut self, note_id: enostr::NoteId) { 545 self.pending_summaries.push(note_id); 546 } 547 548 /// Fetch the thread from ndb, format it, and create a session with the prompt. 549 fn build_summary_session( 550 &mut self, 551 ndb: &nostrdb::Ndb, 552 note_id: &enostr::NoteId, 553 ) -> Option<SessionId> { 554 let txn = Transaction::new(ndb).ok()?; 555 556 // Resolve to the root note of the thread 557 let clicked_note = ndb.get_note_by_id(&txn, note_id.bytes()).ok()?; 558 let root_id = nostrdb::NoteReply::new(clicked_note.tags()) 559 .root() 560 .map(|r| *r.id) 561 .unwrap_or(*note_id.bytes()); 562 563 let root_note = ndb.get_note_by_id(&txn, &root_id).ok()?; 564 let root_simple = tools::note_to_simple(&txn, ndb, &root_note); 565 566 // Fetch all replies referencing the root note 567 let filter = nostrdb::Filter::new().kinds([1]).event(&root_id).build(); 568 569 let replies = ndb.query(&txn, &[filter], 500).ok().unwrap_or_default(); 570 571 let mut simple_notes = vec![root_simple]; 572 for result in &replies { 573 if let Ok(note) = ndb.get_note_by_key(&txn, result.note_key) { 574 simple_notes.push(tools::note_to_simple(&txn, ndb, ¬e)); 575 } 576 } 577 578 let thread_json = tools::format_simple_notes_json(&simple_notes); 579 let system = format!( 580 "You are summarizing a nostr thread. \ 581 Here is the thread data:\n\n{}\n\n\ 582 When referencing specific notes in your summary, call the \ 583 present_notes tool with their note_ids so the UI can display them inline.", 584 thread_json 585 ); 586 587 let cwd = std::env::current_dir().unwrap_or_default(); 588 let id = update::create_session_with_cwd( 589 &mut self.session_manager, 590 &mut self.directory_picker, 591 &mut self.scene, 592 self.show_scene, 593 AiMode::Chat, 594 cwd, 595 &self.hostname, 596 self.model_config.backend, 597 None, 598 ); 599 600 if let Some(session) = self.session_manager.get_mut(id) { 601 session.chat.push(Message::System(system)); 602 603 // Show the root note inline so the user can see what's being summarized 604 let present = tools::ToolCall::new( 605 "summarize-thread".to_string(), 606 tools::ToolCalls::PresentNotes(tools::PresentNotesCall { 607 note_ids: vec![enostr::NoteId::new(root_id)], 608 }), 609 ); 610 session.chat.push(Message::ToolCalls(vec![present])); 611 612 session.chat.push(Message::User( 613 "Summarize this thread concisely.".to_string(), 614 )); 615 session.update_title_from_last_message(); 616 } 617 618 Some(id) 619 } 620 621 /// Process incoming tokens from the ai backend for ALL sessions. 622 /// Returns (sessions needing tool responses, events to publish to relays). 623 fn process_events( 624 &mut self, 625 app_ctx: &AppContext, 626 ) -> (HashSet<SessionId>, Vec<session_events::BuiltEvent>) { 627 // Track which sessions need to send tool responses 628 let mut needs_send: HashSet<SessionId> = HashSet::new(); 629 let mut events_to_publish: Vec<session_events::BuiltEvent> = Vec::new(); 630 let active_id = self.session_manager.active_id(); 631 632 // Extract secret key once for live event generation 633 let secret_key = secret_key_bytes(app_ctx.accounts.get_selected_account().keypair()); 634 635 // Get all session IDs to process 636 let session_ids = self.session_manager.session_ids(); 637 638 for session_id in session_ids { 639 // Take the receiver out to avoid borrow conflicts 640 let recvr = { 641 let Some(session) = self.session_manager.get_mut(session_id) else { 642 continue; 643 }; 644 session.incoming_tokens.take() 645 }; 646 647 let Some(recvr) = recvr else { 648 continue; 649 }; 650 651 while let Ok(res) = recvr.try_recv() { 652 // Nudge avatar only for active session 653 if active_id == Some(session_id) { 654 if let Some(avatar) = &mut self.avatar { 655 avatar.random_nudge(); 656 } 657 } 658 659 let Some(session) = self.session_manager.get_mut(session_id) else { 660 break; 661 }; 662 663 // Determine the live event to publish for this response. 664 // Centralised here so every response type that needs relay 665 // propagation is handled in one place. 666 let live_event: Option<(String, &str, Option<&str>)> = match &res { 667 DaveApiResponse::Failed(err) => Some((err.clone(), "error", None)), 668 DaveApiResponse::ToolResult(result) => Some(( 669 format!("{}: {}", result.tool_name, result.summary), 670 "tool_result", 671 Some(result.tool_name.as_str()), 672 )), 673 DaveApiResponse::CompactionStarted => { 674 Some((String::new(), "compaction_started", None)) 675 } 676 DaveApiResponse::CompactionComplete(info) => { 677 Some((info.pre_tokens.to_string(), "compaction_complete", None)) 678 } 679 // PermissionRequest has custom event building (below). 680 // Token, ToolCalls, SessionInfo, Subagent* don't publish. 681 _ => None, 682 }; 683 684 if let Some((content, role, tool_name)) = live_event { 685 if let Some(sk) = &secret_key { 686 if let Some(evt) = ingest_live_event( 687 session, 688 app_ctx.ndb, 689 sk, 690 &content, 691 role, 692 None, 693 tool_name, 694 ) { 695 events_to_publish.push(evt); 696 } 697 } 698 } 699 700 // Backend produced real content — transition dispatch 701 // state so redispatch knows the backend consumed our 702 // messages (AwaitingResponse → Streaming). 703 if !matches!( 704 res, 705 DaveApiResponse::SessionInfo(_) 706 | DaveApiResponse::CompactionStarted 707 | DaveApiResponse::CompactionComplete(_) 708 | DaveApiResponse::QueryComplete(_) 709 ) { 710 session.dispatch_state.backend_responded(); 711 } 712 713 match res { 714 DaveApiResponse::Failed(ref err) => { 715 session.chat.push(Message::Error(err.to_string())); 716 } 717 DaveApiResponse::Token(token) => { 718 session.append_token(&token); 719 } 720 DaveApiResponse::ToolCalls(toolcalls) => { 721 if handle_tool_calls(session, &toolcalls, app_ctx.ndb) { 722 needs_send.insert(session_id); 723 } 724 } 725 DaveApiResponse::PermissionRequest(pending) => { 726 handle_permission_request( 727 session, 728 pending, 729 &secret_key, 730 app_ctx.ndb, 731 &mut events_to_publish, 732 ); 733 } 734 DaveApiResponse::ToolResult(result) => { 735 handle_tool_result(session, result); 736 } 737 DaveApiResponse::SessionInfo(info) => { 738 handle_session_info(session, info, app_ctx.ndb); 739 } 740 DaveApiResponse::SubagentSpawned(subagent) => { 741 handle_subagent_spawned(session, subagent); 742 } 743 DaveApiResponse::SubagentOutput { task_id, output } => { 744 session.update_subagent_output(&task_id, &output); 745 } 746 DaveApiResponse::SubagentCompleted { task_id, result } => { 747 session.complete_subagent(&task_id, &result); 748 } 749 DaveApiResponse::CompactionStarted => { 750 if let Some(agentic) = &mut session.agentic { 751 agentic.is_compacting = true; 752 } 753 } 754 DaveApiResponse::CompactionComplete(info) => { 755 handle_compaction_complete(session, session_id, info); 756 } 757 DaveApiResponse::QueryComplete(info) => { 758 handle_query_complete(session, info); 759 } 760 } 761 } 762 763 // Check if channel is disconnected (stream ended) 764 match recvr.try_recv() { 765 Err(std::sync::mpsc::TryRecvError::Disconnected) => { 766 if let Some(session) = self.session_manager.get_mut(session_id) { 767 handle_stream_end( 768 session, 769 session_id, 770 &secret_key, 771 app_ctx.ndb, 772 &mut events_to_publish, 773 &mut needs_send, 774 ); 775 } 776 } 777 _ => { 778 // Channel still open, put receiver back 779 if let Some(session) = self.session_manager.get_mut(session_id) { 780 session.incoming_tokens = Some(recvr); 781 } 782 } 783 } 784 } 785 786 (needs_send, events_to_publish) 787 } 788 789 fn ui(&mut self, app_ctx: &mut AppContext, ui: &mut egui::Ui) -> DaveResponse { 790 // Check overlays first — take ownership so we can call &mut self 791 // methods freely. Put the variant back if the overlay stays open. 792 let overlay = std::mem::take(&mut self.active_overlay); 793 match overlay { 794 DaveOverlay::Settings => { 795 match ui::settings_overlay_ui(&mut self.settings_panel, &self.settings, ui) { 796 OverlayResult::ApplySettings(new_settings) => { 797 self.apply_settings(new_settings.clone()); 798 return DaveResponse::new(DaveAction::UpdateSettings(new_settings)); 799 } 800 OverlayResult::Close => {} 801 _ => { 802 self.active_overlay = DaveOverlay::Settings; 803 } 804 } 805 return DaveResponse::default(); 806 } 807 DaveOverlay::HostPicker => { 808 let has_sessions = !self.session_manager.is_empty(); 809 let known_hosts = self.known_remote_hosts(); 810 match ui::host_picker_overlay_ui(&self.hostname, &known_hosts, has_sessions, ui) { 811 OverlayResult::HostSelected(host) => { 812 self.directory_picker.target_host = host; 813 self.active_overlay = DaveOverlay::DirectoryPicker; 814 } 815 OverlayResult::Close => {} 816 _ => { 817 self.active_overlay = DaveOverlay::HostPicker; 818 } 819 } 820 return DaveResponse::default(); 821 } 822 DaveOverlay::DirectoryPicker => { 823 let has_sessions = !self.session_manager.is_empty(); 824 match ui::directory_picker_overlay_ui(&mut self.directory_picker, has_sessions, ui) 825 { 826 OverlayResult::DirectorySelected(path) => { 827 if let Some(target_host) = self.directory_picker.target_host.take() { 828 tracing::info!( 829 "remote directory selected: {:?} on {}", 830 path, 831 target_host 832 ); 833 self.queue_spawn_command( 834 &target_host, 835 &path, 836 self.model_config.backend, 837 ); 838 } else { 839 tracing::info!("directory selected: {:?}", path); 840 self.create_or_pick_backend(path); 841 } 842 } 843 OverlayResult::Close => { 844 self.directory_picker.target_host = None; 845 } 846 _ => { 847 self.active_overlay = DaveOverlay::DirectoryPicker; 848 } 849 } 850 return DaveResponse::default(); 851 } 852 DaveOverlay::SessionPicker { backend } => { 853 match ui::session_picker_overlay_ui(&mut self.session_picker, ui) { 854 OverlayResult::ResumeSession { 855 cwd, 856 session_id, 857 title, 858 file_path, 859 } => { 860 // Resumed sessions are always Claude (discovered from JSONL) 861 let claude_session_id = session_id.clone(); 862 let sid = self.create_resumed_session_with_cwd( 863 cwd, 864 session_id, 865 title, 866 BackendType::Claude, 867 ); 868 self.pending_archive_convert = Some((file_path, sid, claude_session_id)); 869 self.session_picker.close(); 870 } 871 OverlayResult::NewSession { cwd } => { 872 tracing::info!( 873 "new session from session picker: {:?} (backend: {:?})", 874 cwd, 875 backend 876 ); 877 self.session_picker.close(); 878 self.create_session_with_cwd(cwd, backend); 879 } 880 OverlayResult::BackToDirectoryPicker => { 881 self.session_picker.close(); 882 self.active_overlay = DaveOverlay::DirectoryPicker; 883 } 884 _ => { 885 self.active_overlay = DaveOverlay::SessionPicker { backend }; 886 } 887 } 888 return DaveResponse::default(); 889 } 890 DaveOverlay::BackendPicker { cwd } => { 891 if let Some(bt) = ui::backend_picker_overlay_ui(&self.available_backends, ui) { 892 tracing::info!("backend selected: {:?}", bt); 893 self.create_or_resume_session(cwd, bt); 894 } else { 895 self.active_overlay = DaveOverlay::BackendPicker { cwd }; 896 } 897 return DaveResponse::default(); 898 } 899 DaveOverlay::None => {} 900 } 901 902 // Normal routing 903 if is_narrow(ui.ctx()) { 904 self.narrow_ui(app_ctx, ui) 905 } else if self.show_scene { 906 self.scene_ui(app_ctx, ui) 907 } else { 908 self.desktop_ui(app_ctx, ui) 909 } 910 } 911 912 /// Scene view with RTS-style agent visualization and chat side panel 913 fn scene_ui(&mut self, app_ctx: &mut AppContext, ui: &mut egui::Ui) -> DaveResponse { 914 let is_interrupt_pending = self.is_interrupt_pending(); 915 let (dave_response, view_action) = ui::scene_ui( 916 &mut self.session_manager, 917 &mut self.scene, 918 &mut self.focus_queue, 919 &self.model_config, 920 is_interrupt_pending, 921 self.auto_steal.is_enabled(), 922 app_ctx, 923 ui, 924 ); 925 926 // Handle view actions 927 match view_action { 928 SceneViewAction::ToggleToListView => { 929 self.show_scene = false; 930 } 931 SceneViewAction::SpawnAgent => { 932 return DaveResponse::new(DaveAction::NewChat); 933 } 934 SceneViewAction::DeleteSelected(ids) => { 935 for id in ids { 936 self.delete_session(id); 937 } 938 if let Some(session) = self.session_manager.sessions_ordered().first() { 939 self.scene.select(session.id); 940 } else { 941 self.scene.clear_selection(); 942 } 943 } 944 SceneViewAction::None => {} 945 } 946 947 dave_response 948 } 949 950 /// Desktop layout with sidebar for session list 951 fn desktop_ui(&mut self, app_ctx: &mut AppContext, ui: &mut egui::Ui) -> DaveResponse { 952 let is_interrupt_pending = self.is_interrupt_pending(); 953 let (chat_response, session_action, toggle_scene) = ui::desktop_ui( 954 &mut self.session_manager, 955 &self.focus_queue, 956 &self.model_config, 957 is_interrupt_pending, 958 self.auto_steal.is_enabled(), 959 app_ctx, 960 ui, 961 ); 962 963 if toggle_scene { 964 self.show_scene = true; 965 } 966 967 if let Some(action) = session_action { 968 match action { 969 SessionListAction::NewSession => return DaveResponse::new(DaveAction::NewChat), 970 SessionListAction::SwitchTo(id) => { 971 self.session_manager.switch_to(id); 972 self.focus_queue.dequeue(id); 973 } 974 SessionListAction::Delete(id) => { 975 self.delete_session(id); 976 } 977 SessionListAction::Rename(id, new_title) => { 978 self.rename_session(id, new_title); 979 } 980 SessionListAction::DismissDone(id) => { 981 self.focus_queue.dequeue_done(id); 982 if let Some(session) = self.session_manager.get_mut(id) { 983 if session.indicator == Some(focus_queue::FocusPriority::Done) { 984 session.indicator = None; 985 session.state_dirty = true; 986 } 987 } 988 } 989 } 990 } 991 992 chat_response 993 } 994 995 /// Narrow/mobile layout - shows either session list or chat 996 fn narrow_ui(&mut self, app_ctx: &mut AppContext, ui: &mut egui::Ui) -> DaveResponse { 997 let is_interrupt_pending = self.is_interrupt_pending(); 998 let (dave_response, session_action) = ui::narrow_ui( 999 &mut self.session_manager, 1000 &self.focus_queue, 1001 &self.model_config, 1002 is_interrupt_pending, 1003 self.auto_steal.is_enabled(), 1004 self.show_session_list, 1005 app_ctx, 1006 ui, 1007 ); 1008 1009 if let Some(action) = session_action { 1010 match action { 1011 SessionListAction::NewSession => { 1012 self.handle_new_chat(); 1013 self.show_session_list = false; 1014 } 1015 SessionListAction::SwitchTo(id) => { 1016 self.session_manager.switch_to(id); 1017 self.focus_queue.dequeue(id); 1018 self.show_session_list = false; 1019 } 1020 SessionListAction::Delete(id) => { 1021 self.delete_session(id); 1022 } 1023 SessionListAction::Rename(id, new_title) => { 1024 self.rename_session(id, new_title); 1025 } 1026 SessionListAction::DismissDone(id) => { 1027 self.focus_queue.dequeue_done(id); 1028 if let Some(session) = self.session_manager.get_mut(id) { 1029 if session.indicator == Some(focus_queue::FocusPriority::Done) { 1030 session.indicator = None; 1031 session.state_dirty = true; 1032 } 1033 } 1034 } 1035 } 1036 } 1037 1038 dave_response 1039 } 1040 1041 fn handle_new_chat(&mut self) { 1042 match self.ai_mode { 1043 AiMode::Chat => { 1044 // In chat mode, create a session directly without the directory picker 1045 let cwd = std::env::current_dir().unwrap_or_default(); 1046 self.create_session_with_cwd(cwd, self.model_config.backend); 1047 } 1048 AiMode::Agentic => { 1049 // If remote hosts are known, show host picker first 1050 if !self.known_remote_hosts().is_empty() { 1051 self.active_overlay = DaveOverlay::HostPicker; 1052 } else { 1053 self.directory_picker.target_host = None; 1054 self.active_overlay = DaveOverlay::DirectoryPicker; 1055 } 1056 } 1057 } 1058 } 1059 1060 /// Collect remote hostnames from session host_groups and directory picker's 1061 /// event-sourced paths. Excludes the local hostname. 1062 fn known_remote_hosts(&self) -> Vec<String> { 1063 let mut hosts: Vec<String> = Vec::new(); 1064 1065 // From active session groups 1066 for (hostname, _) in self.session_manager.host_groups() { 1067 if hostname != &self.hostname && !hosts.contains(hostname) { 1068 hosts.push(hostname.clone()); 1069 } 1070 } 1071 1072 // From event-sourced paths (may include hosts with no active sessions) 1073 for hostname in self.directory_picker.host_recent_paths.keys() { 1074 if hostname != &self.hostname && !hosts.contains(hostname) { 1075 hosts.push(hostname.clone()); 1076 } 1077 } 1078 1079 hosts.sort(); 1080 hosts 1081 } 1082 1083 /// Create a new session with the given cwd (called after directory picker selection) 1084 fn create_session_with_cwd(&mut self, cwd: PathBuf, backend_type: BackendType) { 1085 update::create_session_with_cwd( 1086 &mut self.session_manager, 1087 &mut self.directory_picker, 1088 &mut self.scene, 1089 self.show_scene, 1090 self.ai_mode, 1091 cwd, 1092 &self.hostname, 1093 backend_type, 1094 None, 1095 ); 1096 } 1097 1098 /// Create a new session that resumes an existing Claude conversation 1099 fn create_resumed_session_with_cwd( 1100 &mut self, 1101 cwd: PathBuf, 1102 resume_session_id: String, 1103 title: String, 1104 backend_type: BackendType, 1105 ) -> SessionId { 1106 update::create_resumed_session_with_cwd( 1107 &mut self.session_manager, 1108 &mut self.directory_picker, 1109 &mut self.scene, 1110 self.show_scene, 1111 self.ai_mode, 1112 cwd, 1113 resume_session_id, 1114 title, 1115 &self.hostname, 1116 backend_type, 1117 ) 1118 } 1119 1120 /// Clone the active agent, creating a new session with the same working directory 1121 fn clone_active_agent(&mut self) { 1122 let Some(active) = self.session_manager.get_active() else { 1123 return; 1124 }; 1125 1126 // If the active session is remote, send a spawn command to its host 1127 if active.is_remote() { 1128 if let Some(cwd) = active.cwd().cloned() { 1129 let host = active.details.hostname.clone(); 1130 let backend = active.backend_type; 1131 self.queue_spawn_command(&host, &cwd, backend); 1132 return; 1133 } 1134 } 1135 1136 update::clone_active_agent( 1137 &mut self.session_manager, 1138 &mut self.directory_picker, 1139 &mut self.scene, 1140 self.show_scene, 1141 self.ai_mode, 1142 &self.hostname, 1143 ); 1144 } 1145 1146 /// Poll for IPC spawn-agent commands from external tools 1147 fn poll_ipc_commands(&mut self) { 1148 let Some(listener) = self.ipc_listener.as_ref() else { 1149 return; 1150 }; 1151 1152 // Drain all pending connections (non-blocking) 1153 while let Some(mut pending) = listener.try_recv() { 1154 // Create the session and get its ID 1155 let id = self.session_manager.new_session( 1156 pending.cwd.clone(), 1157 self.ai_mode, 1158 self.model_config.backend, 1159 ); 1160 self.directory_picker.add_recent(pending.cwd); 1161 1162 // Focus on new session 1163 if let Some(session) = self.session_manager.get_mut(id) { 1164 session.details.hostname = self.hostname.clone(); 1165 session.focus_requested = true; 1166 if self.show_scene { 1167 self.scene.select(id); 1168 if let Some(agentic) = &session.agentic { 1169 self.scene.focus_on(agentic.scene_position); 1170 } 1171 } 1172 } 1173 self.session_manager.rebuild_host_groups(); 1174 1175 // Close directory picker if open 1176 if matches!(self.active_overlay, DaveOverlay::DirectoryPicker) { 1177 self.active_overlay = DaveOverlay::None; 1178 } 1179 1180 // Send success response back to the client 1181 #[cfg(unix)] 1182 { 1183 let response = ipc::SpawnResponse::ok(id); 1184 let _ = ipc::send_response(&mut pending.stream, &response); 1185 } 1186 1187 tracing::info!("Spawned agent via IPC (session {})", id); 1188 } 1189 } 1190 1191 /// Poll for remote conversation actions arriving via nostr relays. 1192 /// 1193 /// Dispatches kind-1988 events by `role` tag: 1194 /// - `permission_response`: route through oneshot channel (first-response-wins) 1195 /// - `set_permission_mode`: apply mode change locally 1196 /// 1197 /// Returns (backend_session_id, backend_type, mode) tuples for mode changes 1198 /// that need to be applied to the local CLI backend. 1199 fn poll_remote_conversation_actions( 1200 &mut self, 1201 ndb: &nostrdb::Ndb, 1202 ) -> Vec<(String, BackendType, claude_agent_sdk_rs::PermissionMode)> { 1203 let mut mode_applies = Vec::new(); 1204 let session_ids = self.session_manager.session_ids(); 1205 for session_id in session_ids { 1206 let Some(session) = self.session_manager.get_mut(session_id) else { 1207 continue; 1208 }; 1209 // Only local sessions poll for remote actions 1210 if session.is_remote() { 1211 continue; 1212 } 1213 let Some(agentic) = &mut session.agentic else { 1214 continue; 1215 }; 1216 let Some(sub) = agentic.conversation_action_sub else { 1217 continue; 1218 }; 1219 1220 let note_keys = ndb.poll_for_notes(sub, 64); 1221 if note_keys.is_empty() { 1222 continue; 1223 } 1224 1225 let txn = match Transaction::new(ndb) { 1226 Ok(txn) => txn, 1227 Err(_) => continue, 1228 }; 1229 1230 for key in note_keys { 1231 let Ok(note) = ndb.get_note_by_key(&txn, key) else { 1232 continue; 1233 }; 1234 1235 match session_events::get_tag_value(¬e, "role") { 1236 Some("permission_response") => { 1237 handle_remote_permission_response(¬e, agentic, &mut session.chat); 1238 } 1239 Some("set_permission_mode") => { 1240 let content = note.content(); 1241 let mode_str = match serde_json::from_str::<serde_json::Value>(content) { 1242 Ok(v) => v 1243 .get("mode") 1244 .and_then(|m| m.as_str()) 1245 .unwrap_or("default") 1246 .to_string(), 1247 Err(_) => continue, 1248 }; 1249 1250 let new_mode = crate::session::permission_mode_from_str(&mode_str); 1251 agentic.permission_mode = new_mode; 1252 session.state_dirty = true; 1253 1254 mode_applies.push(( 1255 format!("dave-session-{}", session_id), 1256 session.backend_type, 1257 new_mode, 1258 )); 1259 1260 tracing::info!( 1261 "remote command: set permission mode to {:?} for session {}", 1262 new_mode, 1263 session_id, 1264 ); 1265 } 1266 _ => {} 1267 } 1268 } 1269 } 1270 mode_applies 1271 } 1272 1273 /// Publish kind-31988 state events for sessions whose status changed. 1274 fn publish_dirty_session_states(&mut self, ctx: &mut AppContext<'_>) { 1275 let Some(sk) = secret_key_bytes(ctx.accounts.get_selected_account().keypair()) else { 1276 return; 1277 }; 1278 1279 for session in self.session_manager.iter_mut() { 1280 if !session.state_dirty { 1281 continue; 1282 } 1283 1284 // Remote sessions are owned by another machine — only the 1285 // session owner should publish state events. 1286 if session.is_remote() { 1287 session.state_dirty = false; 1288 continue; 1289 } 1290 1291 let Some(agentic) = &session.agentic else { 1292 continue; 1293 }; 1294 1295 let event_sid = agentic.event_session_id().to_string(); 1296 let cwd = agentic.cwd.to_string_lossy(); 1297 let status = session.status().as_str(); 1298 let indicator = session.indicator.as_ref().map(|i| i.as_str()); 1299 let perm_mode = crate::session::permission_mode_to_str(agentic.permission_mode); 1300 let cli_sid = agentic.cli_resume_id().map(|s| s.to_string()); 1301 1302 queue_built_event( 1303 session_events::build_session_state_event( 1304 &event_sid, 1305 &session.details.title, 1306 session.details.custom_title.as_deref(), 1307 &cwd, 1308 status, 1309 indicator, 1310 &self.hostname, 1311 &session.details.home_dir, 1312 session.backend_type.as_str(), 1313 perm_mode, 1314 cli_sid.as_deref(), 1315 &sk, 1316 ), 1317 &format!("publishing session state: {} -> {}", event_sid, status), 1318 ctx.ndb, 1319 &sk, 1320 &mut self.pending_relay_events, 1321 ); 1322 1323 session.state_dirty = false; 1324 } 1325 } 1326 1327 /// Publish "deleted" state events for sessions that were deleted. 1328 /// Called in the update loop where AppContext is available. 1329 fn publish_pending_deletions(&mut self, ctx: &mut AppContext<'_>) { 1330 if self.pending_deletions.is_empty() { 1331 return; 1332 } 1333 1334 let Some(sk) = secret_key_bytes(ctx.accounts.get_selected_account().keypair()) else { 1335 return; 1336 }; 1337 1338 for info in std::mem::take(&mut self.pending_deletions) { 1339 queue_built_event( 1340 session_events::build_session_state_event( 1341 &info.claude_session_id, 1342 &info.title, 1343 None, 1344 &info.cwd, 1345 "deleted", 1346 None, // no indicator for deleted sessions 1347 &self.hostname, 1348 &info.home_dir, 1349 info.backend.as_str(), 1350 "default", 1351 None, 1352 &sk, 1353 ), 1354 &format!( 1355 "publishing deleted session state: {}", 1356 info.claude_session_id 1357 ), 1358 ctx.ndb, 1359 &sk, 1360 &mut self.pending_relay_events, 1361 ); 1362 } 1363 } 1364 1365 /// Build and queue permission response events from remote sessions. 1366 /// Called in the update loop where AppContext is available. 1367 fn publish_pending_perm_responses(&mut self, ctx: &AppContext<'_>) { 1368 if self.pending_perm_responses.is_empty() { 1369 return; 1370 } 1371 1372 let Some(sk) = secret_key_bytes(ctx.accounts.get_selected_account().keypair()) else { 1373 tracing::warn!("no secret key for publishing permission responses"); 1374 self.pending_perm_responses.clear(); 1375 return; 1376 }; 1377 1378 let pending = std::mem::take(&mut self.pending_perm_responses); 1379 1380 // Get session info from the active session 1381 let session = match self.session_manager.get_active() { 1382 Some(s) => s, 1383 None => return, 1384 }; 1385 let agentic = match &session.agentic { 1386 Some(a) => a, 1387 None => return, 1388 }; 1389 let session_id = agentic.event_session_id().to_string(); 1390 1391 for resp in pending { 1392 let request_note_id = match agentic.permissions.request_note_ids.get(&resp.perm_id) { 1393 Some(id) => id, 1394 None => { 1395 tracing::warn!("no request note_id for perm_id {}", resp.perm_id); 1396 continue; 1397 } 1398 }; 1399 1400 queue_built_event( 1401 session_events::build_permission_response_event( 1402 &resp.perm_id, 1403 request_note_id, 1404 resp.allowed, 1405 resp.message.as_deref(), 1406 &session_id, 1407 &sk, 1408 ), 1409 &format!( 1410 "queued remote permission response for {} ({})", 1411 resp.perm_id, 1412 if resp.allowed { "allow" } else { "deny" } 1413 ), 1414 ctx.ndb, 1415 &sk, 1416 &mut self.pending_relay_events, 1417 ); 1418 } 1419 } 1420 1421 /// Publish permission mode command events for remote sessions. 1422 /// Called in the update loop where AppContext is available. 1423 fn publish_pending_mode_commands(&mut self, ctx: &AppContext<'_>) { 1424 if self.pending_mode_commands.is_empty() { 1425 return; 1426 } 1427 1428 let Some(sk) = secret_key_bytes(ctx.accounts.get_selected_account().keypair()) else { 1429 tracing::warn!("no secret key for publishing mode commands"); 1430 self.pending_mode_commands.clear(); 1431 return; 1432 }; 1433 1434 for cmd in std::mem::take(&mut self.pending_mode_commands) { 1435 queue_built_event( 1436 session_events::build_set_permission_mode_event(cmd.mode, &cmd.session_id, &sk), 1437 &format!( 1438 "publishing permission mode command: {} -> {}", 1439 cmd.session_id, cmd.mode 1440 ), 1441 ctx.ndb, 1442 &sk, 1443 &mut self.pending_relay_events, 1444 ); 1445 } 1446 } 1447 1448 /// Restore sessions from kind-31988 state events in ndb. 1449 /// Called once on first `update()`. 1450 fn restore_sessions_from_ndb(&mut self, ctx: &mut AppContext<'_>) { 1451 let txn = match Transaction::new(ctx.ndb) { 1452 Ok(t) => t, 1453 Err(e) => { 1454 tracing::error!("failed to open txn for session restore: {:?}", e); 1455 return; 1456 } 1457 }; 1458 1459 let states = session_loader::load_session_states(ctx.ndb, &txn); 1460 if states.is_empty() { 1461 return; 1462 } 1463 1464 tracing::info!("restoring {} sessions from ndb", states.len()); 1465 1466 for state in &states { 1467 let backend = state 1468 .backend 1469 .as_deref() 1470 .and_then(BackendType::from_tag_str) 1471 .unwrap_or(BackendType::Claude); 1472 let cwd = std::path::PathBuf::from(&state.cwd); 1473 1474 // The d-tag is the event_id (Nostr identity). The cli_session 1475 // tag holds the real CLI session ID for --resume. If there's 1476 // no cli_session tag, this is a legacy event where d-tag was 1477 // the CLI session ID. 1478 let resume_id = match state.cli_session_id { 1479 Some(ref cli) if !cli.is_empty() => cli.clone(), 1480 Some(_) => { 1481 // Empty cli_session — backend never started, nothing to resume 1482 String::new() 1483 } 1484 None => { 1485 // Legacy: d-tag IS the CLI session ID 1486 state.claude_session_id.clone() 1487 } 1488 }; 1489 1490 let dave_sid = self.session_manager.new_resumed_session( 1491 cwd, 1492 resume_id, 1493 state.title.clone(), 1494 AiMode::Agentic, 1495 backend, 1496 ); 1497 1498 // Load conversation history from kind-1988 events 1499 let loaded = 1500 session_loader::load_session_messages(ctx.ndb, &txn, &state.claude_session_id); 1501 1502 if let Some(session) = self.session_manager.get_mut(dave_sid) { 1503 tracing::info!( 1504 "restored session '{}': {} messages", 1505 state.title, 1506 loaded.messages.len(), 1507 ); 1508 session.chat = loaded.messages; 1509 1510 if is_session_remote(&state.hostname, &state.cwd, &self.hostname) { 1511 session.source = session::SessionSource::Remote; 1512 } 1513 1514 // Local sessions use the current machine's hostname; 1515 // remote sessions use what was stored in the event. 1516 session.details.hostname = if session.is_remote() { 1517 state.hostname.clone() 1518 } else { 1519 self.hostname.clone() 1520 }; 1521 1522 session.details.custom_title = state.custom_title.clone(); 1523 1524 // Restore focus indicator from state event 1525 session.indicator = state 1526 .indicator 1527 .as_deref() 1528 .and_then(focus_queue::FocusPriority::from_indicator_str); 1529 1530 // Use home_dir from the event for remote abbreviation 1531 if !state.home_dir.is_empty() { 1532 session.details.home_dir = state.home_dir.clone(); 1533 } 1534 1535 if let Some(agentic) = &mut session.agentic { 1536 // Restore the event_id from the d-tag so published 1537 // state events keep using the same Nostr identity. 1538 agentic.event_id = state.claude_session_id.clone(); 1539 1540 // If cli_session was empty the backend never ran — 1541 // clear resume_session_id so we don't try --resume 1542 // with the event UUID. 1543 if state.cli_session_id.as_ref().is_some_and(|s| s.is_empty()) { 1544 agentic.resume_session_id = None; 1545 } 1546 1547 if let (Some(root), Some(last)) = (loaded.root_note_id, loaded.last_note_id) { 1548 agentic.live_threading.seed(root, last, loaded.event_count); 1549 } 1550 // Load permission state and dedup set from events 1551 agentic.permissions.merge_loaded( 1552 loaded.permissions.responded, 1553 loaded.permissions.request_note_ids, 1554 ); 1555 agentic.seen_note_ids = loaded.note_ids; 1556 // Set remote status and permission mode from state event 1557 agentic.remote_status = AgentStatus::from_status_str(&state.status); 1558 agentic.remote_status_ts = state.created_at; 1559 if let Some(ref pm) = state.permission_mode { 1560 agentic.permission_mode = crate::session::permission_mode_from_str(pm); 1561 } 1562 1563 setup_conversation_subscription(agentic, &state.claude_session_id, ctx.ndb); 1564 } 1565 } 1566 } 1567 1568 self.session_manager.rebuild_host_groups(); 1569 1570 // Seed per-host recent paths from session state events 1571 let host_paths = session_loader::load_recent_paths_by_host(ctx.ndb, &txn); 1572 self.directory_picker 1573 .seed_host_paths(host_paths, &self.hostname); 1574 1575 // Skip the directory picker since we restored sessions 1576 self.active_overlay = DaveOverlay::None; 1577 } 1578 1579 /// Poll for new kind-31988 session state events from the ndb subscription. 1580 /// 1581 /// When PNS events arrive from relays and get unwrapped, new session state 1582 /// events may appear. This detects them and creates sessions we don't already have. 1583 fn poll_session_state_events(&mut self, ctx: &mut AppContext<'_>) { 1584 let Some(sub) = self.session_state_sub else { 1585 return; 1586 }; 1587 1588 let note_keys = ctx.ndb.poll_for_notes(sub, 32); 1589 if note_keys.is_empty() { 1590 return; 1591 } 1592 1593 let txn = match Transaction::new(ctx.ndb) { 1594 Ok(t) => t, 1595 Err(_) => return, 1596 }; 1597 1598 // Collect existing claude session IDs to avoid duplicates 1599 let mut existing_ids: std::collections::HashSet<String> = self 1600 .session_manager 1601 .iter() 1602 .filter_map(|s| s.agentic.as_ref().map(|a| a.event_session_id().to_string())) 1603 .collect(); 1604 1605 for key in note_keys { 1606 let Ok(note) = ctx.ndb.get_note_by_key(&txn, key) else { 1607 continue; 1608 }; 1609 1610 let Some(claude_sid) = session_events::get_tag_value(¬e, "d") else { 1611 continue; 1612 }; 1613 1614 let status_str = session_events::get_tag_value(¬e, "status").unwrap_or("idle"); 1615 let backend_tag = 1616 session_events::get_tag_value(¬e, "backend").and_then(BackendType::from_tag_str); 1617 1618 // Skip deleted sessions entirely — don't create or keep them 1619 if status_str == "deleted" { 1620 // If we have this session locally, remove it (only if this 1621 // event is newer than the last state we applied). 1622 if existing_ids.contains(claude_sid) { 1623 let ts = note.created_at(); 1624 let to_delete: Vec<SessionId> = self 1625 .session_manager 1626 .iter() 1627 .filter(|s| { 1628 s.agentic.as_ref().is_some_and(|a| { 1629 a.event_session_id() == claude_sid && ts > a.remote_status_ts 1630 }) 1631 }) 1632 .map(|s| s.id) 1633 .collect(); 1634 for id in to_delete { 1635 let bt = self 1636 .session_manager 1637 .get(id) 1638 .map(|s| s.backend_type) 1639 .unwrap_or(BackendType::Remote); 1640 update::delete_session( 1641 &mut self.session_manager, 1642 &mut self.focus_queue, 1643 get_backend(&self.backends, bt), 1644 &mut self.directory_picker, 1645 id, 1646 ); 1647 } 1648 } 1649 continue; 1650 } 1651 1652 // Update remote_status for existing remote sessions, but only 1653 // if this event is newer than the one we already applied. 1654 // Multiple revisions of the same replaceable event can arrive 1655 // out of order (e.g. after a relay reconnect). 1656 if existing_ids.contains(claude_sid) { 1657 let ts = note.created_at(); 1658 let new_status = AgentStatus::from_status_str(status_str); 1659 let new_custom_title = 1660 session_events::get_tag_value(¬e, "custom_title").map(|s| s.to_string()); 1661 let new_hostname = session_events::get_tag_value(¬e, "hostname").unwrap_or(""); 1662 for session in self.session_manager.iter_mut() { 1663 let is_remote = session.is_remote(); 1664 if let Some(agentic) = &mut session.agentic { 1665 if agentic.event_session_id() == claude_sid && ts > agentic.remote_status_ts 1666 { 1667 agentic.remote_status_ts = ts; 1668 // custom_title syncs for both local and remote 1669 if new_custom_title.is_some() { 1670 session.details.custom_title = new_custom_title.clone(); 1671 } 1672 if let Some(backend) = backend_tag { 1673 session.backend_type = backend; 1674 } 1675 // Hostname syncs for remote sessions from the event 1676 if is_remote && !new_hostname.is_empty() { 1677 session.details.hostname = new_hostname.to_string(); 1678 } 1679 // Status, indicator, and permission mode only update 1680 // for remote sessions (local sessions derive from 1681 // the process) 1682 if is_remote { 1683 agentic.remote_status = new_status; 1684 session.indicator = 1685 session_events::get_tag_value(¬e, "indicator") 1686 .and_then(focus_queue::FocusPriority::from_indicator_str); 1687 if let Some(pm) = 1688 session_events::get_tag_value(¬e, "permission-mode") 1689 { 1690 agentic.permission_mode = 1691 crate::session::permission_mode_from_str(pm); 1692 } 1693 } 1694 } 1695 } 1696 } 1697 self.session_manager.rebuild_host_groups(); 1698 continue; 1699 } 1700 1701 // Look up the latest revision of this session. PNS wrapping 1702 // causes old revisions (including pre-deletion) to arrive from 1703 // the relay. Only create a session if the latest revision is valid. 1704 let Some(state) = session_loader::latest_valid_session(ctx.ndb, &txn, claude_sid) 1705 else { 1706 continue; 1707 }; 1708 1709 tracing::info!( 1710 "discovered new session from relay: '{}' ({}) on {}", 1711 state.title, 1712 claude_sid, 1713 state.hostname, 1714 ); 1715 1716 existing_ids.insert(claude_sid.to_string()); 1717 1718 // Track this host+cwd for the directory picker 1719 if !state.cwd.is_empty() { 1720 self.directory_picker 1721 .add_host_path(&state.hostname, PathBuf::from(&state.cwd)); 1722 } 1723 1724 let backend = state 1725 .backend 1726 .as_deref() 1727 .and_then(BackendType::from_tag_str) 1728 .unwrap_or(BackendType::Claude); 1729 let cwd = std::path::PathBuf::from(&state.cwd); 1730 1731 // Same event_id / cli_session logic as restore_sessions_from_ndb 1732 let resume_id = match state.cli_session_id { 1733 Some(ref cli) if !cli.is_empty() => cli.clone(), 1734 Some(_) => String::new(), // backend never started 1735 None => claude_sid.to_string(), // legacy 1736 }; 1737 1738 let dave_sid = self.session_manager.new_resumed_session( 1739 cwd, 1740 resume_id, 1741 state.title.clone(), 1742 AiMode::Agentic, 1743 backend, 1744 ); 1745 1746 // Load any conversation history that arrived with it 1747 let loaded = session_loader::load_session_messages(ctx.ndb, &txn, claude_sid); 1748 1749 if let Some(session) = self.session_manager.get_mut(dave_sid) { 1750 session.details.hostname = state.hostname.clone(); 1751 session.details.custom_title = state.custom_title.clone(); 1752 session.indicator = state 1753 .indicator 1754 .as_deref() 1755 .and_then(focus_queue::FocusPriority::from_indicator_str); 1756 if !state.home_dir.is_empty() { 1757 session.details.home_dir = state.home_dir.clone(); 1758 } 1759 if !loaded.messages.is_empty() { 1760 tracing::info!( 1761 "loaded {} messages for discovered session", 1762 loaded.messages.len() 1763 ); 1764 session.chat = loaded.messages; 1765 } 1766 1767 if is_session_remote(&state.hostname, &state.cwd, &self.hostname) { 1768 session.source = session::SessionSource::Remote; 1769 } 1770 1771 if let Some(agentic) = &mut session.agentic { 1772 // Restore the event_id from the d-tag 1773 agentic.event_id = claude_sid.to_string(); 1774 1775 // If cli_session was empty the backend never ran — 1776 // clear resume_session_id so we don't try --resume 1777 // with the event UUID. 1778 if state.cli_session_id.as_ref().is_some_and(|s| s.is_empty()) { 1779 agentic.resume_session_id = None; 1780 } 1781 1782 if let (Some(root), Some(last)) = (loaded.root_note_id, loaded.last_note_id) { 1783 agentic.live_threading.seed(root, last, loaded.event_count); 1784 } 1785 // Load permission state and dedup set 1786 agentic.permissions.merge_loaded( 1787 loaded.permissions.responded, 1788 loaded.permissions.request_note_ids, 1789 ); 1790 agentic.seen_note_ids = loaded.note_ids; 1791 // Set remote status and permission mode 1792 agentic.remote_status = AgentStatus::from_status_str(&state.status); 1793 agentic.remote_status_ts = state.created_at; 1794 if let Some(ref pm) = state.permission_mode { 1795 agentic.permission_mode = crate::session::permission_mode_from_str(pm); 1796 } 1797 1798 setup_conversation_subscription(agentic, claude_sid, ctx.ndb); 1799 } 1800 } 1801 1802 self.session_manager.rebuild_host_groups(); 1803 1804 // If we were showing the directory picker, switch to showing sessions 1805 if matches!(self.active_overlay, DaveOverlay::DirectoryPicker) { 1806 self.active_overlay = DaveOverlay::None; 1807 } 1808 } 1809 } 1810 1811 /// Poll for kind-31989 spawn command events. 1812 /// 1813 /// When a remote device wants to create a session on this host, it publishes 1814 /// a kind-31989 event with `target_host` matching our hostname. We pick it up 1815 /// here and create the session locally. 1816 fn poll_session_command_events(&mut self, ctx: &mut AppContext<'_>) { 1817 let Some(sub) = self.session_command_sub else { 1818 return; 1819 }; 1820 1821 let note_keys = ctx.ndb.poll_for_notes(sub, 16); 1822 if note_keys.is_empty() { 1823 return; 1824 } 1825 1826 let txn = match Transaction::new(ctx.ndb) { 1827 Ok(t) => t, 1828 Err(_) => return, 1829 }; 1830 1831 for key in note_keys { 1832 let Ok(note) = ctx.ndb.get_note_by_key(&txn, key) else { 1833 continue; 1834 }; 1835 1836 let Some(command_id) = session_events::get_tag_value(¬e, "d") else { 1837 continue; 1838 }; 1839 1840 // Dedup: skip already-processed commands 1841 if self.processed_commands.contains(command_id) { 1842 continue; 1843 } 1844 1845 let command = session_events::get_tag_value(¬e, "command").unwrap_or(""); 1846 if command != "spawn_session" { 1847 continue; 1848 } 1849 1850 let target = session_events::get_tag_value(¬e, "target_host").unwrap_or(""); 1851 if target != self.hostname { 1852 continue; 1853 } 1854 1855 let cwd = session_events::get_tag_value(¬e, "cwd").unwrap_or(""); 1856 let backend_str = session_events::get_tag_value(¬e, "backend").unwrap_or(""); 1857 let backend = 1858 BackendType::from_tag_str(backend_str).unwrap_or(self.model_config.backend); 1859 1860 tracing::info!( 1861 "received spawn command {}: cwd={}, backend={:?}", 1862 command_id, 1863 cwd, 1864 backend 1865 ); 1866 1867 self.processed_commands.insert(command_id.to_string()); 1868 update::create_session_with_cwd( 1869 &mut self.session_manager, 1870 &mut self.directory_picker, 1871 &mut self.scene, 1872 self.show_scene, 1873 self.ai_mode, 1874 PathBuf::from(cwd), 1875 &self.hostname, 1876 backend, 1877 Some(ctx.ndb), 1878 ); 1879 } 1880 } 1881 1882 /// Poll for new kind-1988 conversation events. 1883 /// 1884 /// For remote sessions: process all roles (user, assistant, tool_call, etc.) 1885 /// to keep the phone UI in sync with the desktop's conversation. 1886 /// 1887 /// For local sessions: only process `role=user` messages arriving from 1888 /// remote clients (phone), collecting them for backend dispatch. 1889 fn poll_remote_conversation_events( 1890 &mut self, 1891 ndb: &nostrdb::Ndb, 1892 secret_key: Option<&[u8; 32]>, 1893 ) -> (Vec<(SessionId, String)>, Vec<session_events::BuiltEvent>) { 1894 let mut remote_user_messages: Vec<(SessionId, String)> = Vec::new(); 1895 let mut events_to_publish: Vec<session_events::BuiltEvent> = Vec::new(); 1896 let session_ids = self.session_manager.session_ids(); 1897 for session_id in session_ids { 1898 let Some(session) = self.session_manager.get_mut(session_id) else { 1899 continue; 1900 }; 1901 let is_remote = session.is_remote(); 1902 1903 // Get sub without holding agentic borrow 1904 let sub = match session 1905 .agentic 1906 .as_ref() 1907 .and_then(|a| a.live_conversation_sub) 1908 { 1909 Some(s) => s, 1910 None => continue, 1911 }; 1912 1913 let note_keys = ndb.poll_for_notes(sub, 128); 1914 if note_keys.is_empty() { 1915 continue; 1916 } 1917 1918 let txn = match Transaction::new(ndb) { 1919 Ok(txn) => txn, 1920 Err(_) => continue, 1921 }; 1922 1923 // Collect and sort by created_at to process in order 1924 let mut notes: Vec<_> = note_keys 1925 .iter() 1926 .filter_map(|key| ndb.get_note_by_key(&txn, *key).ok()) 1927 .collect(); 1928 notes.sort_by_key(|n| n.created_at()); 1929 1930 for note in ¬es { 1931 // Skip events we've already processed (dedup) 1932 let note_id = *note.id(); 1933 let dominated = session 1934 .agentic 1935 .as_mut() 1936 .map(|a| !a.seen_note_ids.insert(note_id)) 1937 .unwrap_or(true); 1938 if dominated { 1939 continue; 1940 } 1941 1942 let content = note.content(); 1943 let role = session_events::get_tag_value(note, "role"); 1944 1945 // Local sessions: only process incoming user messages from remote clients 1946 if !is_remote { 1947 if role == Some("user") { 1948 tracing::info!("received remote user message for local session"); 1949 session.chat.push(Message::User(content.to_string())); 1950 session.update_title_from_last_message(); 1951 remote_user_messages.push((session_id, content.to_string())); 1952 } 1953 continue; 1954 } 1955 1956 let Some(agentic) = &mut session.agentic else { 1957 continue; 1958 }; 1959 1960 match role { 1961 Some("user") => { 1962 session.chat.push(Message::User(content.to_string())); 1963 } 1964 Some("assistant") => { 1965 session.chat.push(Message::Assistant( 1966 crate::messages::AssistantMessage::from_text(content.to_string()), 1967 )); 1968 } 1969 Some("tool_call") => { 1970 session.chat.push(Message::Assistant( 1971 crate::messages::AssistantMessage::from_text(content.to_string()), 1972 )); 1973 } 1974 Some("tool_result") => { 1975 let summary = if content.chars().count() > 100 { 1976 let truncated: String = content.chars().take(100).collect(); 1977 format!("{}...", truncated) 1978 } else { 1979 content.to_string() 1980 }; 1981 let tool_name = session_events::get_tag_value(note, "tool-name") 1982 .unwrap_or("tool") 1983 .to_string(); 1984 session 1985 .chat 1986 .push(Message::ToolResponse(ToolResponse::executed_tool( 1987 crate::messages::ExecutedTool { 1988 tool_name, 1989 summary, 1990 parent_task_id: None, 1991 file_update: None, 1992 }, 1993 ))); 1994 } 1995 Some("permission_request") => { 1996 handle_remote_permission_request( 1997 note, 1998 content, 1999 agentic, 2000 &mut session.chat, 2001 secret_key, 2002 &mut events_to_publish, 2003 ); 2004 } 2005 Some("permission_response") => { 2006 // Track that this permission was responded to 2007 if let Some(perm_id_str) = session_events::get_tag_value(note, "perm-id") { 2008 if let Ok(perm_id) = uuid::Uuid::parse_str(perm_id_str) { 2009 agentic.permissions.responded.insert(perm_id); 2010 // Update the matching PermissionRequest in chat 2011 for msg in session.chat.iter_mut() { 2012 if let Message::PermissionRequest(req) = msg { 2013 if req.id == perm_id && req.response.is_none() { 2014 req.response = Some( 2015 crate::messages::PermissionResponseType::Allowed, 2016 ); 2017 } 2018 } 2019 } 2020 } 2021 } 2022 } 2023 Some("compaction_started") => { 2024 agentic.is_compacting = true; 2025 } 2026 Some("compaction_complete") => { 2027 agentic.is_compacting = false; 2028 let pre_tokens = content.parse::<u64>().unwrap_or(0); 2029 let info = crate::messages::CompactionInfo { pre_tokens }; 2030 agentic.last_compaction = Some(info.clone()); 2031 session.chat.push(Message::CompactionComplete(info)); 2032 2033 // Advance compact-and-proceed: for remote sessions, 2034 // there's no stream-end to wait for, so go straight 2035 // to ReadyToProceed and consume immediately. 2036 if agentic.compact_and_proceed 2037 == crate::session::CompactAndProceedState::WaitingForCompaction 2038 { 2039 agentic.compact_and_proceed = 2040 crate::session::CompactAndProceedState::ReadyToProceed; 2041 } 2042 } 2043 _ => { 2044 // Skip progress, queue-operation, etc. 2045 } 2046 } 2047 2048 // Handle proceed after compaction for remote sessions. 2049 // Published as a relay event so the desktop backend picks it up. 2050 if session.take_compact_and_proceed() { 2051 if let Some(sk) = secret_key { 2052 if let Some(evt) = ingest_live_event( 2053 session, 2054 ndb, 2055 sk, 2056 "Proceed with implementing the plan.", 2057 "user", 2058 None, 2059 None, 2060 ) { 2061 events_to_publish.push(evt); 2062 } 2063 } 2064 } 2065 } 2066 } 2067 (remote_user_messages, events_to_publish) 2068 } 2069 2070 fn rename_session(&mut self, id: SessionId, new_title: String) { 2071 let Some(session) = self.session_manager.get_mut(id) else { 2072 return; 2073 }; 2074 session.details.custom_title = Some(new_title); 2075 session.state_dirty = true; 2076 } 2077 2078 fn delete_session(&mut self, id: SessionId) { 2079 // Capture session info before deletion so we can publish a "deleted" state event 2080 if let Some(session) = self.session_manager.get(id) { 2081 if let Some(agentic) = &session.agentic { 2082 self.pending_deletions.push(DeletedSessionInfo { 2083 claude_session_id: agentic.event_session_id().to_string(), 2084 title: session.details.title.clone(), 2085 cwd: agentic.cwd.to_string_lossy().to_string(), 2086 home_dir: session.details.home_dir.clone(), 2087 backend: session.backend_type, 2088 }); 2089 } 2090 } 2091 2092 let bt = self 2093 .session_manager 2094 .get(id) 2095 .map(|s| s.backend_type) 2096 .unwrap_or(BackendType::Remote); 2097 update::delete_session( 2098 &mut self.session_manager, 2099 &mut self.focus_queue, 2100 get_backend(&self.backends, bt), 2101 &mut self.directory_picker, 2102 id, 2103 ); 2104 } 2105 2106 /// Handle an interrupt request - requires double-Escape to confirm 2107 fn handle_interrupt_request(&mut self, ctx: &egui::Context) { 2108 let bt = self 2109 .session_manager 2110 .get_active() 2111 .map(|s| s.backend_type) 2112 .unwrap_or(BackendType::Remote); 2113 self.interrupt_pending_since = update::handle_interrupt_request( 2114 &self.session_manager, 2115 get_backend(&self.backends, bt), 2116 self.interrupt_pending_since, 2117 ctx, 2118 ); 2119 } 2120 2121 /// Check if interrupt confirmation has timed out and clear it 2122 fn check_interrupt_timeout(&mut self) { 2123 self.interrupt_pending_since = 2124 update::check_interrupt_timeout(self.interrupt_pending_since); 2125 } 2126 2127 /// Returns true if an interrupt is pending confirmation 2128 pub fn is_interrupt_pending(&self) -> bool { 2129 self.interrupt_pending_since.is_some() 2130 } 2131 2132 /// If only one agentic backend is available, return it. Otherwise None 2133 /// (meaning we need to show the backend picker). 2134 fn single_agentic_backend(&self) -> Option<BackendType> { 2135 if self.available_backends.len() == 1 { 2136 Some(self.available_backends[0]) 2137 } else { 2138 None 2139 } 2140 } 2141 2142 /// Queue a spawn command request. The event is built and published in 2143 /// update() where AppContext (and thus the secret key) is available. 2144 fn queue_spawn_command(&mut self, target_host: &str, cwd: &Path, backend: BackendType) { 2145 tracing::info!("queuing spawn command for {} at {:?}", target_host, cwd); 2146 self.pending_spawn_commands.push(PendingSpawnCommand { 2147 target_host: target_host.to_string(), 2148 cwd: cwd.to_path_buf(), 2149 backend, 2150 }); 2151 } 2152 2153 fn create_or_pick_backend(&mut self, cwd: PathBuf) { 2154 tracing::info!( 2155 "create_or_pick_backend: {} available backends: {:?}", 2156 self.available_backends.len(), 2157 self.available_backends 2158 ); 2159 if let Some(bt) = self.single_agentic_backend() { 2160 tracing::info!("single backend detected, skipping picker: {:?}", bt); 2161 self.create_or_resume_session(cwd, bt); 2162 } else if self.available_backends.is_empty() { 2163 // No agentic backends — fall back to configured backend 2164 self.create_or_resume_session(cwd, self.model_config.backend); 2165 } else { 2166 tracing::info!( 2167 "multiple backends available, showing backend picker: {:?}", 2168 self.available_backends 2169 ); 2170 self.active_overlay = DaveOverlay::BackendPicker { cwd }; 2171 } 2172 } 2173 2174 /// After a backend is determined, either create a session directly or 2175 /// show the session picker if there are resumable sessions for this backend. 2176 fn create_or_resume_session(&mut self, cwd: PathBuf, backend_type: BackendType) { 2177 // Only Claude has discoverable resumable sessions (from ~/.claude/) 2178 if backend_type == BackendType::Claude { 2179 let resumable = discover_sessions(&cwd); 2180 if !resumable.is_empty() { 2181 tracing::info!( 2182 "found {} resumable sessions, showing session picker", 2183 resumable.len() 2184 ); 2185 self.session_picker.open(cwd); 2186 self.active_overlay = DaveOverlay::SessionPicker { 2187 backend: backend_type, 2188 }; 2189 return; 2190 } 2191 } 2192 self.create_session_with_cwd(cwd, backend_type); 2193 self.active_overlay = DaveOverlay::None; 2194 } 2195 2196 /// Get the first pending permission request ID for the active session 2197 fn first_pending_permission(&self) -> Option<uuid::Uuid> { 2198 update::first_pending_permission(&self.session_manager) 2199 } 2200 2201 /// Check if the first pending permission is an AskUserQuestion tool call 2202 fn has_pending_question(&self) -> bool { 2203 update::has_pending_question(&self.session_manager) 2204 } 2205 2206 /// Handle a keybinding action 2207 fn handle_key_action(&mut self, key_action: KeyAction, egui_ctx: &egui::Context) { 2208 let bt = self 2209 .session_manager 2210 .get_active() 2211 .map(|s| s.backend_type) 2212 .unwrap_or(BackendType::Remote); 2213 match ui::handle_key_action( 2214 key_action, 2215 &mut self.session_manager, 2216 &mut self.scene, 2217 &mut self.focus_queue, 2218 get_backend(&self.backends, bt), 2219 self.show_scene, 2220 self.auto_steal.is_enabled(), 2221 &mut self.home_session, 2222 egui_ctx, 2223 ) { 2224 KeyActionResult::ToggleView => { 2225 self.show_scene = !self.show_scene; 2226 } 2227 KeyActionResult::HandleInterrupt => { 2228 self.handle_interrupt_request(egui_ctx); 2229 } 2230 KeyActionResult::CloneAgent => { 2231 self.clone_active_agent(); 2232 } 2233 KeyActionResult::NewAgent => { 2234 self.handle_new_chat(); 2235 } 2236 KeyActionResult::DeleteSession(id) => { 2237 self.delete_session(id); 2238 } 2239 KeyActionResult::SetAutoSteal(new_state) => { 2240 self.auto_steal = if new_state { 2241 focus_queue::AutoStealState::Pending 2242 } else { 2243 focus_queue::AutoStealState::Disabled 2244 }; 2245 } 2246 KeyActionResult::PublishPermissionResponse(publish) => { 2247 self.pending_perm_responses.push(publish); 2248 } 2249 KeyActionResult::PublishModeCommand(cmd) => { 2250 self.pending_mode_commands.push(cmd); 2251 } 2252 KeyActionResult::None => {} 2253 } 2254 } 2255 2256 /// Handle the Send action, including tentative permission states 2257 fn handle_send_action(&mut self, ctx: &AppContext, ui: &egui::Ui) { 2258 let bt = self 2259 .session_manager 2260 .get_active() 2261 .map(|s| s.backend_type) 2262 .unwrap_or(BackendType::Remote); 2263 match ui::handle_send_action( 2264 &mut self.session_manager, 2265 get_backend(&self.backends, bt), 2266 ui.ctx(), 2267 ) { 2268 SendActionResult::SendMessage => { 2269 self.handle_user_send(ctx, ui); 2270 } 2271 SendActionResult::NeedsRelayPublish(publish) => { 2272 self.pending_perm_responses.push(publish); 2273 } 2274 SendActionResult::Handled => {} 2275 } 2276 } 2277 2278 /// Handle a UI action from DaveUi 2279 fn handle_ui_action( 2280 &mut self, 2281 action: DaveAction, 2282 ctx: &AppContext, 2283 ui: &egui::Ui, 2284 ) -> Option<AppAction> { 2285 // Intercept NewChat to handle chat vs agentic mode 2286 if matches!(action, DaveAction::NewChat) { 2287 self.handle_new_chat(); 2288 return None; 2289 } 2290 2291 let bt = self 2292 .session_manager 2293 .get_active() 2294 .map(|s| s.backend_type) 2295 .unwrap_or(BackendType::Remote); 2296 match ui::handle_ui_action( 2297 action, 2298 &mut self.session_manager, 2299 get_backend(&self.backends, bt), 2300 &mut self.active_overlay, 2301 &mut self.show_session_list, 2302 ui.ctx(), 2303 ) { 2304 UiActionResult::AppAction(app_action) => Some(app_action), 2305 UiActionResult::SendAction => { 2306 self.handle_send_action(ctx, ui); 2307 None 2308 } 2309 UiActionResult::PublishPermissionResponse(publish) => { 2310 self.pending_perm_responses.push(publish); 2311 None 2312 } 2313 UiActionResult::PublishModeCommand(cmd) => { 2314 self.pending_mode_commands.push(cmd); 2315 None 2316 } 2317 UiActionResult::ToggleAutoSteal => { 2318 let new_state = crate::update::toggle_auto_steal( 2319 &mut self.session_manager, 2320 &mut self.scene, 2321 self.show_scene, 2322 self.auto_steal.is_enabled(), 2323 &mut self.home_session, 2324 ); 2325 self.auto_steal = if new_state { 2326 focus_queue::AutoStealState::Pending 2327 } else { 2328 focus_queue::AutoStealState::Disabled 2329 }; 2330 None 2331 } 2332 UiActionResult::NewChat => { 2333 self.handle_new_chat(); 2334 None 2335 } 2336 UiActionResult::FocusQueueNext => { 2337 crate::update::focus_queue_next( 2338 &mut self.session_manager, 2339 &mut self.focus_queue, 2340 &mut self.scene, 2341 self.show_scene, 2342 ); 2343 None 2344 } 2345 UiActionResult::Compact => { 2346 if let Some(session) = self.session_manager.get_active() { 2347 let session_id = format!("dave-session-{}", session.id); 2348 if let Some(rx) = get_backend(&self.backends, bt) 2349 .compact_session(session_id, ui.ctx().clone()) 2350 { 2351 if let Some(session) = self.session_manager.get_active_mut() { 2352 session.incoming_tokens = Some(rx); 2353 } 2354 } 2355 } 2356 None 2357 } 2358 UiActionResult::Handled => None, 2359 } 2360 } 2361 2362 /// Handle a user send action triggered by the ui 2363 fn handle_user_send(&mut self, app_ctx: &AppContext, ui: &egui::Ui) { 2364 // Check for /cd command first (agentic only) 2365 let cd_result = self 2366 .session_manager 2367 .get_active_mut() 2368 .and_then(update::handle_cd_command); 2369 2370 // If /cd command was processed, add to recent directories 2371 if let Some(Ok(path)) = cd_result { 2372 self.directory_picker.add_recent(path); 2373 return; 2374 } else if cd_result.is_some() { 2375 // Error case - already handled above 2376 return; 2377 } 2378 2379 // Normal message handling 2380 if let Some(session) = self.session_manager.get_active_mut() { 2381 let user_text = session.input.clone(); 2382 session.input.clear(); 2383 2384 // Generate live event for user message 2385 if let Some(sk) = secret_key_bytes(app_ctx.accounts.get_selected_account().keypair()) { 2386 if let Some(evt) = 2387 ingest_live_event(session, app_ctx.ndb, &sk, &user_text, "user", None, None) 2388 { 2389 self.pending_relay_events.push(evt); 2390 } 2391 } 2392 2393 session.chat.push(Message::User(user_text)); 2394 session.update_title_from_last_message(); 2395 2396 // Remote sessions: publish user message to relay but don't send to local backend 2397 if session.is_remote() { 2398 return; 2399 } 2400 2401 // If already dispatched (waiting for or receiving response), queue 2402 // the message in chat without dispatching. 2403 // needs_redispatch_after_stream_end() will dispatch it when the 2404 // current turn finishes. 2405 if session.is_dispatched() { 2406 tracing::info!("message queued, will dispatch after current turn"); 2407 return; 2408 } 2409 } 2410 self.send_user_message(app_ctx, ui.ctx()); 2411 } 2412 2413 fn send_user_message(&mut self, app_ctx: &AppContext, ctx: &egui::Context) { 2414 let Some(active_id) = self.session_manager.active_id() else { 2415 return; 2416 }; 2417 self.send_user_message_for(active_id, app_ctx, ctx); 2418 } 2419 2420 /// Send a message for a specific session by ID 2421 fn send_user_message_for(&mut self, sid: SessionId, app_ctx: &AppContext, ctx: &egui::Context) { 2422 let Some(session) = self.session_manager.get_mut(sid) else { 2423 return; 2424 }; 2425 2426 // Only dispatch if we have the backend this session needs. 2427 // Without this guard, get_backend falls back to Remote which 2428 // immediately disconnects, causing an infinite redispatch loop. 2429 if !self.backends.contains_key(&session.backend_type) { 2430 return; 2431 } 2432 2433 // Record how many trailing user messages we're dispatching. 2434 // DispatchState tracks this for append_token insert position, 2435 // UI queued indicator, and redispatch-after-stream-end logic. 2436 session.mark_dispatched(); 2437 2438 let user_id = calculate_user_id(app_ctx.accounts.get_selected_account().keypair()); 2439 let session_id = format!("dave-session-{}", session.id); 2440 let messages = session.chat.clone(); 2441 let cwd = session.agentic.as_ref().map(|a| a.cwd.clone()); 2442 let resume_session_id = session 2443 .agentic 2444 .as_ref() 2445 .and_then(|a| a.cli_resume_id().map(|s| s.to_string())); 2446 let backend_type = session.backend_type; 2447 let tools = self.tools.clone(); 2448 let model_name = if backend_type == self.model_config.backend { 2449 self.model_config.model().to_owned() 2450 } else { 2451 backend_type.default_model().to_owned() 2452 }; 2453 let ctx = ctx.clone(); 2454 2455 // Use backend to stream request 2456 let (rx, task_handle) = get_backend(&self.backends, backend_type).stream_request( 2457 messages, 2458 tools, 2459 model_name, 2460 user_id, 2461 session_id, 2462 cwd, 2463 resume_session_id, 2464 ctx, 2465 ); 2466 session.incoming_tokens = Some(rx); 2467 session.task_handle = task_handle; 2468 } 2469 2470 /// Process pending archive conversion (JSONL to nostr events). 2471 /// 2472 /// When resuming a session, the JSONL archive needs to be converted to 2473 /// nostr events. If events already exist in ndb, load them directly. 2474 fn process_archive_conversion(&mut self, ctx: &mut AppContext<'_>) { 2475 let Some((file_path, dave_sid, claude_sid)) = self.pending_archive_convert.take() else { 2476 return; 2477 }; 2478 2479 let txn = Transaction::new(ctx.ndb).expect("txn"); 2480 let filter = nostrdb::Filter::new() 2481 .kinds([session_events::AI_CONVERSATION_KIND as u64]) 2482 .tags([claude_sid.as_str()], 'd') 2483 .limit(1) 2484 .build(); 2485 let already_exists = ctx 2486 .ndb 2487 .query(&txn, &[filter], 1) 2488 .map(|r| !r.is_empty()) 2489 .unwrap_or(false); 2490 drop(txn); 2491 2492 if already_exists { 2493 tracing::info!( 2494 "session {} already has events in ndb, skipping archive conversion", 2495 claude_sid 2496 ); 2497 let loaded_txn = Transaction::new(ctx.ndb).expect("txn"); 2498 let loaded = session_loader::load_session_messages(ctx.ndb, &loaded_txn, &claude_sid); 2499 if let Some(session) = self.session_manager.get_mut(dave_sid) { 2500 tracing::info!("loaded {} messages into chat UI", loaded.messages.len()); 2501 session.chat = loaded.messages; 2502 2503 if let Some(agentic) = &mut session.agentic { 2504 if let (Some(root), Some(last)) = (loaded.root_note_id, loaded.last_note_id) { 2505 agentic.live_threading.seed(root, last, loaded.event_count); 2506 } 2507 agentic 2508 .permissions 2509 .request_note_ids 2510 .extend(loaded.permissions.request_note_ids); 2511 } 2512 } 2513 } else if let Some(secret_bytes) = 2514 secret_key_bytes(ctx.accounts.get_selected_account().keypair()) 2515 { 2516 let sub_filter = nostrdb::Filter::new() 2517 .kinds([session_events::AI_CONVERSATION_KIND as u64]) 2518 .tags([claude_sid.as_str()], 'd') 2519 .build(); 2520 2521 match ctx.ndb.subscribe(&[sub_filter]) { 2522 Ok(sub) => { 2523 match session_converter::convert_session_to_events( 2524 &file_path, 2525 ctx.ndb, 2526 &secret_bytes, 2527 ) { 2528 Ok(note_ids) => { 2529 tracing::info!( 2530 "archived session: {} events from {}, awaiting indexing", 2531 note_ids.len(), 2532 file_path.display() 2533 ); 2534 self.pending_message_load = Some(PendingMessageLoad { 2535 sub, 2536 dave_session_id: dave_sid, 2537 claude_session_id: claude_sid, 2538 }); 2539 } 2540 Err(e) => { 2541 tracing::error!("archive conversion failed: {}", e); 2542 } 2543 } 2544 } 2545 Err(e) => { 2546 tracing::error!("failed to subscribe for archive events: {:?}", e); 2547 } 2548 } 2549 } else { 2550 tracing::warn!("no secret key available for archive conversion"); 2551 } 2552 } 2553 2554 /// Poll for pending message load completion. 2555 /// 2556 /// After archive conversion, wait for ndb to index the kind-1988 events, 2557 /// then load them into the session's chat history. 2558 fn poll_pending_message_load(&mut self, ndb: &nostrdb::Ndb) { 2559 let Some(pending) = &self.pending_message_load else { 2560 return; 2561 }; 2562 2563 let notes = ndb.poll_for_notes(pending.sub, 4096); 2564 if notes.is_empty() { 2565 return; 2566 } 2567 2568 let txn = Transaction::new(ndb).expect("txn"); 2569 let loaded = session_loader::load_session_messages(ndb, &txn, &pending.claude_session_id); 2570 if let Some(session) = self.session_manager.get_mut(pending.dave_session_id) { 2571 tracing::info!("loaded {} messages into chat UI", loaded.messages.len()); 2572 session.chat = loaded.messages; 2573 2574 if let Some(agentic) = &mut session.agentic { 2575 if let (Some(root), Some(last)) = (loaded.root_note_id, loaded.last_note_id) { 2576 agentic.live_threading.seed(root, last, loaded.event_count); 2577 } 2578 agentic 2579 .permissions 2580 .request_note_ids 2581 .extend(loaded.permissions.request_note_ids); 2582 } 2583 } 2584 self.pending_message_load = None; 2585 } 2586 2587 /// Process relay events and run negentropy reconciliation against PNS relay. 2588 /// 2589 /// Collects negentropy protocol events from the relay, re-subscribes on 2590 /// reconnect, and drives multi-round sync to fetch missing PNS events. 2591 fn process_negentropy_sync(&mut self, ctx: &mut AppContext<'_>, egui_ctx: &egui::Context) { 2592 let pns_sub_id = self.pns_relay_sub.clone(); 2593 let pns_relay = self.pns_relay_url.clone(); 2594 let mut neg_events: Vec<enostr::negentropy::NegEvent> = Vec::new(); 2595 try_process_events_core(ctx, &mut self.pool, egui_ctx, |app_ctx, pool, ev| { 2596 if ev.relay == pns_relay { 2597 if let enostr::RelayEvent::Opened = (&ev.event).into() { 2598 neg_events.push(enostr::negentropy::NegEvent::RelayOpened); 2599 if let Some(sub_id) = &pns_sub_id { 2600 if let Some(sk) = 2601 app_ctx.accounts.get_selected_account().keypair().secret_key 2602 { 2603 let pns_keys = enostr::pns::derive_pns_keys(&sk.secret_bytes()); 2604 let pns_filter = nostrdb::Filter::new() 2605 .kinds([enostr::pns::PNS_KIND as u64]) 2606 .authors([pns_keys.keypair.pubkey.bytes()]) 2607 .limit(500) 2608 .build(); 2609 let req = enostr::ClientMessage::req(sub_id.clone(), vec![pns_filter]); 2610 pool.send_to(&req, &pns_relay); 2611 tracing::info!("re-subscribed for PNS events after relay reconnect"); 2612 } 2613 } 2614 } 2615 2616 neg_events.extend(enostr::negentropy::NegEvent::from_relay(&ev.event)); 2617 } 2618 }); 2619 2620 // Reset round counter on relay reconnect so we do a fresh burst 2621 if neg_events 2622 .iter() 2623 .any(|e| matches!(e, enostr::negentropy::NegEvent::RelayOpened)) 2624 { 2625 self.neg_sync_round = 0; 2626 } 2627 2628 // Reconcile local events against PNS relay, 2629 // fetch any missing kind-1080 events via standard REQ. 2630 if let Some(sk) = ctx.accounts.get_selected_account().keypair().secret_key { 2631 let pns_keys = enostr::pns::derive_pns_keys(&sk.secret_bytes()); 2632 let filter = nostrdb::Filter::new() 2633 .kinds([enostr::pns::PNS_KIND as u64]) 2634 .authors([pns_keys.keypair.pubkey.bytes()]) 2635 .build(); 2636 let result = self.neg_sync.process( 2637 neg_events, 2638 ctx.ndb, 2639 &mut self.pool, 2640 &filter, 2641 &self.pns_relay_url, 2642 ); 2643 2644 // If events were found and we haven't hit the round limit, 2645 // trigger another sync to pull more recent data. 2646 if result.new_events > 0 { 2647 self.neg_sync_round += 1; 2648 if self.neg_sync_round < MAX_NEG_SYNC_ROUNDS { 2649 tracing::info!( 2650 "negentropy: scheduling round {}/{} (got {} new, {} skipped)", 2651 self.neg_sync_round + 1, 2652 MAX_NEG_SYNC_ROUNDS, 2653 result.new_events, 2654 result.skipped 2655 ); 2656 self.neg_sync.trigger_now(); 2657 } else { 2658 tracing::info!( 2659 "negentropy: reached max rounds ({}), stopping", 2660 MAX_NEG_SYNC_ROUNDS 2661 ); 2662 } 2663 } else if result.skipped > 0 { 2664 tracing::info!( 2665 "negentropy: relay has {} events we can't reconcile, stopping", 2666 result.skipped 2667 ); 2668 } 2669 } 2670 } 2671 2672 /// One-time initialization on first update. 2673 /// 2674 /// Restores sessions from ndb, triggers initial negentropy sync, 2675 /// and sets up relay subscriptions. 2676 fn initialize_once(&mut self, ctx: &mut AppContext<'_>, egui_ctx: &egui::Context) { 2677 self.sessions_restored = true; 2678 2679 self.restore_sessions_from_ndb(ctx); 2680 2681 // Trigger initial negentropy sync after startup 2682 self.neg_sync.trigger_now(); 2683 self.neg_sync_round = 0; 2684 2685 // Subscribe to PNS events on relays for session discovery from other devices. 2686 // Also subscribe locally in ndb for kind-31988 session state events 2687 // so we detect new sessions appearing after PNS unwrapping. 2688 if let Some(sk) = ctx.accounts.get_selected_account().keypair().secret_key { 2689 let pns_keys = enostr::pns::derive_pns_keys(&sk.secret_bytes()); 2690 2691 // Ensure the PNS relay is in the pool 2692 let egui_ctx = egui_ctx.clone(); 2693 let wakeup = move || egui_ctx.request_repaint(); 2694 if let Err(e) = self.pool.add_url(self.pns_relay_url.clone(), wakeup) { 2695 tracing::warn!("failed to add PNS relay {}: {:?}", self.pns_relay_url, e); 2696 } 2697 2698 // Remote: subscribe on PNS relay for kind-1080 authored by our PNS pubkey 2699 let pns_filter = nostrdb::Filter::new() 2700 .kinds([enostr::pns::PNS_KIND as u64]) 2701 .authors([pns_keys.keypair.pubkey.bytes()]) 2702 .limit(500) 2703 .build(); 2704 let sub_id = uuid::Uuid::new_v4().to_string(); 2705 let req = enostr::ClientMessage::req(sub_id.clone(), vec![pns_filter]); 2706 self.pool.send_to(&req, &self.pns_relay_url); 2707 self.pns_relay_sub = Some(sub_id); 2708 tracing::info!("subscribed for PNS events on {}", self.pns_relay_url); 2709 2710 // Local: subscribe in ndb for kind-31988 session state events 2711 let state_filter = nostrdb::Filter::new() 2712 .kinds([session_events::AI_SESSION_STATE_KIND as u64]) 2713 .build(); 2714 match ctx.ndb.subscribe(&[state_filter]) { 2715 Ok(sub) => { 2716 self.session_state_sub = Some(sub); 2717 tracing::info!("subscribed for session state events in ndb"); 2718 } 2719 Err(e) => { 2720 tracing::warn!("failed to subscribe for session state events: {:?}", e); 2721 } 2722 } 2723 2724 // Local: subscribe in ndb for kind-31989 session command events 2725 let cmd_filter = nostrdb::Filter::new() 2726 .kinds([session_events::AI_SESSION_COMMAND_KIND as u64]) 2727 .build(); 2728 match ctx.ndb.subscribe(&[cmd_filter]) { 2729 Ok(sub) => { 2730 self.session_command_sub = Some(sub); 2731 tracing::info!("subscribed for session command events in ndb"); 2732 } 2733 Err(e) => { 2734 tracing::warn!("failed to subscribe for session command events: {:?}", e); 2735 } 2736 } 2737 } 2738 } 2739 } 2740 2741 impl notedeck::App for Dave { 2742 fn update(&mut self, ctx: &mut AppContext<'_>, egui_ctx: &egui::Context) { 2743 self.process_negentropy_sync(ctx, egui_ctx); 2744 2745 // Poll for external spawn-agent commands via IPC 2746 self.poll_ipc_commands(); 2747 2748 // Process pending thread summary requests 2749 let pending = std::mem::take(&mut self.pending_summaries); 2750 for note_id in pending { 2751 if let Some(sid) = self.build_summary_session(ctx.ndb, ¬e_id) { 2752 self.send_user_message_for(sid, ctx, egui_ctx); 2753 } 2754 } 2755 2756 // One-time initialization on first update 2757 if !self.sessions_restored { 2758 self.initialize_once(ctx, egui_ctx); 2759 } 2760 2761 // Poll for external editor completion 2762 update::poll_editor_job(&mut self.session_manager); 2763 2764 // Poll for new session states from PNS-unwrapped relay events 2765 self.poll_session_state_events(ctx); 2766 2767 // Poll for spawn commands targeting this host 2768 self.poll_session_command_events(ctx); 2769 2770 // Poll for live conversation events on all sessions. 2771 // Returns user messages from remote clients that need backend dispatch. 2772 // Only dispatch if the session isn't already streaming a response — 2773 // the message is already in chat, so it will be included when the 2774 // current stream finishes and we re-dispatch. 2775 let sk_bytes = secret_key_bytes(ctx.accounts.get_selected_account().keypair()); 2776 let (remote_user_msgs, conv_events) = 2777 self.poll_remote_conversation_events(ctx.ndb, sk_bytes.as_ref()); 2778 self.pending_relay_events.extend(conv_events); 2779 for (sid, _msg) in remote_user_msgs { 2780 let should_dispatch = self 2781 .session_manager 2782 .get(sid) 2783 .is_some_and(|s| s.should_dispatch_remote_message()); 2784 if should_dispatch { 2785 self.send_user_message_for(sid, ctx, egui_ctx); 2786 } 2787 } 2788 2789 self.process_archive_conversion(ctx); 2790 self.poll_pending_message_load(ctx.ndb); 2791 2792 // Handle global keybindings (when no text input has focus) 2793 let has_pending_permission = self.first_pending_permission().is_some(); 2794 let has_pending_question = self.has_pending_question(); 2795 let in_tentative_state = self 2796 .session_manager 2797 .get_active() 2798 .and_then(|s| s.agentic.as_ref()) 2799 .map(|a| a.permission_message_state != crate::session::PermissionMessageState::None) 2800 .unwrap_or(false); 2801 let active_ai_mode = self 2802 .session_manager 2803 .get_active() 2804 .map(|s| s.ai_mode) 2805 .unwrap_or(self.ai_mode); 2806 if let Some(key_action) = check_keybindings( 2807 egui_ctx, 2808 has_pending_permission, 2809 has_pending_question, 2810 in_tentative_state, 2811 active_ai_mode, 2812 ) { 2813 self.handle_key_action(key_action, egui_ctx); 2814 } 2815 2816 // Check if interrupt confirmation has timed out 2817 self.check_interrupt_timeout(); 2818 2819 // Process incoming AI responses for all sessions 2820 let (sessions_needing_send, events_to_publish) = self.process_events(ctx); 2821 2822 // Build permission response events from remote sessions 2823 self.publish_pending_perm_responses(ctx); 2824 2825 // Build spawn command events (need secret key from AppContext) 2826 if !self.pending_spawn_commands.is_empty() { 2827 if let Some(sk) = secret_key_bytes(ctx.accounts.get_selected_account().keypair()) { 2828 for cmd in std::mem::take(&mut self.pending_spawn_commands) { 2829 match session_events::build_spawn_command_event( 2830 &cmd.target_host, 2831 &cmd.cwd.to_string_lossy(), 2832 cmd.backend.as_str(), 2833 &sk, 2834 ) { 2835 Ok(evt) => self.pending_relay_events.push(evt), 2836 Err(e) => tracing::warn!("failed to build spawn command: {:?}", e), 2837 } 2838 } 2839 } 2840 } 2841 2842 // Build permission mode command events for remote sessions 2843 self.publish_pending_mode_commands(ctx); 2844 2845 // PNS-wrap and publish events to relays 2846 let pending = std::mem::take(&mut self.pending_relay_events); 2847 let all_events = events_to_publish.iter().chain(pending.iter()); 2848 if let Some(sk) = ctx.accounts.get_selected_account().keypair().secret_key { 2849 let pns_keys = enostr::pns::derive_pns_keys(&sk.secret_bytes()); 2850 for event in all_events { 2851 match session_events::wrap_pns(&event.note_json, &pns_keys) { 2852 Ok(pns_json) => match enostr::ClientMessage::event_json(pns_json) { 2853 Ok(msg) => self.pool.send_to(&msg, &self.pns_relay_url), 2854 Err(e) => tracing::warn!("failed to build relay message: {:?}", e), 2855 }, 2856 Err(e) => tracing::warn!("failed to PNS-wrap event: {}", e), 2857 } 2858 } 2859 } 2860 2861 // Poll for remote conversation actions (permission responses, commands). 2862 let mode_applies = self.poll_remote_conversation_actions(ctx.ndb); 2863 for (backend_sid, bt, mode) in mode_applies { 2864 get_backend(&self.backends, bt).set_permission_mode( 2865 backend_sid, 2866 mode, 2867 egui_ctx.clone(), 2868 ); 2869 } 2870 2871 // Poll git status for local agentic sessions 2872 for session in self.session_manager.iter_mut() { 2873 if session.is_remote() { 2874 continue; 2875 } 2876 if let Some(agentic) = &mut session.agentic { 2877 agentic.git_status.poll(); 2878 agentic.git_status.maybe_auto_refresh(); 2879 } 2880 } 2881 2882 // Update all session statuses after processing events 2883 self.session_manager.update_all_statuses(); 2884 2885 // Publish kind-31988 state events for sessions whose status changed 2886 self.publish_dirty_session_states(ctx); 2887 2888 // Publish "deleted" state events for recently deleted sessions 2889 self.publish_pending_deletions(ctx); 2890 2891 // Update focus queue from persisted indicator field 2892 let indicator_iter = self.session_manager.iter().map(|s| (s.id, s.indicator)); 2893 let queue_update = self.focus_queue.update_from_indicators(indicator_iter); 2894 2895 // Vibrate on Android whenever a session transitions to NeedsInput 2896 if queue_update.new_needs_input { 2897 notedeck::platform::try_vibrate(); 2898 } 2899 2900 // Transition to Pending on queue changes so auto-steal retries 2901 // across frames if temporarily suppressed (e.g. user is typing). 2902 if queue_update.changed && self.auto_steal.is_enabled() { 2903 self.auto_steal = focus_queue::AutoStealState::Pending; 2904 } 2905 2906 // Run auto-steal when pending. Transitions back to Idle once 2907 // the steal logic executes (even if no switch was needed). 2908 // Stays Pending while the user is typing so it retries next frame. 2909 if self.auto_steal == focus_queue::AutoStealState::Pending { 2910 let user_is_typing = self 2911 .session_manager 2912 .get_active() 2913 .is_some_and(|s| !s.input.is_empty()); 2914 2915 if !user_is_typing { 2916 let stole_focus = update::process_auto_steal_focus( 2917 &mut self.session_manager, 2918 &mut self.focus_queue, 2919 &mut self.scene, 2920 self.show_scene, 2921 true, 2922 &mut self.home_session, 2923 ); 2924 2925 if stole_focus { 2926 activate_app(egui_ctx); 2927 } 2928 2929 self.auto_steal = focus_queue::AutoStealState::Idle; 2930 } 2931 } 2932 2933 // Send continuation messages for all sessions that have queued messages 2934 for session_id in sessions_needing_send { 2935 tracing::info!( 2936 "Session {}: dispatching queued message via send_user_message_for", 2937 session_id 2938 ); 2939 self.send_user_message_for(session_id, ctx, egui_ctx); 2940 } 2941 } 2942 2943 fn render(&mut self, ctx: &mut AppContext<'_>, ui: &mut egui::Ui) -> AppResponse { 2944 let mut app_action: Option<AppAction> = None; 2945 2946 if let Some(action) = self.ui(ctx, ui).action { 2947 if let Some(returned_action) = self.handle_ui_action(action, ctx, ui) { 2948 app_action = Some(returned_action); 2949 } 2950 } 2951 2952 AppResponse::action(app_action) 2953 } 2954 } 2955 2956 /// Bring the application to the front. 2957 /// 2958 /// On macOS, egui's ViewportCommand::Focus focuses the window but doesn't 2959 /// always activate the app (bring it in front of other apps). Stage Manager 2960 /// single-window mode is particularly aggressive, so we use both 2961 /// NSRunningApplication::activateWithOptions and orderFrontRegardless 2962 /// on the key window. 2963 /// Set up a live conversation subscription for a session if not already subscribed. 2964 /// 2965 /// Subscribes to kind-1988 events tagged with the session's claude ID so we 2966 /// receive messages from remote clients (phone) even before the local backend starts. 2967 pub(crate) fn setup_conversation_subscription( 2968 agentic: &mut session::AgenticSessionData, 2969 claude_session_id: &str, 2970 ndb: &nostrdb::Ndb, 2971 ) { 2972 if agentic.live_conversation_sub.is_some() { 2973 return; 2974 } 2975 let filter = nostrdb::Filter::new() 2976 .kinds([session_events::AI_CONVERSATION_KIND as u64]) 2977 .tags([claude_session_id], 'd') 2978 .build(); 2979 match ndb.subscribe(&[filter]) { 2980 Ok(sub) => { 2981 agentic.live_conversation_sub = Some(sub); 2982 tracing::info!( 2983 "subscribed for live conversation events (session {})", 2984 claude_session_id, 2985 ); 2986 } 2987 Err(e) => { 2988 tracing::warn!("failed to subscribe for conversation events: {:?}", e,); 2989 } 2990 } 2991 } 2992 2993 /// Subscribe for kind-1988 conversation action events (permission responses, 2994 /// mode commands) for the given session d-tag. 2995 pub(crate) fn setup_conversation_action_subscription( 2996 agentic: &mut session::AgenticSessionData, 2997 event_id: &str, 2998 ndb: &nostrdb::Ndb, 2999 ) { 3000 if agentic.conversation_action_sub.is_some() { 3001 return; 3002 } 3003 let filter = nostrdb::Filter::new() 3004 .kinds([session_events::AI_CONVERSATION_KIND as u64]) 3005 .tags([event_id], 'd') 3006 .build(); 3007 match ndb.subscribe(&[filter]) { 3008 Ok(sub) => { 3009 agentic.conversation_action_sub = Some(sub); 3010 tracing::info!("subscribed for conversation actions (session {})", event_id,); 3011 } 3012 Err(e) => { 3013 tracing::warn!("failed to subscribe for conversation actions: {:?}", e); 3014 } 3015 } 3016 } 3017 3018 /// Check if a session state represents a remote session. 3019 /// 3020 /// A session is remote if its hostname differs from the local hostname, 3021 /// or (for old events without hostname) if the cwd doesn't exist locally. 3022 fn is_session_remote(hostname: &str, cwd: &str, local_hostname: &str) -> bool { 3023 (!hostname.is_empty() && hostname != local_hostname) 3024 || (hostname.is_empty() && !std::path::PathBuf::from(cwd).exists()) 3025 } 3026 3027 /// Handle tool calls from the AI backend. 3028 /// 3029 /// Pushes the tool calls to chat, executes each one, and pushes the 3030 /// responses. Returns `true` if any tool produced a response that 3031 /// needs to be sent back to the backend. 3032 fn handle_tool_calls( 3033 session: &mut session::ChatSession, 3034 toolcalls: &[ToolCall], 3035 ndb: &nostrdb::Ndb, 3036 ) -> bool { 3037 tracing::info!("got tool calls: {:?}", toolcalls); 3038 session.chat.push(Message::ToolCalls(toolcalls.to_vec())); 3039 3040 let txn = Transaction::new(ndb).unwrap(); 3041 let mut needs_send = false; 3042 3043 for call in toolcalls { 3044 match call.calls() { 3045 ToolCalls::PresentNotes(present) => { 3046 session.chat.push(Message::ToolResponse(ToolResponse::new( 3047 call.id().to_owned(), 3048 ToolResponses::PresentNotes(present.note_ids.len() as i32), 3049 ))); 3050 needs_send = true; 3051 } 3052 ToolCalls::Invalid(invalid) => { 3053 session.chat.push(Message::tool_error( 3054 call.id().to_string(), 3055 invalid.error.clone(), 3056 )); 3057 needs_send = true; 3058 } 3059 ToolCalls::Query(search_call) => { 3060 let resp = search_call.execute(&txn, ndb); 3061 session.chat.push(Message::ToolResponse(ToolResponse::new( 3062 call.id().to_owned(), 3063 ToolResponses::Query(resp), 3064 ))); 3065 needs_send = true; 3066 } 3067 } 3068 } 3069 3070 needs_send 3071 } 3072 3073 /// Handle a permission request from the AI backend. 3074 /// 3075 /// Builds and publishes a permission request event for remote clients, 3076 /// stores the response sender for later, and adds the request to chat. 3077 fn handle_permission_request( 3078 session: &mut session::ChatSession, 3079 pending: messages::PendingPermission, 3080 secret_key: &Option<[u8; 32]>, 3081 ndb: &nostrdb::Ndb, 3082 events_to_publish: &mut Vec<session_events::BuiltEvent>, 3083 ) { 3084 tracing::info!( 3085 "Permission request for tool '{}': {:?}", 3086 pending.request.tool_name, 3087 pending.request.tool_input 3088 ); 3089 3090 // Check runtime allowlist — auto-accept and show as already-allowed in chat 3091 if let Some(agentic) = &session.agentic { 3092 if agentic.should_runtime_allow(&pending.request.tool_name, &pending.request.tool_input) { 3093 tracing::info!( 3094 "runtime allow: auto-accepting '{}' for this session", 3095 pending.request.tool_name, 3096 ); 3097 let _ = pending 3098 .response_tx 3099 .send(PermissionResponse::Allow { message: None }); 3100 let mut request = pending.request; 3101 request.response = Some(crate::messages::PermissionResponseType::Allowed); 3102 session.chat.push(Message::PermissionRequest(request)); 3103 return; 3104 } 3105 } 3106 3107 // Build and publish a proper permission request event 3108 // with perm-id, tool-name tags for remote clients 3109 if let Some(sk) = secret_key { 3110 let event_session_id = session 3111 .agentic 3112 .as_ref() 3113 .map(|a| a.event_session_id().to_string()); 3114 3115 if let Some(sid) = event_session_id { 3116 match session_events::build_permission_request_event( 3117 &pending.request.id, 3118 &pending.request.tool_name, 3119 &pending.request.tool_input, 3120 &sid, 3121 sk, 3122 ) { 3123 Ok(evt) => { 3124 pns_ingest(ndb, &evt.note_json, sk); 3125 if let Some(agentic) = &mut session.agentic { 3126 agentic 3127 .permissions 3128 .request_note_ids 3129 .insert(pending.request.id, evt.note_id); 3130 } 3131 events_to_publish.push(evt); 3132 } 3133 Err(e) => { 3134 tracing::warn!("failed to build permission request event: {}", e); 3135 } 3136 } 3137 } 3138 } 3139 3140 // Store the response sender for later (agentic only) 3141 if let Some(agentic) = &mut session.agentic { 3142 agentic 3143 .permissions 3144 .pending 3145 .insert(pending.request.id, pending.response_tx); 3146 } 3147 3148 // Add the request to chat for UI display 3149 session 3150 .chat 3151 .push(Message::PermissionRequest(pending.request)); 3152 } 3153 3154 /// Handle a remote permission request from a kind-1988 conversation event. 3155 /// Checks runtime allowlist for auto-accept, otherwise adds to chat for UI display. 3156 fn handle_remote_permission_request( 3157 note: &nostrdb::Note, 3158 content: &str, 3159 agentic: &mut session::AgenticSessionData, 3160 chat: &mut Vec<Message>, 3161 secret_key: Option<&[u8; 32]>, 3162 events_to_publish: &mut Vec<session_events::BuiltEvent>, 3163 ) { 3164 let Ok(content_json) = serde_json::from_str::<serde_json::Value>(content) else { 3165 return; 3166 }; 3167 let tool_name = content_json["tool_name"] 3168 .as_str() 3169 .unwrap_or("unknown") 3170 .to_string(); 3171 let tool_input = content_json 3172 .get("tool_input") 3173 .cloned() 3174 .unwrap_or(serde_json::Value::Null); 3175 let perm_id = session_events::get_tag_value(note, "perm-id") 3176 .and_then(|s| uuid::Uuid::parse_str(s).ok()) 3177 .unwrap_or_else(uuid::Uuid::new_v4); 3178 3179 // Store the note ID for linking responses 3180 agentic 3181 .permissions 3182 .request_note_ids 3183 .insert(perm_id, *note.id()); 3184 3185 // Runtime allowlist auto-accept 3186 if agentic.should_runtime_allow(&tool_name, &tool_input) { 3187 tracing::info!( 3188 "runtime allow: auto-accepting remote '{}' for this session", 3189 tool_name, 3190 ); 3191 agentic.permissions.responded.insert(perm_id); 3192 if let Some(sk) = secret_key { 3193 let sid = agentic.event_session_id(); 3194 if let Ok(evt) = session_events::build_permission_response_event( 3195 &perm_id, 3196 note.id(), 3197 true, 3198 None, 3199 sid, 3200 sk, 3201 ) { 3202 events_to_publish.push(evt); 3203 } 3204 } 3205 chat.push(Message::PermissionRequest( 3206 crate::messages::PermissionRequest { 3207 id: perm_id, 3208 tool_name, 3209 tool_input, 3210 response: Some(crate::messages::PermissionResponseType::Allowed), 3211 answer_summary: None, 3212 cached_plan: None, 3213 }, 3214 )); 3215 return; 3216 } 3217 3218 // Check if we already responded 3219 let response = if agentic.permissions.responded.contains(&perm_id) { 3220 Some(crate::messages::PermissionResponseType::Allowed) 3221 } else { 3222 None 3223 }; 3224 3225 // Parse plan markdown for ExitPlanMode requests 3226 let cached_plan = if tool_name == "ExitPlanMode" { 3227 tool_input 3228 .get("plan") 3229 .and_then(|v| v.as_str()) 3230 .map(crate::messages::ParsedMarkdown::parse) 3231 } else { 3232 None 3233 }; 3234 3235 chat.push(Message::PermissionRequest( 3236 crate::messages::PermissionRequest { 3237 id: perm_id, 3238 tool_name, 3239 tool_input, 3240 response, 3241 answer_summary: None, 3242 cached_plan, 3243 }, 3244 )); 3245 } 3246 3247 /// Handle a remote permission response from a kind-1988 event. 3248 fn handle_remote_permission_response( 3249 note: &nostrdb::Note, 3250 agentic: &mut session::AgenticSessionData, 3251 chat: &mut [Message], 3252 ) { 3253 let Some(perm_id_str) = session_events::get_tag_value(note, "perm-id") else { 3254 tracing::warn!("permission_response event missing perm-id tag"); 3255 return; 3256 }; 3257 let Ok(perm_id) = uuid::Uuid::parse_str(perm_id_str) else { 3258 tracing::warn!("invalid perm-id UUID: {}", perm_id_str); 3259 return; 3260 }; 3261 3262 let content = note.content(); 3263 let (allowed, message) = match serde_json::from_str::<serde_json::Value>(content) { 3264 Ok(v) => { 3265 let decision = v.get("decision").and_then(|d| d.as_str()).unwrap_or("deny"); 3266 let msg = v 3267 .get("message") 3268 .and_then(|m| m.as_str()) 3269 .filter(|s| !s.is_empty()) 3270 .map(|s| s.to_string()); 3271 (decision == "allow", msg) 3272 } 3273 Err(_) => (false, None), 3274 }; 3275 3276 if let Some(sender) = agentic.permissions.pending.remove(&perm_id) { 3277 let response = if allowed { 3278 PermissionResponse::Allow { message } 3279 } else { 3280 PermissionResponse::Deny { 3281 reason: message.unwrap_or_else(|| "Denied by remote".to_string()), 3282 } 3283 }; 3284 3285 let response_type = if allowed { 3286 crate::messages::PermissionResponseType::Allowed 3287 } else { 3288 crate::messages::PermissionResponseType::Denied 3289 }; 3290 for msg in chat.iter_mut() { 3291 if let Message::PermissionRequest(req) = msg { 3292 if req.id == perm_id { 3293 req.response = Some(response_type); 3294 break; 3295 } 3296 } 3297 } 3298 3299 if sender.send(response).is_err() { 3300 tracing::warn!("failed to send remote permission response for {}", perm_id); 3301 } else { 3302 tracing::info!( 3303 "remote permission response for {}: {}", 3304 perm_id, 3305 if allowed { "allowed" } else { "denied" } 3306 ); 3307 } 3308 } 3309 } 3310 3311 /// Handle a tool result (execution metadata) from the AI backend. 3312 /// 3313 /// Invalidates git status after file-modifying tools, then either folds 3314 /// the result into a subagent or pushes it as a standalone tool response. 3315 fn handle_tool_result(session: &mut session::ChatSession, result: ExecutedTool) { 3316 tracing::debug!("Tool result: {} - {}", result.tool_name, result.summary); 3317 3318 if matches!(result.tool_name.as_str(), "Bash" | "Write" | "Edit") { 3319 if let Some(agentic) = &mut session.agentic { 3320 agentic.git_status.invalidate(); 3321 } 3322 } 3323 if let Some(result) = session.fold_tool_result(result) { 3324 session 3325 .chat 3326 .push(Message::ToolResponse(ToolResponse::executed_tool(result))); 3327 } 3328 } 3329 3330 /// Handle a subagent spawn event from the AI backend. 3331 fn handle_subagent_spawned(session: &mut session::ChatSession, subagent: SubagentInfo) { 3332 tracing::debug!( 3333 "Subagent spawned: {} ({}) - {}", 3334 subagent.task_id, 3335 subagent.subagent_type, 3336 subagent.description 3337 ); 3338 let task_id = subagent.task_id.clone(); 3339 let idx = session.chat.len(); 3340 session.chat.push(Message::Subagent(subagent)); 3341 if let Some(agentic) = &mut session.agentic { 3342 agentic.subagent_indices.insert(task_id, idx); 3343 } 3344 } 3345 3346 /// Handle compaction completion from the AI backend. 3347 /// 3348 /// Updates agentic state, advances compact-and-proceed if waiting, 3349 /// and pushes the compaction info to chat. 3350 fn handle_compaction_complete( 3351 session: &mut session::ChatSession, 3352 session_id: SessionId, 3353 info: messages::CompactionInfo, 3354 ) { 3355 tracing::debug!( 3356 "Compaction completed for session {}: pre_tokens={}", 3357 session_id, 3358 info.pre_tokens 3359 ); 3360 if let Some(agentic) = &mut session.agentic { 3361 agentic.is_compacting = false; 3362 agentic.last_compaction = Some(info.clone()); 3363 3364 if agentic.compact_and_proceed 3365 == crate::session::CompactAndProceedState::WaitingForCompaction 3366 { 3367 agentic.compact_and_proceed = crate::session::CompactAndProceedState::ReadyToProceed; 3368 } 3369 } 3370 session.chat.push(Message::CompactionComplete(info)); 3371 } 3372 3373 /// Handle query completion (usage metrics) from the AI backend. 3374 fn handle_query_complete(session: &mut session::ChatSession, info: messages::UsageInfo) { 3375 if let Some(agentic) = &mut session.agentic { 3376 agentic.usage.input_tokens = info.input_tokens; 3377 agentic.usage.output_tokens = info.output_tokens; 3378 agentic.usage.num_turns = info.num_turns; 3379 if let Some(cost) = info.cost_usd { 3380 agentic.usage.cost_usd = Some(cost); 3381 } 3382 } 3383 } 3384 3385 /// Handle a SessionInfo response from the AI backend. 3386 /// 3387 /// Sets up ndb subscriptions for permission responses and conversation events 3388 /// when we first learn the claude session ID. 3389 fn handle_session_info(session: &mut session::ChatSession, info: SessionInfo, ndb: &nostrdb::Ndb) { 3390 if let Some(agentic) = &mut session.agentic { 3391 // Use the stable event_id (not the CLI session ID) for subscriptions, 3392 // since all live events are tagged with event_id as the d-tag. 3393 let event_id = agentic.event_session_id().to_string(); 3394 setup_conversation_action_subscription(agentic, &event_id, ndb); 3395 setup_conversation_subscription(agentic, &event_id, ndb); 3396 3397 agentic.session_info = Some(info); 3398 } 3399 // Persist initial session state now that we know the claude_session_id 3400 session.state_dirty = true; 3401 } 3402 3403 /// Handle stream-end for a session after the AI backend disconnects. 3404 /// 3405 /// Finalizes the assistant message, publishes the live event, 3406 /// and checks whether queued messages need redispatch. 3407 fn handle_stream_end( 3408 session: &mut session::ChatSession, 3409 session_id: SessionId, 3410 secret_key: &Option<[u8; 32]>, 3411 ndb: &nostrdb::Ndb, 3412 events_to_publish: &mut Vec<session_events::BuiltEvent>, 3413 needs_send: &mut HashSet<SessionId>, 3414 ) { 3415 session.finalize_last_assistant(); 3416 3417 // Generate live event for the finalized assistant message 3418 if let Some(sk) = secret_key { 3419 if let Some(text) = session.last_assistant_text() { 3420 if let Some(evt) = ingest_live_event(session, ndb, sk, &text, "assistant", None, None) { 3421 events_to_publish.push(evt); 3422 } 3423 } 3424 } 3425 3426 session.task_handle = None; 3427 3428 // If the backend returned nothing (dispatch_state never left 3429 // AwaitingResponse), show an error so the user isn't left staring 3430 // at silence. 3431 if matches!( 3432 session.dispatch_state, 3433 session::DispatchState::AwaitingResponse { .. } 3434 ) && session.last_assistant_text().is_none() 3435 { 3436 tracing::warn!("Session {}: backend returned empty response", session_id); 3437 session 3438 .chat 3439 .push(Message::Error("No response from backend".into())); 3440 } 3441 3442 // Check redispatch BEFORE resetting dispatch_state — the check 3443 // reads the state to distinguish empty responses from new messages. 3444 if session.needs_redispatch_after_stream_end() { 3445 tracing::info!( 3446 "Session {}: redispatching queued user message after stream end", 3447 session_id 3448 ); 3449 needs_send.insert(session_id); 3450 } 3451 3452 session.dispatch_state.stream_ended(); 3453 3454 // After compact & approve: compaction must have completed 3455 // (ReadyToProceed) before we send "Proceed". 3456 if session.take_compact_and_proceed() { 3457 needs_send.insert(session_id); 3458 } 3459 } 3460 3461 fn activate_app(ctx: &egui::Context) { 3462 ctx.send_viewport_cmd(egui::ViewportCommand::Focus); 3463 3464 #[cfg(target_os = "macos")] 3465 { 3466 use objc2::MainThreadMarker; 3467 use objc2_app_kit::{NSApplication, NSApplicationActivationOptions, NSRunningApplication}; 3468 3469 // Safety: UI update runs on the main thread 3470 if let Some(mtm) = MainThreadMarker::new() { 3471 let app = NSApplication::sharedApplication(mtm); 3472 3473 // Activate via NSRunningApplication for per-process activation 3474 let current = unsafe { NSRunningApplication::currentApplication() }; 3475 unsafe { 3476 current.activateWithOptions(NSApplicationActivationOptions::ActivateAllWindows); 3477 }; 3478 3479 // Also force the key window to front regardless of Stage Manager 3480 if let Some(window) = app.keyWindow() { 3481 unsafe { window.orderFrontRegardless() }; 3482 } 3483 } 3484 } 3485 }