notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit 59f8ae7a2322eb1c7ead4565f4d1623cc0f2f2da
parent 3ec93aa772ec97f55990400e44be1cd413996018
Author: William Casarin <jb55@jb55.com>
Date:   Mon, 26 Jan 2026 13:21:19 -0800

dave: implement persistent ClaudeClient per session with actor pattern

Each session now gets its own long-running tokio task that exclusively
owns a ClaudeClient. Communication happens via channels instead of
mutexes. The connection is established once per session and reused for
all subsequent queries via query_with_session().

This fixes the issue where the second message would hang after using
the --resume flag, since we now maintain a persistent subprocess
connection instead of spawning a new process per query.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

Diffstat:
MCargo.lock | 1+
Mcrates/notedeck_dave/Cargo.toml | 1+
Mcrates/notedeck_dave/src/backend/claude.rs | 574+++++++++++++++++++++++++++++++++++++++++++++++--------------------------------
Mcrates/notedeck_dave/src/backend/openai.rs | 5+++++
Mcrates/notedeck_dave/src/backend/traits.rs | 4++++
Mcrates/notedeck_dave/src/lib.rs | 15++++++++++++---
6 files changed, 367 insertions(+), 233 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -4122,6 +4122,7 @@ dependencies = [ "bytemuck", "chrono", "claude-agent-sdk-rs", + "dashmap", "eframe", "egui", "egui-wgpu", diff --git a/crates/notedeck_dave/Cargo.toml b/crates/notedeck_dave/Cargo.toml @@ -24,6 +24,7 @@ rand = "0.9.0" uuid = { version = "1", features = ["v4"] } bytemuck = "1.22.0" futures = "0.3.31" +dashmap = "6" #reqwest = "0.12.15" egui_extras = { workspace = true } diff --git a/crates/notedeck_dave/src/backend/claude.rs b/crates/notedeck_dave/src/backend/claude.rs @@ -8,21 +8,44 @@ use claude_agent_sdk_rs::{ ClaudeAgentOptions, ClaudeClient, ContentBlock, Message as ClaudeMessage, PermissionMode, PermissionResult, PermissionResultAllow, PermissionResultDeny, ToolUseBlock, }; +use dashmap::DashMap; use futures::future::BoxFuture; use futures::StreamExt; use std::collections::HashMap; use std::sync::mpsc; use std::sync::Arc; +use tokio::sync::mpsc as tokio_mpsc; use tokio::sync::oneshot; use uuid::Uuid; +/// Commands sent to a session's actor task +enum SessionCommand { + Query { + prompt: String, + response_tx: mpsc::Sender<DaveApiResponse>, + ctx: egui::Context, + }, + Shutdown, +} + +/// Handle to a session's actor +struct SessionHandle { + command_tx: tokio_mpsc::Sender<SessionCommand>, +} + pub struct ClaudeBackend { + #[allow(dead_code)] // May be used in the future for API key validation api_key: String, + /// Registry of active sessions (using dashmap for lock-free access) + sessions: DashMap<String, SessionHandle>, } impl ClaudeBackend { pub fn new(api_key: String) -> Self { - Self { api_key } + Self { + api_key, + sessions: DashMap::new(), + } } /// Convert our messages to a prompt for Claude Code @@ -78,6 +101,275 @@ impl ClaudeBackend { } } +/// Permission request forwarded from the callback to the actor +struct PermissionRequestInternal { + tool_name: String, + tool_input: serde_json::Value, + response_tx: oneshot::Sender<PermissionResult>, +} + +/// Session actor task that owns a single ClaudeClient with persistent connection +async fn session_actor(session_id: String, mut command_rx: tokio_mpsc::Receiver<SessionCommand>) { + // Permission channel - the callback sends to perm_tx, actor receives on perm_rx + let (perm_tx, mut perm_rx) = tokio_mpsc::channel::<PermissionRequestInternal>(16); + + // Create the can_use_tool callback that forwards to our permission channel + let can_use_tool: Arc< + dyn Fn( + String, + serde_json::Value, + claude_agent_sdk_rs::ToolPermissionContext, + ) -> BoxFuture<'static, PermissionResult> + + Send + + Sync, + > = Arc::new({ + let perm_tx = perm_tx.clone(); + move |tool_name: String, + tool_input: serde_json::Value, + _context: claude_agent_sdk_rs::ToolPermissionContext| { + let perm_tx = perm_tx.clone(); + Box::pin(async move { + let (resp_tx, resp_rx) = oneshot::channel(); + if perm_tx + .send(PermissionRequestInternal { + tool_name: tool_name.clone(), + tool_input, + response_tx: resp_tx, + }) + .await + .is_err() + { + return PermissionResult::Deny(PermissionResultDeny { + message: "Session actor channel closed".to_string(), + interrupt: true, + }); + } + // Wait for response from session actor (which forwards from UI) + match resp_rx.await { + Ok(result) => result, + Err(_) => PermissionResult::Deny(PermissionResultDeny { + message: "Permission response cancelled".to_string(), + interrupt: true, + }), + } + }) + } + }); + + // A stderr callback to prevent the subprocess from blocking + let stderr_callback = Arc::new(|msg: String| { + tracing::trace!("Claude CLI stderr: {}", msg); + }); + + // Create client once - this maintains the persistent connection + let options = ClaudeAgentOptions::builder() + .permission_mode(PermissionMode::Default) + .stderr_callback(stderr_callback) + .can_use_tool(can_use_tool) + .include_partial_messages(true) + .build(); + let mut client = ClaudeClient::new(options); + + // Connect once - this starts the subprocess + if let Err(err) = client.connect().await { + tracing::error!("Session {} failed to connect: {}", session_id, err); + // Process any pending commands to report the error + while let Some(cmd) = command_rx.recv().await { + if let SessionCommand::Query { + ref response_tx, .. + } = cmd + { + let _ = response_tx.send(DaveApiResponse::Failed(format!( + "Failed to connect to Claude: {}", + err + ))); + } + if matches!(cmd, SessionCommand::Shutdown) { + break; + } + } + return; + } + + tracing::debug!("Session {} connected successfully", session_id); + + // Process commands + while let Some(cmd) = command_rx.recv().await { + match cmd { + SessionCommand::Query { + prompt, + response_tx, + ctx, + } => { + // Send query using session_id for context + if let Err(err) = client.query_with_session(&prompt, &session_id).await { + tracing::error!("Session {} query error: {}", session_id, err); + let _ = response_tx.send(DaveApiResponse::Failed(err.to_string())); + continue; + } + + // Track pending tool uses: tool_use_id -> (tool_name, tool_input) + let mut pending_tools: HashMap<String, (String, serde_json::Value)> = + HashMap::new(); + + // Stream response with select! to handle both stream and permission requests + let mut stream = client.receive_response(); + let mut stream_done = false; + + while !stream_done { + tokio::select! { + biased; + + // Handle permission requests first (they're blocking the SDK) + Some(perm_req) = perm_rx.recv() => { + // Forward permission request to UI + let request_id = Uuid::new_v4(); + let (ui_resp_tx, ui_resp_rx) = oneshot::channel(); + + let request = PermissionRequest { + id: request_id, + tool_name: perm_req.tool_name.clone(), + tool_input: perm_req.tool_input.clone(), + response: None, + }; + + let pending = PendingPermission { + request, + response_tx: ui_resp_tx, + }; + + if response_tx.send(DaveApiResponse::PermissionRequest(pending)).is_err() { + tracing::error!("Failed to send permission request to UI"); + let _ = perm_req.response_tx.send(PermissionResult::Deny(PermissionResultDeny { + message: "UI channel closed".to_string(), + interrupt: true, + })); + continue; + } + + ctx.request_repaint(); + + // Spawn task to wait for UI response and forward to callback + let tool_name = perm_req.tool_name.clone(); + let callback_tx = perm_req.response_tx; + tokio::spawn(async move { + let result = match ui_resp_rx.await { + Ok(PermissionResponse::Allow) => { + tracing::debug!("User allowed tool: {}", tool_name); + PermissionResult::Allow(PermissionResultAllow::default()) + } + Ok(PermissionResponse::Deny { reason }) => { + tracing::debug!("User denied tool {}: {}", tool_name, reason); + PermissionResult::Deny(PermissionResultDeny { + message: reason, + interrupt: false, + }) + } + Err(_) => { + tracing::error!("Permission response channel closed"); + PermissionResult::Deny(PermissionResultDeny { + message: "Permission request cancelled".to_string(), + interrupt: true, + }) + } + }; + let _ = callback_tx.send(result); + }); + } + + stream_result = stream.next() => { + match stream_result { + Some(Ok(message)) => { + match message { + ClaudeMessage::Assistant(assistant_msg) => { + for block in &assistant_msg.message.content { + if let ContentBlock::ToolUse(ToolUseBlock { id, name, input }) = block { + pending_tools.insert(id.clone(), (name.clone(), input.clone())); + } + } + } + ClaudeMessage::StreamEvent(event) => { + if let Some(event_type) = event.event.get("type").and_then(|v| v.as_str()) { + if event_type == "content_block_delta" { + if let Some(text) = event + .event + .get("delta") + .and_then(|d| d.get("text")) + .and_then(|t| t.as_str()) + { + if response_tx.send(DaveApiResponse::Token(text.to_string())).is_err() { + tracing::error!("Failed to send token to UI"); + // Setting stream_done isn't needed since we break immediately + break; + } + ctx.request_repaint(); + } + } + } + } + ClaudeMessage::Result(result_msg) => { + if result_msg.is_error { + let error_text = result_msg + .result + .unwrap_or_else(|| "Unknown error".to_string()); + let _ = response_tx.send(DaveApiResponse::Failed(error_text)); + } + stream_done = true; + } + ClaudeMessage::User(user_msg) => { + if let Some(tool_use_result) = user_msg.extra.get("tool_use_result") { + let tool_use_id = user_msg + .extra + .get("message") + .and_then(|m| m.get("content")) + .and_then(|c| c.as_array()) + .and_then(|arr| arr.first()) + .and_then(|item| item.get("tool_use_id")) + .and_then(|id| id.as_str()); + + if let Some(tool_use_id) = tool_use_id { + if let Some((tool_name, tool_input)) = pending_tools.remove(tool_use_id) { + let summary = format_tool_summary(&tool_name, &tool_input, tool_use_result); + let tool_result = ToolResult { tool_name, summary }; + let _ = response_tx.send(DaveApiResponse::ToolResult(tool_result)); + ctx.request_repaint(); + } + } + } + } + _ => {} + } + } + Some(Err(err)) => { + tracing::error!("Claude stream error: {}", err); + let _ = response_tx.send(DaveApiResponse::Failed(err.to_string())); + stream_done = true; + } + None => { + stream_done = true; + } + } + } + } + } + + tracing::debug!("Query complete for session {}", session_id); + // Don't disconnect - keep the connection alive for subsequent queries + } + SessionCommand::Shutdown => { + tracing::debug!("Session actor {} shutting down", session_id); + break; + } + } + } + + // Disconnect when shutting down + if let Err(err) = client.disconnect().await { + tracing::warn!("Error disconnecting session {}: {}", session_id, err); + } + tracing::debug!("Session {} actor exited", session_id); +} + impl AiBackend for ClaudeBackend { fn stream_request( &self, @@ -85,256 +377,78 @@ impl AiBackend for ClaudeBackend { _tools: Arc<HashMap<String, Tool>>, _model: String, _user_id: String, - _session_id: String, // TODO: Currently unused - --continue resumes last conversation globally + session_id: String, ctx: egui::Context, ) -> ( mpsc::Receiver<DaveApiResponse>, Option<tokio::task::JoinHandle<()>>, ) { - let (tx, rx) = mpsc::channel(); - let _api_key = self.api_key.clone(); + let (response_tx, response_rx) = mpsc::channel(); - let tx_for_callback = tx.clone(); - let ctx_for_callback = ctx.clone(); - - // First message in session = start fresh conversation - // Subsequent messages = use --continue to resume the last conversation - // NOTE: --continue resumes the globally last conversation, not per-session. - // This works for single-conversation use but multiple UI sessions would interfere. - // For proper per-session context, we'd need a persistent ClaudeClient connection. + // Determine if this is the first message in the session let is_first_message = messages .iter() .filter(|m| matches!(m, Message::User(_))) .count() == 1; - let handle = tokio::spawn(async move { - // For first message, send full prompt; for continuation, just the latest message - let prompt = if is_first_message { - Self::messages_to_prompt(&messages) - } else { - Self::get_latest_user_message(&messages) - }; + // For first message, send full prompt; for continuation, just the latest message + let prompt = if is_first_message { + Self::messages_to_prompt(&messages) + } else { + Self::get_latest_user_message(&messages) + }; - tracing::debug!( - "Sending request to Claude Code: is_first={}, prompt length: {}, preview: {:?}", - is_first_message, - prompt.len(), - &prompt[..prompt.len().min(100)] - ); - - // A stderr callback is needed to prevent the subprocess from blocking - // when stderr buffer fills up. We log the output for debugging. - let stderr_callback = |msg: String| { - tracing::trace!("Claude CLI stderr: {}", msg); - }; + tracing::debug!( + "Sending request to Claude Code: session={}, is_first={}, prompt length: {}, preview: {:?}", + session_id, + is_first_message, + prompt.len(), + &prompt[..prompt.len().min(100)] + ); - // Permission callback - sends requests to UI and waits for user response - let can_use_tool: Arc< - dyn Fn( - String, - serde_json::Value, - claude_agent_sdk_rs::ToolPermissionContext, - ) -> BoxFuture<'static, PermissionResult> - + Send - + Sync, - > = Arc::new({ - let tx = tx_for_callback; - let ctx = ctx_for_callback; - move |tool_name: String, - tool_input: serde_json::Value, - _context: claude_agent_sdk_rs::ToolPermissionContext| { - let tx = tx.clone(); - let ctx = ctx.clone(); - Box::pin(async move { - let (response_tx, response_rx) = oneshot::channel(); - - let request = PermissionRequest { - id: Uuid::new_v4(), - tool_name: tool_name.clone(), - tool_input: tool_input.clone(), - response: None, - }; - - let pending = PendingPermission { - request, - response_tx, - }; - - // Send permission request to UI - if tx - .send(DaveApiResponse::PermissionRequest(pending)) - .is_err() - { - tracing::error!("Failed to send permission request to UI"); - return PermissionResult::Deny(PermissionResultDeny { - message: "UI channel closed".to_string(), - interrupt: true, - }); - } + // Get or create session actor + let command_tx = { + let entry = self.sessions.entry(session_id.clone()); + let handle = entry.or_insert_with(|| { + let (command_tx, command_rx) = tokio_mpsc::channel(16); - ctx.request_repaint(); + // Spawn session actor + let session_id_clone = session_id.clone(); + tokio::spawn(async move { + session_actor(session_id_clone, command_rx).await; + }); - // Wait for user response - match response_rx.await { - Ok(PermissionResponse::Allow) => { - tracing::debug!("User allowed tool: {}", tool_name); - PermissionResult::Allow(PermissionResultAllow::default()) - } - Ok(PermissionResponse::Deny { reason }) => { - tracing::debug!("User denied tool {}: {}", tool_name, reason); - PermissionResult::Deny(PermissionResultDeny { - message: reason, - interrupt: false, - }) - } - Err(_) => { - tracing::error!("Permission response channel closed"); - PermissionResult::Deny(PermissionResultDeny { - message: "Permission request cancelled".to_string(), - interrupt: true, - }) - } - } - }) - } + SessionHandle { command_tx } }); + handle.command_tx.clone() + }; - // Use ClaudeClient instead of query_stream to enable control protocol - // for can_use_tool callbacks - // For follow-up messages, use --continue to resume the last conversation - let stderr_callback = Arc::new(stderr_callback); - - let options = if is_first_message { - ClaudeAgentOptions::builder() - .permission_mode(PermissionMode::Default) - .stderr_callback(stderr_callback.clone()) - .can_use_tool(can_use_tool) - .include_partial_messages(true) - .build() - } else { - ClaudeAgentOptions::builder() - .permission_mode(PermissionMode::Default) - .stderr_callback(stderr_callback.clone()) - .can_use_tool(can_use_tool) - .include_partial_messages(true) - .continue_conversation(true) - .build() - }; - let mut client = ClaudeClient::new(options); - if let Err(err) = client.connect().await { - tracing::error!("Claude Code connection error: {}", err); - let _ = tx.send(DaveApiResponse::Failed(err.to_string())); - let _ = client.disconnect().await; - return; - } - if let Err(err) = client.query(&prompt).await { - tracing::error!("Claude Code query error: {}", err); - let _ = tx.send(DaveApiResponse::Failed(err.to_string())); - let _ = client.disconnect().await; - return; - } - let mut stream = client.receive_response(); - - // Track pending tool uses: tool_use_id -> (tool_name, tool_input) - let mut pending_tools: HashMap<String, (String, serde_json::Value)> = HashMap::new(); - - while let Some(result) = stream.next().await { - match result { - Ok(message) => match message { - ClaudeMessage::Assistant(assistant_msg) => { - // Text is handled by StreamEvent for incremental display - for block in &assistant_msg.message.content { - if let ContentBlock::ToolUse(ToolUseBlock { id, name, input }) = - block - { - // Store for later correlation with tool result - pending_tools.insert(id.clone(), (name.clone(), input.clone())); - } - } - } - ClaudeMessage::StreamEvent(event) => { - if let Some(event_type) = - event.event.get("type").and_then(|v| v.as_str()) - { - if event_type == "content_block_delta" { - if let Some(text) = event - .event - .get("delta") - .and_then(|d| d.get("text")) - .and_then(|t| t.as_str()) - { - if let Err(err) = - tx.send(DaveApiResponse::Token(text.to_string())) - { - tracing::error!("Failed to send token to UI: {}", err); - drop(stream); - let _ = client.disconnect().await; - return; - } - ctx.request_repaint(); - } - } - } - } - ClaudeMessage::Result(result_msg) => { - if result_msg.is_error { - let error_text = result_msg - .result - .unwrap_or_else(|| "Unknown error".to_string()); - let _ = tx.send(DaveApiResponse::Failed(error_text)); - } - break; - } - ClaudeMessage::User(user_msg) => { - // Tool results come in user_msg.extra, not content - // Structure: extra["tool_use_result"] has the result, - // extra["message"]["content"][0]["tool_use_id"] has the correlation ID - if let Some(tool_use_result) = user_msg.extra.get("tool_use_result") { - // Get tool_use_id from message.content[0].tool_use_id - let tool_use_id = user_msg - .extra - .get("message") - .and_then(|m| m.get("content")) - .and_then(|c| c.as_array()) - .and_then(|arr| arr.first()) - .and_then(|item| item.get("tool_use_id")) - .and_then(|id| id.as_str()); - - if let Some(tool_use_id) = tool_use_id { - if let Some((tool_name, tool_input)) = - pending_tools.remove(tool_use_id) - { - let summary = format_tool_summary( - &tool_name, - &tool_input, - tool_use_result, - ); - let tool_result = ToolResult { tool_name, summary }; - let _ = tx.send(DaveApiResponse::ToolResult(tool_result)); - ctx.request_repaint(); - } - } - } - } - _ => {} - }, - Err(err) => { - tracing::error!("Claude stream error: {}", err); - let _ = tx.send(DaveApiResponse::Failed(err.to_string())); - drop(stream); - let _ = client.disconnect().await; - return; - } - } + // Spawn a task to send the query command + let handle = tokio::spawn(async move { + if let Err(err) = command_tx + .send(SessionCommand::Query { + prompt, + response_tx, + ctx, + }) + .await + { + tracing::error!("Failed to send query command to session actor: {}", err); } - - drop(stream); - let _ = client.disconnect().await; - tracing::debug!("Claude stream closed"); }); - (rx, Some(handle)) + (response_rx, Some(handle)) + } + + fn cleanup_session(&self, session_id: String) { + if let Some((_, handle)) = self.sessions.remove(&session_id) { + tokio::spawn(async move { + if let Err(err) = handle.command_tx.send(SessionCommand::Shutdown).await { + tracing::warn!("Failed to send shutdown command: {}", err); + } + }); + } } } diff --git a/crates/notedeck_dave/src/backend/openai.rs b/crates/notedeck_dave/src/backend/openai.rs @@ -161,4 +161,9 @@ impl AiBackend for OpenAiBackend { (rx, Some(handle)) } + + fn cleanup_session(&self, _session_id: String) { + // OpenAI backend doesn't maintain persistent connections per session + // No cleanup needed + } } diff --git a/crates/notedeck_dave/src/backend/traits.rs b/crates/notedeck_dave/src/backend/traits.rs @@ -29,4 +29,8 @@ pub trait AiBackend: Send + Sync { mpsc::Receiver<DaveApiResponse>, Option<tokio::task::JoinHandle<()>>, ); + + /// Clean up resources associated with a session. + /// Called when a session is deleted to allow backends to shut down any persistent connections. + fn cleanup_session(&self, session_id: String); } diff --git a/crates/notedeck_dave/src/lib.rs b/crates/notedeck_dave/src/lib.rs @@ -391,7 +391,7 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr } SceneAction::DeleteSelected => { for id in self.scene.selected.clone() { - self.session_manager.delete_session(id); + self.delete_session(id); } self.scene.clear_selection(); } @@ -457,7 +457,7 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr self.session_manager.switch_to(id); } SessionListAction::Delete(id) => { - self.session_manager.delete_session(id); + self.delete_session(id); } } } @@ -485,7 +485,7 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr self.show_session_list = false; } SessionListAction::Delete(id) => { - self.session_manager.delete_session(id); + self.delete_session(id); } } } @@ -505,6 +505,15 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr self.session_manager.new_session(); } + /// Delete a session and clean up backend resources + fn delete_session(&mut self, id: SessionId) { + if self.session_manager.delete_session(id) { + // Clean up backend resources (e.g., close persistent connections) + let session_id = format!("dave-session-{}", id); + self.backend.cleanup_session(session_id); + } + } + /// Handle a user send action triggered by the ui fn handle_user_send(&mut self, app_ctx: &AppContext, ui: &egui::Ui) { if let Some(session) = self.session_manager.get_active_mut() {