notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit 59965709ee1b8922e9b271c472b5a40c7f6895e5
parent c5930c56a242741373e19ef150acf30f2fc11b4c
Author: William Casarin <jb55@jb55.com>
Date:   Wed, 25 Feb 2026 12:22:17 -0800

dave: break up Dave::update() and Dave::process_events()

Extract large blocks from the two biggest methods in lib.rs into
focused methods and standalone functions:

update() (412 → 174 lines):
- process_negentropy_sync(): relay event processing and neg sync
- initialize_once(): first-update session restore and subscriptions
- process_archive_conversion(): JSONL to nostr event conversion
- poll_pending_message_load(): wait for ndb to index archive events

process_events() (387 → 302 lines):
- handle_session_info(): ndb subscription setup on session ID discovery
- handle_stream_end(): finalize messages and check redispatch on disconnect

Also removes the process_pns thread spawn from initialization.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Diffstat:
Mcrates/notedeck_dave/src/lib.rs | 562++++++++++++++++++++++++++++++++++++++++---------------------------------------
1 file changed, 288 insertions(+), 274 deletions(-)

diff --git a/crates/notedeck_dave/src/lib.rs b/crates/notedeck_dave/src/lib.rs @@ -799,59 +799,7 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr info.tools.len(), info.agents.len() ); - // Set up subscriptions when we learn the claude session ID - if let Some(agentic) = &mut session.agentic { - if let Some(ref csid) = info.claude_session_id { - // Permission response subscription (filtered to ai-permission tag) - if agentic.perm_response_sub.is_none() { - let filter = nostrdb::Filter::new() - .kinds([session_events::AI_CONVERSATION_KIND as u64]) - .tags([csid.as_str()], 'd') - .tags(["ai-permission"], 't') - .build(); - match app_ctx.ndb.subscribe(&[filter]) { - Ok(sub) => { - tracing::info!( - "subscribed for remote permission responses (session {})", - csid - ); - agentic.perm_response_sub = Some(sub); - } - Err(e) => { - tracing::warn!( - "failed to subscribe for permission responses: {:?}", - e - ); - } - } - } - // Conversation subscription for incoming remote user messages - if agentic.live_conversation_sub.is_none() { - let filter = nostrdb::Filter::new() - .kinds([session_events::AI_CONVERSATION_KIND as u64]) - .tags([csid.as_str()], 'd') - .build(); - match app_ctx.ndb.subscribe(&[filter]) { - Ok(sub) => { - tracing::info!( - "subscribed for conversation events (session {})", - csid - ); - agentic.live_conversation_sub = Some(sub); - } - Err(e) => { - tracing::warn!( - "failed to subscribe for conversation events: {:?}", - e - ); - } - } - } - } - agentic.session_info = Some(info); - } - // Persist initial session state now that we know the claude_session_id - session.state_dirty = true; + handle_session_info(session, info, app_ctx.ndb); } DaveApiResponse::SubagentSpawned(subagent) => { @@ -923,49 +871,15 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr // Check if channel is disconnected (stream ended) match recvr.try_recv() { Err(std::sync::mpsc::TryRecvError::Disconnected) => { - // Stream ended, clear task state if let Some(session) = self.session_manager.get_mut(session_id) { - // Finalize the last assistant message to cache parsed - // elements. It may not be chat.last() if user messages - // were queued after it. - session.finalize_last_assistant(); - - // Generate live event for the finalized assistant message - if let Some(sk) = &secret_key { - if let Some(text) = session.last_assistant_text() { - if let Some(evt) = ingest_live_event( - session, - app_ctx.ndb, - sk, - &text, - "assistant", - None, - None, - ) { - events_to_publish.push(evt); - } - } - } - - session.task_handle = None; - // Don't restore incoming_tokens - leave it None - - // If chat ends with a user message, there's an - // unanswered remote message that arrived while we - // were streaming. Queue it for dispatch. - if session.needs_redispatch_after_stream_end() { - tracing::info!( - "Session {}: redispatching queued user message after stream end", - session_id - ); - needs_send.insert(session_id); - } - - // After compact & approve: compaction must have - // completed (ReadyToProceed) before we send "Proceed". - if session.take_compact_and_proceed() { - needs_send.insert(session_id); - } + handle_stream_end( + session, + session_id, + &secret_key, + app_ctx.ndb, + &mut events_to_publish, + &mut needs_send, + ); } } _ => { @@ -2470,14 +2384,129 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr session.incoming_tokens = Some(rx); session.task_handle = task_handle; } -} -impl notedeck::App for Dave { - fn update(&mut self, ctx: &mut AppContext<'_>, ui: &mut egui::Ui) -> AppResponse { - let mut app_action: Option<AppAction> = None; + /// Process pending archive conversion (JSONL to nostr events). + /// + /// When resuming a session, the JSONL archive needs to be converted to + /// nostr events. If events already exist in ndb, load them directly. + fn process_archive_conversion(&mut self, ctx: &mut AppContext<'_>) { + let Some((file_path, dave_sid, claude_sid)) = self.pending_archive_convert.take() else { + return; + }; + + let txn = Transaction::new(ctx.ndb).expect("txn"); + let filter = nostrdb::Filter::new() + .kinds([session_events::AI_CONVERSATION_KIND as u64]) + .tags([claude_sid.as_str()], 'd') + .limit(1) + .build(); + let already_exists = ctx + .ndb + .query(&txn, &[filter], 1) + .map(|r| !r.is_empty()) + .unwrap_or(false); + drop(txn); - // Process relay events into ndb (needed when dave is the active app). - // Re-send PNS subscription when the relay (re)connects. + if already_exists { + tracing::info!( + "session {} already has events in ndb, skipping archive conversion", + claude_sid + ); + let loaded_txn = Transaction::new(ctx.ndb).expect("txn"); + let loaded = session_loader::load_session_messages(ctx.ndb, &loaded_txn, &claude_sid); + if let Some(session) = self.session_manager.get_mut(dave_sid) { + tracing::info!("loaded {} messages into chat UI", loaded.messages.len()); + session.chat = loaded.messages; + + if let Some(agentic) = &mut session.agentic { + if let (Some(root), Some(last)) = (loaded.root_note_id, loaded.last_note_id) { + agentic.live_threading.seed(root, last, loaded.event_count); + } + agentic + .permissions + .request_note_ids + .extend(loaded.permissions.request_note_ids); + } + } + } else if let Some(secret_bytes) = + secret_key_bytes(ctx.accounts.get_selected_account().keypair()) + { + let sub_filter = nostrdb::Filter::new() + .kinds([session_events::AI_CONVERSATION_KIND as u64]) + .tags([claude_sid.as_str()], 'd') + .build(); + + match ctx.ndb.subscribe(&[sub_filter]) { + Ok(sub) => { + match session_converter::convert_session_to_events( + &file_path, + ctx.ndb, + &secret_bytes, + ) { + Ok(note_ids) => { + tracing::info!( + "archived session: {} events from {}, awaiting indexing", + note_ids.len(), + file_path.display() + ); + self.pending_message_load = Some(PendingMessageLoad { + sub, + dave_session_id: dave_sid, + claude_session_id: claude_sid, + }); + } + Err(e) => { + tracing::error!("archive conversion failed: {}", e); + } + } + } + Err(e) => { + tracing::error!("failed to subscribe for archive events: {:?}", e); + } + } + } else { + tracing::warn!("no secret key available for archive conversion"); + } + } + + /// Poll for pending message load completion. + /// + /// After archive conversion, wait for ndb to index the kind-1988 events, + /// then load them into the session's chat history. + fn poll_pending_message_load(&mut self, ndb: &nostrdb::Ndb) { + let Some(pending) = &self.pending_message_load else { + return; + }; + + let notes = ndb.poll_for_notes(pending.sub, 4096); + if notes.is_empty() { + return; + } + + let txn = Transaction::new(ndb).expect("txn"); + let loaded = session_loader::load_session_messages(ndb, &txn, &pending.claude_session_id); + if let Some(session) = self.session_manager.get_mut(pending.dave_session_id) { + tracing::info!("loaded {} messages into chat UI", loaded.messages.len()); + session.chat = loaded.messages; + + if let Some(agentic) = &mut session.agentic { + if let (Some(root), Some(last)) = (loaded.root_note_id, loaded.last_note_id) { + agentic.live_threading.seed(root, last, loaded.event_count); + } + agentic + .permissions + .request_note_ids + .extend(loaded.permissions.request_note_ids); + } + } + self.pending_message_load = None; + } + + /// Process relay events and run negentropy reconciliation against PNS relay. + /// + /// Collects negentropy protocol events from the relay, re-subscribes on + /// reconnect, and drives multi-round sync to fetch missing PNS events. + fn process_negentropy_sync(&mut self, ctx: &mut AppContext<'_>, ui: &egui::Ui) { let pns_sub_id = self.pns_relay_sub.clone(); let pns_relay = self.pns_relay_url.clone(); let mut neg_events: Vec<enostr::negentropy::NegEvent> = Vec::new(); @@ -2514,7 +2543,7 @@ impl notedeck::App for Dave { self.neg_sync_round = 0; } - // Negentropy sync: reconcile local events against PNS relay, + // Reconcile local events against PNS relay, // fetch any missing kind-1080 events via standard REQ. if let Some(sk) = ctx.accounts.get_selected_account().keypair().secret_key { let pns_keys = enostr::pns::derive_pns_keys(&sk.secret_bytes()); @@ -2552,6 +2581,68 @@ impl notedeck::App for Dave { ); } } + } + + /// One-time initialization on first update. + /// + /// Restores sessions from ndb, triggers initial negentropy sync, + /// and sets up relay subscriptions. + fn initialize_once(&mut self, ctx: &mut AppContext<'_>, ui: &egui::Ui) { + self.sessions_restored = true; + + self.restore_sessions_from_ndb(ctx); + + // Trigger initial negentropy sync after startup + self.neg_sync.trigger_now(); + self.neg_sync_round = 0; + + // Subscribe to PNS events on relays for session discovery from other devices. + // Also subscribe locally in ndb for kind-31988 session state events + // so we detect new sessions appearing after PNS unwrapping. + if let Some(sk) = ctx.accounts.get_selected_account().keypair().secret_key { + let pns_keys = enostr::pns::derive_pns_keys(&sk.secret_bytes()); + + // Ensure the PNS relay is in the pool + let egui_ctx = ui.ctx().clone(); + let wakeup = move || egui_ctx.request_repaint(); + if let Err(e) = ctx.pool.add_url(self.pns_relay_url.clone(), wakeup) { + tracing::warn!("failed to add PNS relay {}: {:?}", self.pns_relay_url, e); + } + + // Remote: subscribe on PNS relay for kind-1080 authored by our PNS pubkey + let pns_filter = nostrdb::Filter::new() + .kinds([enostr::pns::PNS_KIND as u64]) + .authors([pns_keys.keypair.pubkey.bytes()]) + .limit(500) + .build(); + let sub_id = uuid::Uuid::new_v4().to_string(); + let req = enostr::ClientMessage::req(sub_id.clone(), vec![pns_filter]); + ctx.pool.send_to(&req, &self.pns_relay_url); + self.pns_relay_sub = Some(sub_id); + tracing::info!("subscribed for PNS events on {}", self.pns_relay_url); + + // Local: subscribe in ndb for kind-31988 session state events + let state_filter = nostrdb::Filter::new() + .kinds([session_events::AI_SESSION_STATE_KIND as u64]) + .build(); + match ctx.ndb.subscribe(&[state_filter]) { + Ok(sub) => { + self.session_state_sub = Some(sub); + tracing::info!("subscribed for session state events in ndb"); + } + Err(e) => { + tracing::warn!("failed to subscribe for session state events: {:?}", e); + } + } + } + } +} + +impl notedeck::App for Dave { + fn update(&mut self, ctx: &mut AppContext<'_>, ui: &mut egui::Ui) -> AppResponse { + let mut app_action: Option<AppAction> = None; + + self.process_negentropy_sync(ctx, ui); // Poll for external spawn-agent commands via IPC self.poll_ipc_commands(); @@ -2566,65 +2657,7 @@ impl notedeck::App for Dave { // One-time initialization on first update if !self.sessions_restored { - self.sessions_restored = true; - - // Process any PNS-wrapped events already in ndb - let pns_ndb = ctx.ndb.clone(); - if let Err(e) = std::thread::Builder::new() - .name("process_pns".into()) - .spawn(move || { - let txn = Transaction::new(&pns_ndb).expect("txn"); - pns_ndb.process_pns(&txn); - }) - { - tracing::error!("failed to spawn process_pns thread: {e}"); - } - - self.restore_sessions_from_ndb(ctx); - - // Trigger initial negentropy sync after startup - self.neg_sync.trigger_now(); - self.neg_sync_round = 0; - - // Subscribe to PNS events on relays for session discovery from other devices. - // Also subscribe locally in ndb for kind-31988 session state events - // so we detect new sessions appearing after PNS unwrapping. - if let Some(sk) = ctx.accounts.get_selected_account().keypair().secret_key { - let pns_keys = enostr::pns::derive_pns_keys(&sk.secret_bytes()); - - // Ensure the PNS relay is in the pool - let egui_ctx = ui.ctx().clone(); - let wakeup = move || egui_ctx.request_repaint(); - if let Err(e) = ctx.pool.add_url(self.pns_relay_url.clone(), wakeup) { - tracing::warn!("failed to add PNS relay {}: {:?}", self.pns_relay_url, e); - } - - // Remote: subscribe on PNS relay for kind-1080 authored by our PNS pubkey - let pns_filter = nostrdb::Filter::new() - .kinds([enostr::pns::PNS_KIND as u64]) - .authors([pns_keys.keypair.pubkey.bytes()]) - .limit(500) - .build(); - let sub_id = uuid::Uuid::new_v4().to_string(); - let req = enostr::ClientMessage::req(sub_id.clone(), vec![pns_filter]); - ctx.pool.send_to(&req, &self.pns_relay_url); - self.pns_relay_sub = Some(sub_id); - tracing::info!("subscribed for PNS events on {}", self.pns_relay_url); - - // Local: subscribe in ndb for kind-31988 session state events - let state_filter = nostrdb::Filter::new() - .kinds([session_events::AI_SESSION_STATE_KIND as u64]) - .build(); - match ctx.ndb.subscribe(&[state_filter]) { - Ok(sub) => { - self.session_state_sub = Some(sub); - tracing::info!("subscribed for session state events in ndb"); - } - Err(e) => { - tracing::warn!("failed to subscribe for session state events: {:?}", e); - } - } - } + self.initialize_once(ctx, ui); } // Poll for external editor completion @@ -2652,119 +2685,8 @@ impl notedeck::App for Dave { } } - // Process pending archive conversion (JSONL → nostr events) - if let Some((file_path, dave_sid, claude_sid)) = self.pending_archive_convert.take() { - // Check if events already exist for this session in ndb - let txn = Transaction::new(ctx.ndb).expect("txn"); - let filter = nostrdb::Filter::new() - .kinds([session_events::AI_CONVERSATION_KIND as u64]) - .tags([claude_sid.as_str()], 'd') - .limit(1) - .build(); - let already_exists = ctx - .ndb - .query(&txn, &[filter], 1) - .map(|r| !r.is_empty()) - .unwrap_or(false); - drop(txn); - - if already_exists { - // Events already in ndb (from previous conversion or live events). - // Skip archive conversion and load directly. - tracing::info!( - "session {} already has events in ndb, skipping archive conversion", - claude_sid - ); - let loaded_txn = Transaction::new(ctx.ndb).expect("txn"); - let loaded = - session_loader::load_session_messages(ctx.ndb, &loaded_txn, &claude_sid); - if let Some(session) = self.session_manager.get_mut(dave_sid) { - tracing::info!("loaded {} messages into chat UI", loaded.messages.len()); - session.chat = loaded.messages; - - if let Some(agentic) = &mut session.agentic { - if let (Some(root), Some(last)) = (loaded.root_note_id, loaded.last_note_id) - { - agentic.live_threading.seed(root, last, loaded.event_count); - } - agentic - .permissions - .request_note_ids - .extend(loaded.permissions.request_note_ids); - } - } - } else if let Some(secret_bytes) = - secret_key_bytes(ctx.accounts.get_selected_account().keypair()) - { - // Subscribe for 1988 events BEFORE ingesting so we catch them - let sub_filter = nostrdb::Filter::new() - .kinds([session_events::AI_CONVERSATION_KIND as u64]) - .tags([claude_sid.as_str()], 'd') - .build(); - - match ctx.ndb.subscribe(&[sub_filter]) { - Ok(sub) => { - match session_converter::convert_session_to_events( - &file_path, - ctx.ndb, - &secret_bytes, - ) { - Ok(note_ids) => { - tracing::info!( - "archived session: {} events from {}, awaiting indexing", - note_ids.len(), - file_path.display() - ); - self.pending_message_load = Some(PendingMessageLoad { - sub, - dave_session_id: dave_sid, - claude_session_id: claude_sid, - }); - } - Err(e) => { - tracing::error!("archive conversion failed: {}", e); - } - } - } - Err(e) => { - tracing::error!("failed to subscribe for archive events: {:?}", e); - } - } - } else { - tracing::warn!("no secret key available for archive conversion"); - } - } - - // Poll pending message load — wait for ndb to index 1988 events - if let Some(pending) = &self.pending_message_load { - let notes = ctx.ndb.poll_for_notes(pending.sub, 4096); - if !notes.is_empty() { - let txn = Transaction::new(ctx.ndb).expect("txn"); - let loaded = session_loader::load_session_messages( - ctx.ndb, - &txn, - &pending.claude_session_id, - ); - if let Some(session) = self.session_manager.get_mut(pending.dave_session_id) { - tracing::info!("loaded {} messages into chat UI", loaded.messages.len()); - session.chat = loaded.messages; - - // Seed live threading from archive events so new events - // thread as replies to the existing conversation. - if let Some(agentic) = &mut session.agentic { - if let (Some(root), Some(last)) = (loaded.root_note_id, loaded.last_note_id) - { - agentic.live_threading.seed(root, last, loaded.event_count); - } - agentic - .permissions - .request_note_ids - .extend(loaded.permissions.request_note_ids); - } - } - self.pending_message_load = None; - } - } + self.process_archive_conversion(ctx); + self.poll_pending_message_load(ctx.ndb); // Handle global keybindings (when no text input has focus) let has_pending_permission = self.first_pending_permission().is_some(); @@ -2900,6 +2822,98 @@ impl notedeck::App for Dave { /// single-window mode is particularly aggressive, so we use both /// NSRunningApplication::activateWithOptions and orderFrontRegardless /// on the key window. +/// Handle a SessionInfo response from the AI backend. +/// +/// Sets up ndb subscriptions for permission responses and conversation events +/// when we first learn the claude session ID. +fn handle_session_info(session: &mut session::ChatSession, info: SessionInfo, ndb: &nostrdb::Ndb) { + if let Some(agentic) = &mut session.agentic { + if let Some(ref csid) = info.claude_session_id { + // Permission response subscription (filtered to ai-permission tag) + if agentic.perm_response_sub.is_none() { + let filter = nostrdb::Filter::new() + .kinds([session_events::AI_CONVERSATION_KIND as u64]) + .tags([csid.as_str()], 'd') + .tags(["ai-permission"], 't') + .build(); + match ndb.subscribe(&[filter]) { + Ok(sub) => { + tracing::info!( + "subscribed for remote permission responses (session {})", + csid + ); + agentic.perm_response_sub = Some(sub); + } + Err(e) => { + tracing::warn!("failed to subscribe for permission responses: {:?}", e); + } + } + } + // Conversation subscription for incoming remote user messages + if agentic.live_conversation_sub.is_none() { + let filter = nostrdb::Filter::new() + .kinds([session_events::AI_CONVERSATION_KIND as u64]) + .tags([csid.as_str()], 'd') + .build(); + match ndb.subscribe(&[filter]) { + Ok(sub) => { + tracing::info!("subscribed for conversation events (session {})", csid); + agentic.live_conversation_sub = Some(sub); + } + Err(e) => { + tracing::warn!("failed to subscribe for conversation events: {:?}", e); + } + } + } + } + agentic.session_info = Some(info); + } + // Persist initial session state now that we know the claude_session_id + session.state_dirty = true; +} + +/// Handle stream-end for a session after the AI backend disconnects. +/// +/// Finalizes the assistant message, publishes the live event, +/// and checks whether queued messages need redispatch. +fn handle_stream_end( + session: &mut session::ChatSession, + session_id: SessionId, + secret_key: &Option<[u8; 32]>, + ndb: &nostrdb::Ndb, + events_to_publish: &mut Vec<session_events::BuiltEvent>, + needs_send: &mut HashSet<SessionId>, +) { + session.finalize_last_assistant(); + + // Generate live event for the finalized assistant message + if let Some(sk) = secret_key { + if let Some(text) = session.last_assistant_text() { + if let Some(evt) = ingest_live_event(session, ndb, sk, &text, "assistant", None, None) { + events_to_publish.push(evt); + } + } + } + + session.task_handle = None; + + // If chat ends with a user message, there's an unanswered remote message + // that arrived while we were streaming. Queue it for dispatch. + if session.needs_redispatch_after_stream_end() { + tracing::info!( + "Session {}: redispatching queued user message after stream end", + session_id + ); + needs_send.insert(session_id); + } + + // After compact & approve: compaction must have completed + // (ReadyToProceed) before we send "Proceed". + if session.take_compact_and_proceed() { + needs_send.insert(session_id); + } +} + fn activate_app(ctx: &egui::Context) { ctx.send_viewport_cmd(egui::ViewportCommand::Focus);