notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

claude.rs (36942B)


      1 use crate::backend::session_info::parse_session_info;
      2 use crate::backend::shared::{self, SessionCommand, SessionHandle};
      3 use crate::backend::tool_summary::extract_response_content;
      4 use crate::backend::traits::AiBackend;
      5 use crate::file_update::FileUpdate;
      6 use crate::messages::{
      7     CompactionInfo, DaveApiResponse, PermissionResponse, SubagentInfo, SubagentStatus,
      8 };
      9 use crate::tools::Tool;
     10 use crate::Message;
     11 use claude_agent_sdk_rs::{
     12     ClaudeAgentOptions, ClaudeClient, ContentBlock, Message as ClaudeMessage, PermissionMode,
     13     PermissionResult, PermissionResultAllow, PermissionResultDeny, ToolResultContent, ToolUseBlock,
     14     UserContentBlock,
     15 };
     16 use dashmap::DashMap;
     17 use futures::future::BoxFuture;
     18 use futures::StreamExt;
     19 use std::collections::HashMap;
     20 use std::path::PathBuf;
     21 use std::sync::mpsc;
     22 use std::sync::Arc;
     23 use tokio::sync::mpsc as tokio_mpsc;
     24 use tokio::sync::oneshot;
     25 
     26 /// Convert a ToolResultContent to a serde_json::Value for use with tool summary formatting
     27 fn tool_result_content_to_value(content: &Option<ToolResultContent>) -> serde_json::Value {
     28     match content {
     29         Some(ToolResultContent::Text(s)) => serde_json::Value::String(s.clone()),
     30         Some(ToolResultContent::Blocks(blocks)) => serde_json::Value::Array(blocks.to_vec()),
     31         None => serde_json::Value::Null,
     32     }
     33 }
     34 
     35 pub struct ClaudeBackend {
     36     /// Registry of active sessions (using dashmap for lock-free access)
     37     sessions: DashMap<String, SessionHandle>,
     38 }
     39 
     40 impl ClaudeBackend {
     41     pub fn new() -> Self {
     42         Self {
     43             sessions: DashMap::new(),
     44         }
     45     }
     46 }
     47 
     48 /// Permission request forwarded from the callback to the actor
     49 struct PermissionRequestInternal {
     50     tool_name: String,
     51     tool_input: serde_json::Value,
     52     response_tx: oneshot::Sender<PermissionResult>,
     53 }
     54 
     55 /// Session actor task that owns a single ClaudeClient with persistent connection
     56 async fn session_actor(
     57     session_id: String,
     58     cwd: Option<PathBuf>,
     59     resume_session_id: Option<String>,
     60     mut command_rx: tokio_mpsc::Receiver<SessionCommand>,
     61 ) {
     62     // Permission channel - the callback sends to perm_tx, actor receives on perm_rx
     63     let (perm_tx, mut perm_rx) = tokio_mpsc::channel::<PermissionRequestInternal>(16);
     64 
     65     // Create the can_use_tool callback that forwards to our permission channel
     66     let can_use_tool: Arc<
     67         dyn Fn(
     68                 String,
     69                 serde_json::Value,
     70                 claude_agent_sdk_rs::ToolPermissionContext,
     71             ) -> BoxFuture<'static, PermissionResult>
     72             + Send
     73             + Sync,
     74     > = Arc::new({
     75         let perm_tx = perm_tx.clone();
     76         move |tool_name: String,
     77               tool_input: serde_json::Value,
     78               _context: claude_agent_sdk_rs::ToolPermissionContext| {
     79             let perm_tx = perm_tx.clone();
     80             Box::pin(async move {
     81                 let (resp_tx, resp_rx) = oneshot::channel();
     82                 if perm_tx
     83                     .send(PermissionRequestInternal {
     84                         tool_name: tool_name.clone(),
     85                         tool_input,
     86                         response_tx: resp_tx,
     87                     })
     88                     .await
     89                     .is_err()
     90                 {
     91                     return PermissionResult::Deny(PermissionResultDeny {
     92                         message: "Session actor channel closed".to_string(),
     93                         interrupt: true,
     94                     });
     95                 }
     96                 // Wait for response from session actor (which forwards from UI)
     97                 match resp_rx.await {
     98                     Ok(result) => result,
     99                     Err(_) => PermissionResult::Deny(PermissionResultDeny {
    100                         message: "Permission response cancelled".to_string(),
    101                         interrupt: true,
    102                     }),
    103                 }
    104             })
    105         }
    106     });
    107 
    108     // A stderr callback to prevent the subprocess from blocking
    109     let stderr_callback = Arc::new(|msg: String| {
    110         tracing::trace!("Claude CLI stderr: {}", msg);
    111     });
    112 
    113     // Log if we're resuming a session
    114     if let Some(ref resume_id) = resume_session_id {
    115         tracing::info!(
    116             "Session {} will resume Claude session: {}",
    117             session_id,
    118             resume_id
    119         );
    120     }
    121 
    122     // Create client once - this maintains the persistent connection
    123     // Using match to handle the TypedBuilder's strict type requirements
    124     let options = match (&cwd, &resume_session_id) {
    125         (Some(dir), Some(resume_id)) => ClaudeAgentOptions::builder()
    126             .permission_mode(PermissionMode::Default)
    127             .stderr_callback(stderr_callback)
    128             .can_use_tool(can_use_tool)
    129             .include_partial_messages(true)
    130             .cwd(dir)
    131             .resume(resume_id)
    132             .build(),
    133         (Some(dir), None) => ClaudeAgentOptions::builder()
    134             .permission_mode(PermissionMode::Default)
    135             .stderr_callback(stderr_callback)
    136             .can_use_tool(can_use_tool)
    137             .include_partial_messages(true)
    138             .cwd(dir)
    139             .build(),
    140         (None, Some(resume_id)) => ClaudeAgentOptions::builder()
    141             .permission_mode(PermissionMode::Default)
    142             .stderr_callback(stderr_callback)
    143             .can_use_tool(can_use_tool)
    144             .include_partial_messages(true)
    145             .resume(resume_id)
    146             .build(),
    147         (None, None) => ClaudeAgentOptions::builder()
    148             .permission_mode(PermissionMode::Default)
    149             .stderr_callback(stderr_callback)
    150             .can_use_tool(can_use_tool)
    151             .include_partial_messages(true)
    152             .build(),
    153     };
    154     let mut client = ClaudeClient::new(options);
    155 
    156     // Connect once - this starts the subprocess
    157     if let Err(err) = client.connect().await {
    158         tracing::error!("Session {} failed to connect: {}", session_id, err);
    159         // Process any pending commands to report the error
    160         while let Some(cmd) = command_rx.recv().await {
    161             if let SessionCommand::Query {
    162                 ref response_tx, ..
    163             } = cmd
    164             {
    165                 let _ = response_tx.send(DaveApiResponse::Failed(format!(
    166                     "Failed to connect to Claude: {}",
    167                     err
    168                 )));
    169             }
    170             if matches!(cmd, SessionCommand::Shutdown) {
    171                 break;
    172             }
    173         }
    174         return;
    175     }
    176 
    177     tracing::debug!("Session {} connected successfully", session_id);
    178 
    179     // Process commands
    180     while let Some(cmd) = command_rx.recv().await {
    181         match cmd {
    182             SessionCommand::Query {
    183                 prompt,
    184                 response_tx,
    185                 ctx,
    186             } => {
    187                 // Send query using session_id for context
    188                 if let Err(err) = client.query_with_session(&prompt, &session_id).await {
    189                     tracing::error!("Session {} query error: {}", session_id, err);
    190                     let _ = response_tx.send(DaveApiResponse::Failed(err.to_string()));
    191                     continue;
    192                 }
    193 
    194                 // Track pending tool uses: tool_use_id -> (tool_name, tool_input)
    195                 let mut pending_tools: HashMap<String, (String, serde_json::Value)> =
    196                     HashMap::new();
    197                 // Track active subagent nesting: tool results emitted while
    198                 // a Task is in-flight belong to the top-of-stack subagent.
    199                 let mut subagent_stack: Vec<String> = Vec::new();
    200 
    201                 // Stream response with select! to handle stream, permission requests, and interrupts
    202                 let mut stream = client.receive_response();
    203                 let mut stream_done = false;
    204 
    205                 while !stream_done {
    206                     tokio::select! {
    207                         biased;
    208 
    209                         // Check for interrupt command (highest priority)
    210                         Some(cmd) = command_rx.recv() => {
    211                             match cmd {
    212                                 SessionCommand::Interrupt { ctx: interrupt_ctx } => {
    213                                     tracing::debug!("Session {} received interrupt", session_id);
    214                                     if let Err(err) = client.interrupt().await {
    215                                         tracing::error!("Failed to send interrupt: {}", err);
    216                                     }
    217                                     // Let the stream end naturally - it will send a Result message
    218                                     // The session history is preserved by the CLI
    219                                     interrupt_ctx.request_repaint();
    220                                 }
    221                                 SessionCommand::Query { response_tx: new_tx, .. } => {
    222                                     // A new query came in while we're still streaming - shouldn't happen
    223                                     // but handle gracefully by rejecting it
    224                                     let _ = new_tx.send(DaveApiResponse::Failed(
    225                                         "Query already in progress".to_string()
    226                                     ));
    227                                 }
    228                                 SessionCommand::SetPermissionMode { mode, ctx: mode_ctx } => {
    229                                     // Permission mode change during query - apply it
    230                                     tracing::debug!("Session {} setting permission mode to {:?} during query", session_id, mode);
    231                                     if let Err(err) = client.set_permission_mode(mode).await {
    232                                         tracing::error!("Failed to set permission mode: {}", err);
    233                                     }
    234                                     mode_ctx.request_repaint();
    235                                 }
    236                                 SessionCommand::Compact { response_tx: compact_tx, .. } => {
    237                                     let _ = compact_tx.send(DaveApiResponse::Failed(
    238                                         "Cannot compact during active turn".to_string(),
    239                                     ));
    240                                 }
    241                                 SessionCommand::Shutdown => {
    242                                     tracing::debug!("Session actor {} shutting down during query", session_id);
    243                                     // Drop stream and disconnect - break to exit loop first
    244                                     drop(stream);
    245                                     if let Err(err) = client.disconnect().await {
    246                                         tracing::warn!("Error disconnecting session {}: {}", session_id, err);
    247                                     }
    248                                     tracing::debug!("Session {} actor exited", session_id);
    249                                     return;
    250                                 }
    251                             }
    252                         }
    253 
    254                         // Handle permission requests (they're blocking the SDK)
    255                         Some(perm_req) = perm_rx.recv() => {
    256                             if shared::should_auto_accept(&perm_req.tool_name, &perm_req.tool_input) {
    257                                 let _ = perm_req.response_tx.send(PermissionResult::Allow(PermissionResultAllow::default()));
    258                                 continue;
    259                             }
    260 
    261                             let ui_resp_rx = match shared::forward_permission_to_ui(
    262                                 &perm_req.tool_name,
    263                                 perm_req.tool_input.clone(),
    264                                 &response_tx,
    265                                 &ctx,
    266                             ) {
    267                                 Some(rx) => rx,
    268                                 None => {
    269                                     let _ = perm_req.response_tx.send(PermissionResult::Deny(PermissionResultDeny {
    270                                         message: "UI channel closed".to_string(),
    271                                         interrupt: true,
    272                                     }));
    273                                     continue;
    274                                 }
    275                             };
    276 
    277                             // Wait for UI response inline - blocking is OK since stream is
    278                             // waiting for permission result anyway
    279                             let tool_name = perm_req.tool_name.clone();
    280                             let result = match ui_resp_rx.await {
    281                                 Ok(PermissionResponse::Allow { message }) => {
    282                                     if let Some(msg) = &message {
    283                                         tracing::debug!("User allowed tool {} with message: {}", tool_name, msg);
    284                                         // Inject user message into conversation so AI sees it
    285                                         if let Err(err) = client.query_with_content_and_session(
    286                                             vec![UserContentBlock::text(msg.as_str())],
    287                                             &session_id
    288                                         ).await {
    289                                             tracing::error!("Failed to inject user message: {}", err);
    290                                         }
    291                                     } else {
    292                                         tracing::debug!("User allowed tool: {}", tool_name);
    293                                     }
    294                                     PermissionResult::Allow(PermissionResultAllow::default())
    295                                 }
    296                                 Ok(PermissionResponse::Deny { reason }) => {
    297                                     tracing::debug!("User denied tool {}: {}", tool_name, reason);
    298                                     PermissionResult::Deny(PermissionResultDeny {
    299                                         message: reason,
    300                                         interrupt: false,
    301                                     })
    302                                 }
    303                                 Err(_) => {
    304                                     tracing::error!("Permission response channel closed");
    305                                     PermissionResult::Deny(PermissionResultDeny {
    306                                         message: "Permission request cancelled".to_string(),
    307                                         interrupt: true,
    308                                     })
    309                                 }
    310                             };
    311                             let _ = perm_req.response_tx.send(result);
    312                         }
    313 
    314                         stream_result = stream.next() => {
    315                             match stream_result {
    316                                 Some(Ok(message)) => {
    317                                     match message {
    318                                         ClaudeMessage::Assistant(assistant_msg) => {
    319                                             for block in &assistant_msg.message.content {
    320                                                 if let ContentBlock::ToolUse(ToolUseBlock { id, name, input }) = block {
    321                                                     pending_tools.insert(id.clone(), (name.clone(), input.clone()));
    322 
    323                                                     // Emit SubagentSpawned for Task tool calls
    324                                                     if name == "Task" {
    325                                                         let description = input
    326                                                             .get("description")
    327                                                             .and_then(|v| v.as_str())
    328                                                             .unwrap_or("task")
    329                                                             .to_string();
    330                                                         let subagent_type = input
    331                                                             .get("subagent_type")
    332                                                             .and_then(|v| v.as_str())
    333                                                             .unwrap_or("unknown")
    334                                                             .to_string();
    335 
    336                                                         subagent_stack.push(id.clone());
    337                                                         let subagent_info = SubagentInfo {
    338                                                             task_id: id.clone(),
    339                                                             description,
    340                                                             subagent_type,
    341                                                             status: SubagentStatus::Running,
    342                                                             output: String::new(),
    343                                                             max_output_size: 4000,
    344                                                             tool_results: Vec::new(),
    345                                                         };
    346                                                         let _ = response_tx.send(DaveApiResponse::SubagentSpawned(subagent_info));
    347                                                         ctx.request_repaint();
    348                                                     }
    349                                                 }
    350                                             }
    351                                         }
    352                                         ClaudeMessage::StreamEvent(event) => {
    353                                             if let Some(event_type) = event.event.get("type").and_then(|v| v.as_str()) {
    354                                                 if event_type == "content_block_delta" {
    355                                                     if let Some(text) = event
    356                                                         .event
    357                                                         .get("delta")
    358                                                         .and_then(|d| d.get("text"))
    359                                                         .and_then(|t| t.as_str())
    360                                                     {
    361                                                         if response_tx.send(DaveApiResponse::Token(text.to_string())).is_err() {
    362                                                             tracing::error!("Failed to send token to UI");
    363                                                             // Setting stream_done isn't needed since we break immediately
    364                                                             break;
    365                                                         }
    366                                                         ctx.request_repaint();
    367                                                     }
    368                                                 }
    369                                             }
    370                                         }
    371                                         ClaudeMessage::Result(result_msg) => {
    372                                             if result_msg.is_error {
    373                                                 let error_text = result_msg
    374                                                     .result
    375                                                     .unwrap_or_else(|| "Unknown error".to_string());
    376                                                 let _ = response_tx.send(DaveApiResponse::Failed(error_text));
    377                                             }
    378 
    379                                             // Extract usage metrics
    380                                             tracing::debug!(
    381                                                 "ResultMessage usage: {:?}, total_cost_usd: {:?}, num_turns: {}",
    382                                                 result_msg.usage,
    383                                                 result_msg.total_cost_usd,
    384                                                 result_msg.num_turns
    385                                             );
    386                                             let (input_tokens, output_tokens) = result_msg
    387                                                 .usage
    388                                                 .as_ref()
    389                                                 .map(|u| {
    390                                                     let inp = u.get("input_tokens")
    391                                                         .and_then(|v| v.as_u64())
    392                                                         .unwrap_or(0);
    393                                                     let out = u.get("output_tokens")
    394                                                         .and_then(|v| v.as_u64())
    395                                                         .unwrap_or(0);
    396                                                     (inp, out)
    397                                                 })
    398                                                 .unwrap_or((0, 0));
    399 
    400                                             let usage_info = crate::messages::UsageInfo {
    401                                                 input_tokens,
    402                                                 output_tokens,
    403                                                 cost_usd: result_msg.total_cost_usd,
    404                                                 num_turns: result_msg.num_turns,
    405                                             };
    406                                             let _ = response_tx.send(DaveApiResponse::QueryComplete(usage_info));
    407 
    408                                             stream_done = true;
    409                                         }
    410                                         ClaudeMessage::User(user_msg) => {
    411                                             // Tool results are nested in extra["message"]["content"]
    412                                             // since the SDK's UserMessage.content field doesn't
    413                                             // capture the inner message's content array.
    414                                             let content_blocks: Vec<ContentBlock> = user_msg
    415                                                 .extra
    416                                                 .get("message")
    417                                                 .and_then(|m| m.get("content"))
    418                                                 .and_then(|c| c.as_array())
    419                                                 .map(|arr| {
    420                                                     arr.iter()
    421                                                         .filter_map(|v| serde_json::from_value::<ContentBlock>(v.clone()).ok())
    422                                                         .collect()
    423                                                 })
    424                                                 .unwrap_or_default();
    425 
    426                                             for block in &content_blocks {
    427                                                 if let ContentBlock::ToolResult(tool_result_block) = block {
    428                                                     let tool_use_id = &tool_result_block.tool_use_id;
    429                                                     if let Some((tool_name, tool_input)) = pending_tools.remove(tool_use_id) {
    430                                                         let result_value = tool_result_content_to_value(&tool_result_block.content);
    431 
    432                                                         // Check if this is a Task tool completion
    433                                                         if tool_name == "Task" {
    434                                                             let result_text = extract_response_content(&result_value)
    435                                                                 .unwrap_or_else(|| "completed".to_string());
    436                                                             shared::complete_subagent(tool_use_id, &result_text, &mut subagent_stack, &response_tx, &ctx);
    437                                                         }
    438 
    439                                                         let file_update = FileUpdate::from_tool_call(&tool_name, &tool_input);
    440                                                         shared::send_tool_result(&tool_name, &tool_input, &result_value, file_update, &subagent_stack, &response_tx, &ctx);
    441                                                     }
    442                                                 }
    443                                             }
    444                                         }
    445                                         ClaudeMessage::System(system_msg) => {
    446                                             // Handle system init message - extract session info
    447                                             if system_msg.subtype == "init" {
    448                                                 let session_info = parse_session_info(&system_msg);
    449                                                 let _ = response_tx.send(DaveApiResponse::SessionInfo(session_info));
    450                                                 ctx.request_repaint();
    451                                             } else if system_msg.subtype == "status" {
    452                                                 // Handle status messages (compaction start/end)
    453                                                 let status = system_msg.data.get("status")
    454                                                     .and_then(|v| v.as_str());
    455                                                 if status == Some("compacting") {
    456                                                     let _ = response_tx.send(DaveApiResponse::CompactionStarted);
    457                                                     ctx.request_repaint();
    458                                                 }
    459                                                 // status: null means compaction finished (handled by compact_boundary)
    460                                             } else if system_msg.subtype == "compact_boundary" {
    461                                                 // Compaction completed - extract token savings info
    462                                                 tracing::debug!("compact_boundary data: {:?}", system_msg.data);
    463                                                 let pre_tokens = system_msg.data.get("pre_tokens")
    464                                                     .and_then(|v| v.as_u64())
    465                                                     .unwrap_or(0);
    466                                                 let info = CompactionInfo { pre_tokens };
    467                                                 let _ = response_tx.send(DaveApiResponse::CompactionComplete(info));
    468                                                 ctx.request_repaint();
    469                                             } else {
    470                                                 tracing::debug!("Received system message subtype: {}", system_msg.subtype);
    471                                             }
    472                                         }
    473                                         ClaudeMessage::ControlCancelRequest(_) => {
    474                                             // Ignore internal control messages
    475                                         }
    476                                     }
    477                                 }
    478                                 Some(Err(err)) => {
    479                                     // Non-fatal: unknown message types (e.g. rate_limit_event)
    480                                     // cause deserialization errors but the stream continues.
    481                                     tracing::warn!("Claude stream message skipped: {}", err);
    482                                 }
    483                                 None => {
    484                                     stream_done = true;
    485                                 }
    486                             }
    487                         }
    488                     }
    489                 }
    490 
    491                 tracing::debug!("Query complete for session {}", session_id);
    492                 // Don't disconnect - keep the connection alive for subsequent queries
    493             }
    494             SessionCommand::Interrupt { ctx } => {
    495                 // Interrupt received when not in a query - just request repaint
    496                 tracing::debug!(
    497                     "Session {} received interrupt but no query active",
    498                     session_id
    499                 );
    500                 ctx.request_repaint();
    501             }
    502             SessionCommand::SetPermissionMode { mode, ctx } => {
    503                 tracing::debug!(
    504                     "Session {} setting permission mode to {:?}",
    505                     session_id,
    506                     mode
    507                 );
    508                 if let Err(err) = client.set_permission_mode(mode).await {
    509                     tracing::error!("Failed to set permission mode: {}", err);
    510                 }
    511                 ctx.request_repaint();
    512             }
    513             SessionCommand::Compact { response_tx, .. } => {
    514                 // Claude compact is normally routed via compact_session() which
    515                 // sends /compact as a Query. If a Compact command arrives directly,
    516                 // just drop the tx — the caller will see it disconnected.
    517                 tracing::debug!(
    518                     "Session {} received Compact command (not expected for Claude)",
    519                     session_id
    520                 );
    521                 drop(response_tx);
    522             }
    523             SessionCommand::Shutdown => {
    524                 tracing::debug!("Session actor {} shutting down", session_id);
    525                 break;
    526             }
    527         }
    528     }
    529 
    530     // Disconnect when shutting down
    531     if let Err(err) = client.disconnect().await {
    532         tracing::warn!("Error disconnecting session {}: {}", session_id, err);
    533     }
    534     tracing::debug!("Session {} actor exited", session_id);
    535 }
    536 
    537 impl AiBackend for ClaudeBackend {
    538     fn stream_request(
    539         &self,
    540         messages: Vec<Message>,
    541         _tools: Arc<HashMap<String, Tool>>,
    542         _model: String,
    543         _user_id: String,
    544         session_id: String,
    545         cwd: Option<PathBuf>,
    546         resume_session_id: Option<String>,
    547         ctx: egui::Context,
    548     ) -> (
    549         mpsc::Receiver<DaveApiResponse>,
    550         Option<tokio::task::JoinHandle<()>>,
    551     ) {
    552         let (response_tx, response_rx) = mpsc::channel();
    553 
    554         let prompt = shared::prepare_prompt(&messages, &resume_session_id);
    555 
    556         tracing::debug!(
    557             "Sending request to Claude Code: session={}, resumed={}, prompt length: {}, preview: {:?}",
    558             session_id,
    559             resume_session_id.is_some(),
    560             prompt.len(),
    561             &prompt[..prompt.len().min(100)]
    562         );
    563 
    564         // Get or create session actor
    565         let command_tx = {
    566             let entry = self.sessions.entry(session_id.clone());
    567             let handle = entry.or_insert_with(|| {
    568                 let (command_tx, command_rx) = tokio_mpsc::channel(16);
    569 
    570                 // Spawn session actor with cwd and optional resume session ID
    571                 let session_id_clone = session_id.clone();
    572                 let cwd_clone = cwd.clone();
    573                 let resume_session_id_clone = resume_session_id.clone();
    574                 tokio::spawn(async move {
    575                     session_actor(
    576                         session_id_clone,
    577                         cwd_clone,
    578                         resume_session_id_clone,
    579                         command_rx,
    580                     )
    581                     .await;
    582                 });
    583 
    584                 SessionHandle { command_tx }
    585             });
    586             handle.command_tx.clone()
    587         };
    588 
    589         // Spawn a task to send the query command
    590         let handle = tokio::spawn(async move {
    591             if let Err(err) = command_tx
    592                 .send(SessionCommand::Query {
    593                     prompt,
    594                     response_tx,
    595                     ctx,
    596                 })
    597                 .await
    598             {
    599                 tracing::error!("Failed to send query command to session actor: {}", err);
    600             }
    601         });
    602 
    603         (response_rx, Some(handle))
    604     }
    605 
    606     fn cleanup_session(&self, session_id: String) {
    607         if let Some((_, handle)) = self.sessions.remove(&session_id) {
    608             tokio::spawn(async move {
    609                 if let Err(err) = handle.command_tx.send(SessionCommand::Shutdown).await {
    610                     tracing::warn!("Failed to send shutdown command: {}", err);
    611                 }
    612             });
    613         }
    614     }
    615 
    616     fn interrupt_session(&self, session_id: String, ctx: egui::Context) {
    617         if let Some(handle) = self.sessions.get(&session_id) {
    618             let command_tx = handle.command_tx.clone();
    619             tokio::spawn(async move {
    620                 if let Err(err) = command_tx.send(SessionCommand::Interrupt { ctx }).await {
    621                     tracing::warn!("Failed to send interrupt command: {}", err);
    622                 }
    623             });
    624         }
    625     }
    626 
    627     fn set_permission_mode(&self, session_id: String, mode: PermissionMode, ctx: egui::Context) {
    628         if let Some(handle) = self.sessions.get(&session_id) {
    629             let command_tx = handle.command_tx.clone();
    630             tokio::spawn(async move {
    631                 if let Err(err) = command_tx
    632                     .send(SessionCommand::SetPermissionMode { mode, ctx })
    633                     .await
    634                 {
    635                     tracing::warn!("Failed to send set_permission_mode command: {}", err);
    636                 }
    637             });
    638         } else {
    639             tracing::debug!(
    640                 "Session {} not active, permission mode will apply on next query",
    641                 session_id
    642             );
    643         }
    644     }
    645 
    646     fn compact_session(
    647         &self,
    648         session_id: String,
    649         ctx: egui::Context,
    650     ) -> Option<mpsc::Receiver<DaveApiResponse>> {
    651         let handle = self.sessions.get(&session_id)?;
    652         let command_tx = handle.command_tx.clone();
    653         let (response_tx, response_rx) = mpsc::channel();
    654         tokio::spawn(async move {
    655             if let Err(err) = command_tx
    656                 .send(SessionCommand::Query {
    657                     prompt: "/compact".to_string(),
    658                     response_tx,
    659                     ctx,
    660                 })
    661                 .await
    662             {
    663                 tracing::warn!("Failed to send compact query to claude session: {}", err);
    664             }
    665         });
    666         Some(response_rx)
    667     }
    668 }
    669 
    670 #[cfg(test)]
    671 mod tests {
    672     use super::*;
    673     use crate::messages::AssistantMessage;
    674 
    675     #[test]
    676     fn pending_messages_single_user() {
    677         let messages = vec![Message::User("hello".into())];
    678         assert_eq!(shared::get_pending_user_messages(&messages), "hello");
    679     }
    680 
    681     #[test]
    682     fn pending_messages_multiple_trailing_users() {
    683         let messages = vec![
    684             Message::User("first".into()),
    685             Message::Assistant(AssistantMessage::from_text("reply".into())),
    686             Message::User("second".into()),
    687             Message::User("third".into()),
    688             Message::User("fourth".into()),
    689         ];
    690         assert_eq!(
    691             shared::get_pending_user_messages(&messages),
    692             "second\nthird\nfourth"
    693         );
    694     }
    695 
    696     #[test]
    697     fn pending_messages_stops_at_non_user() {
    698         let messages = vec![
    699             Message::User("old".into()),
    700             Message::User("also old".into()),
    701             Message::Assistant(AssistantMessage::from_text("reply".into())),
    702             Message::User("pending".into()),
    703         ];
    704         assert_eq!(shared::get_pending_user_messages(&messages), "pending");
    705     }
    706 
    707     #[test]
    708     fn pending_messages_empty_when_last_is_assistant() {
    709         let messages = vec![
    710             Message::User("hello".into()),
    711             Message::Assistant(AssistantMessage::from_text("reply".into())),
    712         ];
    713         assert_eq!(shared::get_pending_user_messages(&messages), "");
    714     }
    715 
    716     #[test]
    717     fn pending_messages_empty_chat() {
    718         let messages: Vec<Message> = vec![];
    719         assert_eq!(shared::get_pending_user_messages(&messages), "");
    720     }
    721 
    722     #[test]
    723     fn pending_messages_stops_at_tool_response() {
    724         let messages = vec![
    725             Message::User("do something".into()),
    726             Message::Assistant(AssistantMessage::from_text("ok".into())),
    727             Message::ToolCalls(vec![crate::tools::ToolCall::invalid(
    728                 "c1".into(),
    729                 Some("Read".into()),
    730                 None,
    731                 "test".into(),
    732             )]),
    733             Message::ToolResponse(crate::tools::ToolResponse::error(
    734                 "c1".into(),
    735                 "result".into(),
    736             )),
    737             Message::User("queued 1".into()),
    738             Message::User("queued 2".into()),
    739         ];
    740         assert_eq!(
    741             shared::get_pending_user_messages(&messages),
    742             "queued 1\nqueued 2"
    743         );
    744     }
    745 
    746     #[test]
    747     fn pending_messages_preserves_order() {
    748         let messages = vec![
    749             Message::User("a".into()),
    750             Message::User("b".into()),
    751             Message::User("c".into()),
    752         ];
    753         assert_eq!(shared::get_pending_user_messages(&messages), "a\nb\nc");
    754     }
    755 }