claude.rs (36942B)
1 use crate::backend::session_info::parse_session_info; 2 use crate::backend::shared::{self, SessionCommand, SessionHandle}; 3 use crate::backend::tool_summary::extract_response_content; 4 use crate::backend::traits::AiBackend; 5 use crate::file_update::FileUpdate; 6 use crate::messages::{ 7 CompactionInfo, DaveApiResponse, PermissionResponse, SubagentInfo, SubagentStatus, 8 }; 9 use crate::tools::Tool; 10 use crate::Message; 11 use claude_agent_sdk_rs::{ 12 ClaudeAgentOptions, ClaudeClient, ContentBlock, Message as ClaudeMessage, PermissionMode, 13 PermissionResult, PermissionResultAllow, PermissionResultDeny, ToolResultContent, ToolUseBlock, 14 UserContentBlock, 15 }; 16 use dashmap::DashMap; 17 use futures::future::BoxFuture; 18 use futures::StreamExt; 19 use std::collections::HashMap; 20 use std::path::PathBuf; 21 use std::sync::mpsc; 22 use std::sync::Arc; 23 use tokio::sync::mpsc as tokio_mpsc; 24 use tokio::sync::oneshot; 25 26 /// Convert a ToolResultContent to a serde_json::Value for use with tool summary formatting 27 fn tool_result_content_to_value(content: &Option<ToolResultContent>) -> serde_json::Value { 28 match content { 29 Some(ToolResultContent::Text(s)) => serde_json::Value::String(s.clone()), 30 Some(ToolResultContent::Blocks(blocks)) => serde_json::Value::Array(blocks.to_vec()), 31 None => serde_json::Value::Null, 32 } 33 } 34 35 pub struct ClaudeBackend { 36 /// Registry of active sessions (using dashmap for lock-free access) 37 sessions: DashMap<String, SessionHandle>, 38 } 39 40 impl ClaudeBackend { 41 pub fn new() -> Self { 42 Self { 43 sessions: DashMap::new(), 44 } 45 } 46 } 47 48 /// Permission request forwarded from the callback to the actor 49 struct PermissionRequestInternal { 50 tool_name: String, 51 tool_input: serde_json::Value, 52 response_tx: oneshot::Sender<PermissionResult>, 53 } 54 55 /// Session actor task that owns a single ClaudeClient with persistent connection 56 async fn session_actor( 57 session_id: String, 58 cwd: Option<PathBuf>, 59 resume_session_id: Option<String>, 60 mut command_rx: tokio_mpsc::Receiver<SessionCommand>, 61 ) { 62 // Permission channel - the callback sends to perm_tx, actor receives on perm_rx 63 let (perm_tx, mut perm_rx) = tokio_mpsc::channel::<PermissionRequestInternal>(16); 64 65 // Create the can_use_tool callback that forwards to our permission channel 66 let can_use_tool: Arc< 67 dyn Fn( 68 String, 69 serde_json::Value, 70 claude_agent_sdk_rs::ToolPermissionContext, 71 ) -> BoxFuture<'static, PermissionResult> 72 + Send 73 + Sync, 74 > = Arc::new({ 75 let perm_tx = perm_tx.clone(); 76 move |tool_name: String, 77 tool_input: serde_json::Value, 78 _context: claude_agent_sdk_rs::ToolPermissionContext| { 79 let perm_tx = perm_tx.clone(); 80 Box::pin(async move { 81 let (resp_tx, resp_rx) = oneshot::channel(); 82 if perm_tx 83 .send(PermissionRequestInternal { 84 tool_name: tool_name.clone(), 85 tool_input, 86 response_tx: resp_tx, 87 }) 88 .await 89 .is_err() 90 { 91 return PermissionResult::Deny(PermissionResultDeny { 92 message: "Session actor channel closed".to_string(), 93 interrupt: true, 94 }); 95 } 96 // Wait for response from session actor (which forwards from UI) 97 match resp_rx.await { 98 Ok(result) => result, 99 Err(_) => PermissionResult::Deny(PermissionResultDeny { 100 message: "Permission response cancelled".to_string(), 101 interrupt: true, 102 }), 103 } 104 }) 105 } 106 }); 107 108 // A stderr callback to prevent the subprocess from blocking 109 let stderr_callback = Arc::new(|msg: String| { 110 tracing::trace!("Claude CLI stderr: {}", msg); 111 }); 112 113 // Log if we're resuming a session 114 if let Some(ref resume_id) = resume_session_id { 115 tracing::info!( 116 "Session {} will resume Claude session: {}", 117 session_id, 118 resume_id 119 ); 120 } 121 122 // Create client once - this maintains the persistent connection 123 // Using match to handle the TypedBuilder's strict type requirements 124 let options = match (&cwd, &resume_session_id) { 125 (Some(dir), Some(resume_id)) => ClaudeAgentOptions::builder() 126 .permission_mode(PermissionMode::Default) 127 .stderr_callback(stderr_callback) 128 .can_use_tool(can_use_tool) 129 .include_partial_messages(true) 130 .cwd(dir) 131 .resume(resume_id) 132 .build(), 133 (Some(dir), None) => ClaudeAgentOptions::builder() 134 .permission_mode(PermissionMode::Default) 135 .stderr_callback(stderr_callback) 136 .can_use_tool(can_use_tool) 137 .include_partial_messages(true) 138 .cwd(dir) 139 .build(), 140 (None, Some(resume_id)) => ClaudeAgentOptions::builder() 141 .permission_mode(PermissionMode::Default) 142 .stderr_callback(stderr_callback) 143 .can_use_tool(can_use_tool) 144 .include_partial_messages(true) 145 .resume(resume_id) 146 .build(), 147 (None, None) => ClaudeAgentOptions::builder() 148 .permission_mode(PermissionMode::Default) 149 .stderr_callback(stderr_callback) 150 .can_use_tool(can_use_tool) 151 .include_partial_messages(true) 152 .build(), 153 }; 154 let mut client = ClaudeClient::new(options); 155 156 // Connect once - this starts the subprocess 157 if let Err(err) = client.connect().await { 158 tracing::error!("Session {} failed to connect: {}", session_id, err); 159 // Process any pending commands to report the error 160 while let Some(cmd) = command_rx.recv().await { 161 if let SessionCommand::Query { 162 ref response_tx, .. 163 } = cmd 164 { 165 let _ = response_tx.send(DaveApiResponse::Failed(format!( 166 "Failed to connect to Claude: {}", 167 err 168 ))); 169 } 170 if matches!(cmd, SessionCommand::Shutdown) { 171 break; 172 } 173 } 174 return; 175 } 176 177 tracing::debug!("Session {} connected successfully", session_id); 178 179 // Process commands 180 while let Some(cmd) = command_rx.recv().await { 181 match cmd { 182 SessionCommand::Query { 183 prompt, 184 response_tx, 185 ctx, 186 } => { 187 // Send query using session_id for context 188 if let Err(err) = client.query_with_session(&prompt, &session_id).await { 189 tracing::error!("Session {} query error: {}", session_id, err); 190 let _ = response_tx.send(DaveApiResponse::Failed(err.to_string())); 191 continue; 192 } 193 194 // Track pending tool uses: tool_use_id -> (tool_name, tool_input) 195 let mut pending_tools: HashMap<String, (String, serde_json::Value)> = 196 HashMap::new(); 197 // Track active subagent nesting: tool results emitted while 198 // a Task is in-flight belong to the top-of-stack subagent. 199 let mut subagent_stack: Vec<String> = Vec::new(); 200 201 // Stream response with select! to handle stream, permission requests, and interrupts 202 let mut stream = client.receive_response(); 203 let mut stream_done = false; 204 205 while !stream_done { 206 tokio::select! { 207 biased; 208 209 // Check for interrupt command (highest priority) 210 Some(cmd) = command_rx.recv() => { 211 match cmd { 212 SessionCommand::Interrupt { ctx: interrupt_ctx } => { 213 tracing::debug!("Session {} received interrupt", session_id); 214 if let Err(err) = client.interrupt().await { 215 tracing::error!("Failed to send interrupt: {}", err); 216 } 217 // Let the stream end naturally - it will send a Result message 218 // The session history is preserved by the CLI 219 interrupt_ctx.request_repaint(); 220 } 221 SessionCommand::Query { response_tx: new_tx, .. } => { 222 // A new query came in while we're still streaming - shouldn't happen 223 // but handle gracefully by rejecting it 224 let _ = new_tx.send(DaveApiResponse::Failed( 225 "Query already in progress".to_string() 226 )); 227 } 228 SessionCommand::SetPermissionMode { mode, ctx: mode_ctx } => { 229 // Permission mode change during query - apply it 230 tracing::debug!("Session {} setting permission mode to {:?} during query", session_id, mode); 231 if let Err(err) = client.set_permission_mode(mode).await { 232 tracing::error!("Failed to set permission mode: {}", err); 233 } 234 mode_ctx.request_repaint(); 235 } 236 SessionCommand::Compact { response_tx: compact_tx, .. } => { 237 let _ = compact_tx.send(DaveApiResponse::Failed( 238 "Cannot compact during active turn".to_string(), 239 )); 240 } 241 SessionCommand::Shutdown => { 242 tracing::debug!("Session actor {} shutting down during query", session_id); 243 // Drop stream and disconnect - break to exit loop first 244 drop(stream); 245 if let Err(err) = client.disconnect().await { 246 tracing::warn!("Error disconnecting session {}: {}", session_id, err); 247 } 248 tracing::debug!("Session {} actor exited", session_id); 249 return; 250 } 251 } 252 } 253 254 // Handle permission requests (they're blocking the SDK) 255 Some(perm_req) = perm_rx.recv() => { 256 if shared::should_auto_accept(&perm_req.tool_name, &perm_req.tool_input) { 257 let _ = perm_req.response_tx.send(PermissionResult::Allow(PermissionResultAllow::default())); 258 continue; 259 } 260 261 let ui_resp_rx = match shared::forward_permission_to_ui( 262 &perm_req.tool_name, 263 perm_req.tool_input.clone(), 264 &response_tx, 265 &ctx, 266 ) { 267 Some(rx) => rx, 268 None => { 269 let _ = perm_req.response_tx.send(PermissionResult::Deny(PermissionResultDeny { 270 message: "UI channel closed".to_string(), 271 interrupt: true, 272 })); 273 continue; 274 } 275 }; 276 277 // Wait for UI response inline - blocking is OK since stream is 278 // waiting for permission result anyway 279 let tool_name = perm_req.tool_name.clone(); 280 let result = match ui_resp_rx.await { 281 Ok(PermissionResponse::Allow { message }) => { 282 if let Some(msg) = &message { 283 tracing::debug!("User allowed tool {} with message: {}", tool_name, msg); 284 // Inject user message into conversation so AI sees it 285 if let Err(err) = client.query_with_content_and_session( 286 vec![UserContentBlock::text(msg.as_str())], 287 &session_id 288 ).await { 289 tracing::error!("Failed to inject user message: {}", err); 290 } 291 } else { 292 tracing::debug!("User allowed tool: {}", tool_name); 293 } 294 PermissionResult::Allow(PermissionResultAllow::default()) 295 } 296 Ok(PermissionResponse::Deny { reason }) => { 297 tracing::debug!("User denied tool {}: {}", tool_name, reason); 298 PermissionResult::Deny(PermissionResultDeny { 299 message: reason, 300 interrupt: false, 301 }) 302 } 303 Err(_) => { 304 tracing::error!("Permission response channel closed"); 305 PermissionResult::Deny(PermissionResultDeny { 306 message: "Permission request cancelled".to_string(), 307 interrupt: true, 308 }) 309 } 310 }; 311 let _ = perm_req.response_tx.send(result); 312 } 313 314 stream_result = stream.next() => { 315 match stream_result { 316 Some(Ok(message)) => { 317 match message { 318 ClaudeMessage::Assistant(assistant_msg) => { 319 for block in &assistant_msg.message.content { 320 if let ContentBlock::ToolUse(ToolUseBlock { id, name, input }) = block { 321 pending_tools.insert(id.clone(), (name.clone(), input.clone())); 322 323 // Emit SubagentSpawned for Task tool calls 324 if name == "Task" { 325 let description = input 326 .get("description") 327 .and_then(|v| v.as_str()) 328 .unwrap_or("task") 329 .to_string(); 330 let subagent_type = input 331 .get("subagent_type") 332 .and_then(|v| v.as_str()) 333 .unwrap_or("unknown") 334 .to_string(); 335 336 subagent_stack.push(id.clone()); 337 let subagent_info = SubagentInfo { 338 task_id: id.clone(), 339 description, 340 subagent_type, 341 status: SubagentStatus::Running, 342 output: String::new(), 343 max_output_size: 4000, 344 tool_results: Vec::new(), 345 }; 346 let _ = response_tx.send(DaveApiResponse::SubagentSpawned(subagent_info)); 347 ctx.request_repaint(); 348 } 349 } 350 } 351 } 352 ClaudeMessage::StreamEvent(event) => { 353 if let Some(event_type) = event.event.get("type").and_then(|v| v.as_str()) { 354 if event_type == "content_block_delta" { 355 if let Some(text) = event 356 .event 357 .get("delta") 358 .and_then(|d| d.get("text")) 359 .and_then(|t| t.as_str()) 360 { 361 if response_tx.send(DaveApiResponse::Token(text.to_string())).is_err() { 362 tracing::error!("Failed to send token to UI"); 363 // Setting stream_done isn't needed since we break immediately 364 break; 365 } 366 ctx.request_repaint(); 367 } 368 } 369 } 370 } 371 ClaudeMessage::Result(result_msg) => { 372 if result_msg.is_error { 373 let error_text = result_msg 374 .result 375 .unwrap_or_else(|| "Unknown error".to_string()); 376 let _ = response_tx.send(DaveApiResponse::Failed(error_text)); 377 } 378 379 // Extract usage metrics 380 tracing::debug!( 381 "ResultMessage usage: {:?}, total_cost_usd: {:?}, num_turns: {}", 382 result_msg.usage, 383 result_msg.total_cost_usd, 384 result_msg.num_turns 385 ); 386 let (input_tokens, output_tokens) = result_msg 387 .usage 388 .as_ref() 389 .map(|u| { 390 let inp = u.get("input_tokens") 391 .and_then(|v| v.as_u64()) 392 .unwrap_or(0); 393 let out = u.get("output_tokens") 394 .and_then(|v| v.as_u64()) 395 .unwrap_or(0); 396 (inp, out) 397 }) 398 .unwrap_or((0, 0)); 399 400 let usage_info = crate::messages::UsageInfo { 401 input_tokens, 402 output_tokens, 403 cost_usd: result_msg.total_cost_usd, 404 num_turns: result_msg.num_turns, 405 }; 406 let _ = response_tx.send(DaveApiResponse::QueryComplete(usage_info)); 407 408 stream_done = true; 409 } 410 ClaudeMessage::User(user_msg) => { 411 // Tool results are nested in extra["message"]["content"] 412 // since the SDK's UserMessage.content field doesn't 413 // capture the inner message's content array. 414 let content_blocks: Vec<ContentBlock> = user_msg 415 .extra 416 .get("message") 417 .and_then(|m| m.get("content")) 418 .and_then(|c| c.as_array()) 419 .map(|arr| { 420 arr.iter() 421 .filter_map(|v| serde_json::from_value::<ContentBlock>(v.clone()).ok()) 422 .collect() 423 }) 424 .unwrap_or_default(); 425 426 for block in &content_blocks { 427 if let ContentBlock::ToolResult(tool_result_block) = block { 428 let tool_use_id = &tool_result_block.tool_use_id; 429 if let Some((tool_name, tool_input)) = pending_tools.remove(tool_use_id) { 430 let result_value = tool_result_content_to_value(&tool_result_block.content); 431 432 // Check if this is a Task tool completion 433 if tool_name == "Task" { 434 let result_text = extract_response_content(&result_value) 435 .unwrap_or_else(|| "completed".to_string()); 436 shared::complete_subagent(tool_use_id, &result_text, &mut subagent_stack, &response_tx, &ctx); 437 } 438 439 let file_update = FileUpdate::from_tool_call(&tool_name, &tool_input); 440 shared::send_tool_result(&tool_name, &tool_input, &result_value, file_update, &subagent_stack, &response_tx, &ctx); 441 } 442 } 443 } 444 } 445 ClaudeMessage::System(system_msg) => { 446 // Handle system init message - extract session info 447 if system_msg.subtype == "init" { 448 let session_info = parse_session_info(&system_msg); 449 let _ = response_tx.send(DaveApiResponse::SessionInfo(session_info)); 450 ctx.request_repaint(); 451 } else if system_msg.subtype == "status" { 452 // Handle status messages (compaction start/end) 453 let status = system_msg.data.get("status") 454 .and_then(|v| v.as_str()); 455 if status == Some("compacting") { 456 let _ = response_tx.send(DaveApiResponse::CompactionStarted); 457 ctx.request_repaint(); 458 } 459 // status: null means compaction finished (handled by compact_boundary) 460 } else if system_msg.subtype == "compact_boundary" { 461 // Compaction completed - extract token savings info 462 tracing::debug!("compact_boundary data: {:?}", system_msg.data); 463 let pre_tokens = system_msg.data.get("pre_tokens") 464 .and_then(|v| v.as_u64()) 465 .unwrap_or(0); 466 let info = CompactionInfo { pre_tokens }; 467 let _ = response_tx.send(DaveApiResponse::CompactionComplete(info)); 468 ctx.request_repaint(); 469 } else { 470 tracing::debug!("Received system message subtype: {}", system_msg.subtype); 471 } 472 } 473 ClaudeMessage::ControlCancelRequest(_) => { 474 // Ignore internal control messages 475 } 476 } 477 } 478 Some(Err(err)) => { 479 // Non-fatal: unknown message types (e.g. rate_limit_event) 480 // cause deserialization errors but the stream continues. 481 tracing::warn!("Claude stream message skipped: {}", err); 482 } 483 None => { 484 stream_done = true; 485 } 486 } 487 } 488 } 489 } 490 491 tracing::debug!("Query complete for session {}", session_id); 492 // Don't disconnect - keep the connection alive for subsequent queries 493 } 494 SessionCommand::Interrupt { ctx } => { 495 // Interrupt received when not in a query - just request repaint 496 tracing::debug!( 497 "Session {} received interrupt but no query active", 498 session_id 499 ); 500 ctx.request_repaint(); 501 } 502 SessionCommand::SetPermissionMode { mode, ctx } => { 503 tracing::debug!( 504 "Session {} setting permission mode to {:?}", 505 session_id, 506 mode 507 ); 508 if let Err(err) = client.set_permission_mode(mode).await { 509 tracing::error!("Failed to set permission mode: {}", err); 510 } 511 ctx.request_repaint(); 512 } 513 SessionCommand::Compact { response_tx, .. } => { 514 // Claude compact is normally routed via compact_session() which 515 // sends /compact as a Query. If a Compact command arrives directly, 516 // just drop the tx — the caller will see it disconnected. 517 tracing::debug!( 518 "Session {} received Compact command (not expected for Claude)", 519 session_id 520 ); 521 drop(response_tx); 522 } 523 SessionCommand::Shutdown => { 524 tracing::debug!("Session actor {} shutting down", session_id); 525 break; 526 } 527 } 528 } 529 530 // Disconnect when shutting down 531 if let Err(err) = client.disconnect().await { 532 tracing::warn!("Error disconnecting session {}: {}", session_id, err); 533 } 534 tracing::debug!("Session {} actor exited", session_id); 535 } 536 537 impl AiBackend for ClaudeBackend { 538 fn stream_request( 539 &self, 540 messages: Vec<Message>, 541 _tools: Arc<HashMap<String, Tool>>, 542 _model: String, 543 _user_id: String, 544 session_id: String, 545 cwd: Option<PathBuf>, 546 resume_session_id: Option<String>, 547 ctx: egui::Context, 548 ) -> ( 549 mpsc::Receiver<DaveApiResponse>, 550 Option<tokio::task::JoinHandle<()>>, 551 ) { 552 let (response_tx, response_rx) = mpsc::channel(); 553 554 let prompt = shared::prepare_prompt(&messages, &resume_session_id); 555 556 tracing::debug!( 557 "Sending request to Claude Code: session={}, resumed={}, prompt length: {}, preview: {:?}", 558 session_id, 559 resume_session_id.is_some(), 560 prompt.len(), 561 &prompt[..prompt.len().min(100)] 562 ); 563 564 // Get or create session actor 565 let command_tx = { 566 let entry = self.sessions.entry(session_id.clone()); 567 let handle = entry.or_insert_with(|| { 568 let (command_tx, command_rx) = tokio_mpsc::channel(16); 569 570 // Spawn session actor with cwd and optional resume session ID 571 let session_id_clone = session_id.clone(); 572 let cwd_clone = cwd.clone(); 573 let resume_session_id_clone = resume_session_id.clone(); 574 tokio::spawn(async move { 575 session_actor( 576 session_id_clone, 577 cwd_clone, 578 resume_session_id_clone, 579 command_rx, 580 ) 581 .await; 582 }); 583 584 SessionHandle { command_tx } 585 }); 586 handle.command_tx.clone() 587 }; 588 589 // Spawn a task to send the query command 590 let handle = tokio::spawn(async move { 591 if let Err(err) = command_tx 592 .send(SessionCommand::Query { 593 prompt, 594 response_tx, 595 ctx, 596 }) 597 .await 598 { 599 tracing::error!("Failed to send query command to session actor: {}", err); 600 } 601 }); 602 603 (response_rx, Some(handle)) 604 } 605 606 fn cleanup_session(&self, session_id: String) { 607 if let Some((_, handle)) = self.sessions.remove(&session_id) { 608 tokio::spawn(async move { 609 if let Err(err) = handle.command_tx.send(SessionCommand::Shutdown).await { 610 tracing::warn!("Failed to send shutdown command: {}", err); 611 } 612 }); 613 } 614 } 615 616 fn interrupt_session(&self, session_id: String, ctx: egui::Context) { 617 if let Some(handle) = self.sessions.get(&session_id) { 618 let command_tx = handle.command_tx.clone(); 619 tokio::spawn(async move { 620 if let Err(err) = command_tx.send(SessionCommand::Interrupt { ctx }).await { 621 tracing::warn!("Failed to send interrupt command: {}", err); 622 } 623 }); 624 } 625 } 626 627 fn set_permission_mode(&self, session_id: String, mode: PermissionMode, ctx: egui::Context) { 628 if let Some(handle) = self.sessions.get(&session_id) { 629 let command_tx = handle.command_tx.clone(); 630 tokio::spawn(async move { 631 if let Err(err) = command_tx 632 .send(SessionCommand::SetPermissionMode { mode, ctx }) 633 .await 634 { 635 tracing::warn!("Failed to send set_permission_mode command: {}", err); 636 } 637 }); 638 } else { 639 tracing::debug!( 640 "Session {} not active, permission mode will apply on next query", 641 session_id 642 ); 643 } 644 } 645 646 fn compact_session( 647 &self, 648 session_id: String, 649 ctx: egui::Context, 650 ) -> Option<mpsc::Receiver<DaveApiResponse>> { 651 let handle = self.sessions.get(&session_id)?; 652 let command_tx = handle.command_tx.clone(); 653 let (response_tx, response_rx) = mpsc::channel(); 654 tokio::spawn(async move { 655 if let Err(err) = command_tx 656 .send(SessionCommand::Query { 657 prompt: "/compact".to_string(), 658 response_tx, 659 ctx, 660 }) 661 .await 662 { 663 tracing::warn!("Failed to send compact query to claude session: {}", err); 664 } 665 }); 666 Some(response_rx) 667 } 668 } 669 670 #[cfg(test)] 671 mod tests { 672 use super::*; 673 use crate::messages::AssistantMessage; 674 675 #[test] 676 fn pending_messages_single_user() { 677 let messages = vec![Message::User("hello".into())]; 678 assert_eq!(shared::get_pending_user_messages(&messages), "hello"); 679 } 680 681 #[test] 682 fn pending_messages_multiple_trailing_users() { 683 let messages = vec![ 684 Message::User("first".into()), 685 Message::Assistant(AssistantMessage::from_text("reply".into())), 686 Message::User("second".into()), 687 Message::User("third".into()), 688 Message::User("fourth".into()), 689 ]; 690 assert_eq!( 691 shared::get_pending_user_messages(&messages), 692 "second\nthird\nfourth" 693 ); 694 } 695 696 #[test] 697 fn pending_messages_stops_at_non_user() { 698 let messages = vec![ 699 Message::User("old".into()), 700 Message::User("also old".into()), 701 Message::Assistant(AssistantMessage::from_text("reply".into())), 702 Message::User("pending".into()), 703 ]; 704 assert_eq!(shared::get_pending_user_messages(&messages), "pending"); 705 } 706 707 #[test] 708 fn pending_messages_empty_when_last_is_assistant() { 709 let messages = vec![ 710 Message::User("hello".into()), 711 Message::Assistant(AssistantMessage::from_text("reply".into())), 712 ]; 713 assert_eq!(shared::get_pending_user_messages(&messages), ""); 714 } 715 716 #[test] 717 fn pending_messages_empty_chat() { 718 let messages: Vec<Message> = vec![]; 719 assert_eq!(shared::get_pending_user_messages(&messages), ""); 720 } 721 722 #[test] 723 fn pending_messages_stops_at_tool_response() { 724 let messages = vec![ 725 Message::User("do something".into()), 726 Message::Assistant(AssistantMessage::from_text("ok".into())), 727 Message::ToolCalls(vec![crate::tools::ToolCall::invalid( 728 "c1".into(), 729 Some("Read".into()), 730 None, 731 "test".into(), 732 )]), 733 Message::ToolResponse(crate::tools::ToolResponse::error( 734 "c1".into(), 735 "result".into(), 736 )), 737 Message::User("queued 1".into()), 738 Message::User("queued 2".into()), 739 ]; 740 assert_eq!( 741 shared::get_pending_user_messages(&messages), 742 "queued 1\nqueued 2" 743 ); 744 } 745 746 #[test] 747 fn pending_messages_preserves_order() { 748 let messages = vec![ 749 Message::User("a".into()), 750 Message::User("b".into()), 751 Message::User("c".into()), 752 ]; 753 assert_eq!(shared::get_pending_user_messages(&messages), "a\nb\nc"); 754 } 755 }