codex.rs (125158B)
1 //! Codex backend — orchestrates OpenAI's Codex CLI (`codex app-server`) 2 //! via its JSON-RPC-over-stdio protocol. 3 4 use super::codex_protocol::*; 5 use super::shared::{self, SessionCommand, SessionHandle}; 6 use crate::backend::traits::AiBackend; 7 use crate::file_update::{FileUpdate, FileUpdateType}; 8 use crate::messages::{ 9 CompactionInfo, DaveApiResponse, PermissionResponse, SessionInfo, SubagentInfo, SubagentStatus, 10 UsageInfo, 11 }; 12 use crate::tools::Tool; 13 use crate::Message; 14 use claude_agent_sdk_rs::PermissionMode; 15 use dashmap::DashMap; 16 use serde_json::Value; 17 use std::collections::HashMap; 18 use std::path::PathBuf; 19 use std::sync::mpsc; 20 use std::sync::Arc; 21 use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader}; 22 use tokio::process::{Child, Command}; 23 use tokio::sync::mpsc as tokio_mpsc; 24 use tokio::sync::oneshot; 25 use uuid::Uuid; 26 27 // --------------------------------------------------------------------------- 28 // Session actor 29 // --------------------------------------------------------------------------- 30 31 /// Result of processing a single Codex JSON-RPC message. 32 enum HandleResult { 33 /// Normal notification processed, keep reading. 34 Continue, 35 /// `turn/completed` received — this turn is done. 36 TurnDone, 37 /// Auto-accept matched — send accept for this rpc_id immediately. 38 AutoAccepted(u64), 39 /// Needs UI approval — stash the receiver and wait for the user. 40 NeedsApproval { 41 rpc_id: u64, 42 rx: oneshot::Receiver<PermissionResponse>, 43 }, 44 } 45 46 /// Per-session actor that owns the `codex app-server` child process. 
47 async fn session_actor( 48 session_id: String, 49 cwd: Option<PathBuf>, 50 codex_binary: String, 51 model: Option<String>, 52 resume_session_id: Option<String>, 53 mut command_rx: tokio_mpsc::Receiver<SessionCommand>, 54 ) { 55 // Spawn the codex app-server child process 56 let mut child = match spawn_codex(&codex_binary, &cwd) { 57 Ok(c) => c, 58 Err(err) => { 59 tracing::error!("Session {} failed to spawn codex: {}", session_id, err); 60 drain_commands_with_error(&mut command_rx, &format!("Failed to spawn codex: {}", err)) 61 .await; 62 return; 63 } 64 }; 65 66 let stdin = child.stdin.take().expect("stdin piped"); 67 let stdout = child.stdout.take().expect("stdout piped"); 68 69 // Drain stderr in a background task to prevent pipe deadlock 70 if let Some(stderr) = child.stderr.take() { 71 let sid = session_id.clone(); 72 tokio::spawn(async move { 73 let mut lines = BufReader::new(stderr).lines(); 74 while let Ok(Some(line)) = lines.next_line().await { 75 tracing::trace!("Codex stderr [{}]: {}", sid, line); 76 } 77 }); 78 } 79 80 let writer = tokio::io::BufWriter::new(stdin); 81 let reader = BufReader::new(stdout).lines(); 82 let cwd_str = cwd.as_ref().map(|p| p.to_string_lossy().into_owned()); 83 84 session_actor_loop( 85 &session_id, 86 writer, 87 reader, 88 model.as_deref(), 89 cwd_str.as_deref(), 90 resume_session_id.as_deref(), 91 &mut command_rx, 92 ) 93 .await; 94 95 let _ = child.kill().await; 96 tracing::debug!("Session {} actor exited", session_id); 97 } 98 99 /// Core session loop, generic over I/O for testability. 100 /// 101 /// Performs the init handshake, thread start/resume, and main command loop. 102 /// Returns when the session is shut down or an unrecoverable error occurs. 103 /// The caller is responsible for process lifecycle (spawn, kill). 
104 async fn session_actor_loop<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>( 105 session_id: &str, 106 mut writer: tokio::io::BufWriter<W>, 107 mut reader: tokio::io::Lines<R>, 108 model: Option<&str>, 109 cwd: Option<&str>, 110 resume_session_id: Option<&str>, 111 command_rx: &mut tokio_mpsc::Receiver<SessionCommand>, 112 ) { 113 // ---- init handshake ---- 114 if let Err(err) = do_init_handshake(&mut writer, &mut reader).await { 115 tracing::error!("Session {} init handshake failed: {}", session_id, err); 116 drain_commands_with_error(command_rx, &format!("Codex init handshake failed: {}", err)) 117 .await; 118 return; 119 } 120 121 // ---- thread start / resume ---- 122 let thread_id = if let Some(tid) = resume_session_id { 123 match send_thread_resume(&mut writer, &mut reader, tid).await { 124 Ok(id) => id, 125 Err(err) => { 126 tracing::error!("Session {} thread/resume failed: {}", session_id, err); 127 drain_commands_with_error( 128 command_rx, 129 &format!("Codex thread/resume failed: {}", err), 130 ) 131 .await; 132 return; 133 } 134 } 135 } else { 136 match send_thread_start(&mut writer, &mut reader, model, cwd).await { 137 Ok(id) => id, 138 Err(err) => { 139 tracing::error!("Session {} thread/start failed: {}", session_id, err); 140 drain_commands_with_error( 141 command_rx, 142 &format!("Codex thread/start failed: {}", err), 143 ) 144 .await; 145 return; 146 } 147 } 148 }; 149 150 tracing::info!( 151 "Session {} connected to codex, thread_id={}", 152 session_id, 153 thread_id 154 ); 155 156 // ---- main command loop ---- 157 let mut request_counter: u64 = 10; // start after init IDs 158 let mut current_turn_id: Option<String> = None; 159 let mut sent_session_info = false; 160 let mut turn_count: u32 = 0; 161 162 while let Some(cmd) = command_rx.recv().await { 163 match cmd { 164 SessionCommand::Query { 165 prompt, 166 response_tx, 167 ctx, 168 } => { 169 // Emit session info on the first query so Nostr replication can start 170 if !sent_session_info 
{ 171 sent_session_info = true; 172 let info = SessionInfo { 173 model: model.map(|s| s.to_string()), 174 claude_session_id: Some(thread_id.clone()), 175 ..Default::default() 176 }; 177 let _ = response_tx.send(DaveApiResponse::SessionInfo(info)); 178 ctx.request_repaint(); 179 } 180 181 // Send turn/start 182 turn_count += 1; 183 request_counter += 1; 184 let turn_req_id = request_counter; 185 if let Err(err) = 186 send_turn_start(&mut writer, turn_req_id, &thread_id, &prompt, model).await 187 { 188 tracing::error!("Session {} turn/start failed: {}", session_id, err); 189 let _ = response_tx.send(DaveApiResponse::Failed(err.to_string())); 190 continue; 191 } 192 193 // Read the turn/start response 194 match read_response_for_id(&mut reader, turn_req_id).await { 195 Ok(msg) => { 196 if let Some(err) = msg.error { 197 tracing::error!( 198 "Session {} turn/start error: {}", 199 session_id, 200 err.message 201 ); 202 let _ = response_tx.send(DaveApiResponse::Failed(err.message)); 203 continue; 204 } 205 if let Some(result) = &msg.result { 206 current_turn_id = result 207 .get("turn") 208 .and_then(|t| t.get("id")) 209 .and_then(|v| v.as_str()) 210 .map(|s| s.to_string()); 211 } 212 } 213 Err(err) => { 214 tracing::error!( 215 "Session {} failed reading turn/start response: {}", 216 session_id, 217 err 218 ); 219 let _ = response_tx.send(DaveApiResponse::Failed(err.to_string())); 220 continue; 221 } 222 } 223 224 // Stream notifications until turn/completed 225 let mut subagent_stack: Vec<String> = Vec::new(); 226 let mut turn_done = false; 227 let mut pending_approval: Option<(u64, oneshot::Receiver<PermissionResponse>)> = 228 None; 229 230 while !turn_done { 231 if let Some((rpc_id, mut rx)) = pending_approval.take() { 232 // ---- approval-wait state ---- 233 // Codex is blocked waiting for our response, so no new 234 // lines will arrive. Select between the UI response and 235 // commands (interrupt / shutdown). 236 tokio::select! 
{ 237 biased; 238 239 Some(cmd) = command_rx.recv() => { 240 match cmd { 241 SessionCommand::Interrupt { ctx: int_ctx } => { 242 tracing::debug!("Session {} interrupted during approval", session_id); 243 // Cancel the approval and interrupt the turn 244 let _ = send_approval_response(&mut writer, rpc_id, ApprovalDecision::Cancel).await; 245 if let Some(ref tid) = current_turn_id { 246 request_counter += 1; 247 let _ = send_turn_interrupt(&mut writer, request_counter, &thread_id, tid).await; 248 } 249 int_ctx.request_repaint(); 250 // Don't restore pending — it's been cancelled 251 } 252 SessionCommand::Shutdown => { 253 tracing::debug!("Session {} shutting down during approval", session_id); 254 return; 255 } 256 SessionCommand::Query { response_tx: new_tx, .. } => { 257 let _ = new_tx.send(DaveApiResponse::Failed( 258 "Query already in progress".to_string(), 259 )); 260 // Restore the pending approval — still waiting 261 pending_approval = Some((rpc_id, rx)); 262 } 263 SessionCommand::SetPermissionMode { ctx: mode_ctx, .. } => { 264 mode_ctx.request_repaint(); 265 pending_approval = Some((rpc_id, rx)); 266 } 267 SessionCommand::Compact { response_tx: compact_tx, .. } => { 268 let _ = compact_tx.send(DaveApiResponse::Failed( 269 "Cannot compact during active turn".to_string(), 270 )); 271 pending_approval = Some((rpc_id, rx)); 272 } 273 } 274 } 275 276 result = &mut rx => { 277 let decision = match result { 278 Ok(PermissionResponse::Allow { .. }) => ApprovalDecision::Accept, 279 Ok(PermissionResponse::Deny { .. }) => ApprovalDecision::Decline, 280 Err(_) => ApprovalDecision::Cancel, 281 }; 282 let _ = send_approval_response(&mut writer, rpc_id, decision).await; 283 } 284 } 285 } else { 286 // ---- normal streaming state ---- 287 tokio::select! 
{ 288 biased; 289 290 Some(cmd) = command_rx.recv() => { 291 match cmd { 292 SessionCommand::Interrupt { ctx: int_ctx } => { 293 tracing::debug!("Session {} interrupted", session_id); 294 if let Some(ref tid) = current_turn_id { 295 request_counter += 1; 296 let _ = send_turn_interrupt(&mut writer, request_counter, &thread_id, tid).await; 297 } 298 int_ctx.request_repaint(); 299 } 300 SessionCommand::Query { response_tx: new_tx, .. } => { 301 let _ = new_tx.send(DaveApiResponse::Failed( 302 "Query already in progress".to_string(), 303 )); 304 } 305 SessionCommand::SetPermissionMode { mode, ctx: mode_ctx } => { 306 tracing::debug!( 307 "Session {} ignoring permission mode {:?} (not supported by Codex)", 308 session_id, mode 309 ); 310 mode_ctx.request_repaint(); 311 } 312 SessionCommand::Compact { response_tx: compact_tx, .. } => { 313 let _ = compact_tx.send(DaveApiResponse::Failed( 314 "Cannot compact during active turn".to_string(), 315 )); 316 } 317 SessionCommand::Shutdown => { 318 tracing::debug!("Session {} shutting down during query", session_id); 319 return; 320 } 321 } 322 } 323 324 line_result = reader.next_line() => { 325 match line_result { 326 Ok(Some(line)) => { 327 let msg: RpcMessage = match serde_json::from_str(&line) { 328 Ok(m) => m, 329 Err(err) => { 330 tracing::warn!("Codex parse error: {} in: {}", err, &line[..line.len().min(200)]); 331 continue; 332 } 333 }; 334 335 match handle_codex_message( 336 msg, 337 &response_tx, 338 &ctx, 339 &mut subagent_stack, 340 &turn_count, 341 ) { 342 HandleResult::Continue => {} 343 HandleResult::TurnDone => { 344 turn_done = true; 345 } 346 HandleResult::AutoAccepted(rpc_id) => { 347 let _ = send_approval_response( 348 &mut writer, rpc_id, ApprovalDecision::Accept, 349 ).await; 350 } 351 HandleResult::NeedsApproval { rpc_id, rx } => { 352 pending_approval = Some((rpc_id, rx)); 353 } 354 } 355 } 356 Ok(None) => { 357 tracing::error!("Session {} codex process exited unexpectedly", session_id); 358 let _ = 
response_tx.send(DaveApiResponse::Failed( 359 "Codex process exited unexpectedly".to_string(), 360 )); 361 turn_done = true; 362 } 363 Err(err) => { 364 tracing::error!("Session {} read error: {}", session_id, err); 365 let _ = response_tx.send(DaveApiResponse::Failed(err.to_string())); 366 turn_done = true; 367 } 368 } 369 } 370 } 371 } 372 } 373 374 current_turn_id = None; 375 // Drop the response channel so the main loop sees a 376 // Disconnected signal and finalizes the assistant message 377 // (builds kind-1988 event for Nostr replication). 378 drop(response_tx); 379 tracing::debug!("Turn complete for session {}", session_id); 380 } 381 SessionCommand::Interrupt { ctx } => { 382 ctx.request_repaint(); 383 } 384 SessionCommand::SetPermissionMode { mode, ctx } => { 385 tracing::debug!( 386 "Session {} ignoring permission mode {:?} (not supported by Codex)", 387 session_id, 388 mode 389 ); 390 ctx.request_repaint(); 391 } 392 SessionCommand::Compact { response_tx, ctx } => { 393 request_counter += 1; 394 let compact_req_id = request_counter; 395 396 // Send thread/compact/start RPC 397 if let Err(err) = send_thread_compact(&mut writer, compact_req_id, &thread_id).await 398 { 399 tracing::error!( 400 "Session {} thread/compact/start failed: {}", 401 session_id, 402 err 403 ); 404 let _ = response_tx.send(DaveApiResponse::Failed(err)); 405 ctx.request_repaint(); 406 continue; 407 } 408 409 // Read the RPC response (empty {}) 410 match read_response_for_id(&mut reader, compact_req_id).await { 411 Ok(msg) => { 412 if let Some(err) = msg.error { 413 tracing::error!( 414 "Session {} thread/compact/start error: {}", 415 session_id, 416 err.message 417 ); 418 let _ = response_tx.send(DaveApiResponse::Failed(err.message)); 419 ctx.request_repaint(); 420 continue; 421 } 422 } 423 Err(err) => { 424 tracing::error!( 425 "Session {} failed reading compact response: {}", 426 session_id, 427 err 428 ); 429 let _ = response_tx.send(DaveApiResponse::Failed(err)); 430 
ctx.request_repaint(); 431 continue; 432 } 433 } 434 435 // Compact accepted — stream notifications until item/completed 436 let _ = response_tx.send(DaveApiResponse::CompactionStarted); 437 ctx.request_repaint(); 438 439 loop { 440 tokio::select! { 441 biased; 442 443 Some(cmd) = command_rx.recv() => { 444 match cmd { 445 SessionCommand::Shutdown => { 446 tracing::debug!("Session {} shutting down during compact", session_id); 447 return; 448 } 449 _ => { 450 // Ignore other commands during compaction 451 } 452 } 453 } 454 455 line_result = reader.next_line() => { 456 match line_result { 457 Ok(Some(line)) => { 458 let msg: RpcMessage = match serde_json::from_str(&line) { 459 Ok(m) => m, 460 Err(err) => { 461 tracing::warn!("Codex parse error during compact: {}", err); 462 continue; 463 } 464 }; 465 466 // Look for item/completed with contextCompaction 467 if msg.method.as_deref() == Some("item/completed") { 468 if let Some(ref params) = msg.params { 469 let item_type = params.get("type") 470 .and_then(|v| v.as_str()) 471 .unwrap_or(""); 472 if item_type == "contextCompaction" { 473 let pre_tokens = params.get("preTokens") 474 .and_then(|v| v.as_u64()) 475 .unwrap_or(0); 476 let _ = response_tx.send(DaveApiResponse::CompactionComplete( 477 CompactionInfo { pre_tokens }, 478 )); 479 ctx.request_repaint(); 480 break; 481 } 482 } 483 } 484 } 485 Ok(None) => { 486 tracing::error!("Session {} codex process exited during compact", session_id); 487 let _ = response_tx.send(DaveApiResponse::Failed( 488 "Codex process exited during compaction".to_string(), 489 )); 490 break; 491 } 492 Err(err) => { 493 tracing::error!("Session {} read error during compact: {}", session_id, err); 494 let _ = response_tx.send(DaveApiResponse::Failed(err.to_string())); 495 break; 496 } 497 } 498 } 499 } 500 } 501 502 // Drop response channel to signal completion 503 drop(response_tx); 504 tracing::debug!("Compaction complete for session {}", session_id); 505 } 506 SessionCommand::Shutdown => { 
507 tracing::debug!("Session {} shutting down", session_id); 508 break; 509 } 510 } 511 } 512 } 513 514 // --------------------------------------------------------------------------- 515 // Codex message handling (synchronous — no writer needed) 516 // --------------------------------------------------------------------------- 517 518 /// Extract a human-readable error message from a Codex error notification. 519 /// 520 /// Codex sends errors in several different shapes: 521 /// - `params.message` (string) 522 /// - `params.msg.message` (nested JSON string containing `{"detail":"..."}`) 523 /// - `params.error.message` (object with a `message` field) 524 /// - top-level `msg.error.message` (RPC error envelope) 525 fn extract_codex_error(msg: &RpcMessage) -> String { 526 if let Some(params) = &msg.params { 527 // params.message (string) 528 if let Some(s) = params.get("message").and_then(|m| m.as_str()) { 529 return s.to_string(); 530 } 531 // params.msg.message (nested — may itself be JSON like {"detail":"..."}) 532 if let Some(inner) = params 533 .get("msg") 534 .and_then(|m| m.get("message")) 535 .and_then(|m| m.as_str()) 536 { 537 // Try to unwrap a JSON {"detail":"..."} wrapper 538 if let Ok(v) = serde_json::from_str::<serde_json::Value>(inner) { 539 if let Some(detail) = v.get("detail").and_then(|d| d.as_str()) { 540 return detail.to_string(); 541 } 542 } 543 return inner.to_string(); 544 } 545 // params.error.message (error is an object, not a string) 546 if let Some(inner) = params 547 .get("error") 548 .and_then(|e| e.get("message")) 549 .and_then(|m| m.as_str()) 550 { 551 if let Ok(v) = serde_json::from_str::<serde_json::Value>(inner) { 552 if let Some(detail) = v.get("detail").and_then(|d| d.as_str()) { 553 return detail.to_string(); 554 } 555 } 556 return inner.to_string(); 557 } 558 // params.error (plain string) 559 if let Some(s) = params.get("error").and_then(|e| e.as_str()) { 560 return s.to_string(); 561 } 562 } 563 // Top-level RPC error envelope 
564 if let Some(rpc_err) = &msg.error { 565 return rpc_err.message.clone(); 566 } 567 // Last resort — dump raw params 568 if let Some(p) = &msg.params { 569 tracing::debug!("Codex error unknown shape: {}", p); 570 } 571 "Codex error (no details)".to_string() 572 } 573 574 /// Process a single incoming Codex JSON-RPC message. Returns a `HandleResult` 575 /// indicating what the caller should do next (continue, finish turn, or handle 576 /// an approval). 577 fn handle_codex_message( 578 msg: RpcMessage, 579 response_tx: &mpsc::Sender<DaveApiResponse>, 580 ctx: &egui::Context, 581 subagent_stack: &mut Vec<String>, 582 turn_count: &u32, 583 ) -> HandleResult { 584 let method = match &msg.method { 585 Some(m) => m.as_str(), 586 None => { 587 tracing::debug!("codex msg with no method (response): id={:?}", msg.id); 588 return HandleResult::Continue; 589 } 590 }; 591 592 tracing::debug!( 593 "codex msg: method={} id={:?} has_params={}", 594 method, 595 msg.id, 596 msg.params.is_some() 597 ); 598 599 match method { 600 "item/agentMessage/delta" => { 601 if let Some(params) = msg.params { 602 if let Ok(delta) = serde_json::from_value::<AgentMessageDeltaParams>(params) { 603 let _ = response_tx.send(DaveApiResponse::Token(delta.delta)); 604 ctx.request_repaint(); 605 } 606 } 607 } 608 609 "item/started" => { 610 if let Some(params) = msg.params { 611 if let Ok(started) = serde_json::from_value::<ItemStartedParams>(params) { 612 match started.item_type.as_str() { 613 "collabAgentToolCall" => { 614 let item_id = started 615 .item_id 616 .unwrap_or_else(|| Uuid::new_v4().to_string()); 617 subagent_stack.push(item_id.clone()); 618 let info = SubagentInfo { 619 task_id: item_id, 620 description: started.name.unwrap_or_else(|| "agent".to_string()), 621 subagent_type: "codex-agent".to_string(), 622 status: SubagentStatus::Running, 623 output: String::new(), 624 max_output_size: 4000, 625 tool_results: Vec::new(), 626 }; 627 let _ = 
response_tx.send(DaveApiResponse::SubagentSpawned(info)); 628 ctx.request_repaint(); 629 } 630 "commandExecution" => { 631 let cmd = started.command.unwrap_or_default(); 632 let tool_input = serde_json::json!({ "command": cmd }); 633 let result_value = serde_json::json!({}); 634 shared::send_tool_result( 635 "Bash", 636 &tool_input, 637 &result_value, 638 None, 639 subagent_stack, 640 response_tx, 641 ctx, 642 ); 643 } 644 "fileChange" => { 645 let path = started.file_path.unwrap_or_default(); 646 let tool_input = serde_json::json!({ "file_path": path }); 647 let result_value = serde_json::json!({}); 648 shared::send_tool_result( 649 "Edit", 650 &tool_input, 651 &result_value, 652 None, 653 subagent_stack, 654 response_tx, 655 ctx, 656 ); 657 } 658 "contextCompaction" => { 659 let _ = response_tx.send(DaveApiResponse::CompactionStarted); 660 ctx.request_repaint(); 661 } 662 _ => {} 663 } 664 } 665 } 666 } 667 668 "item/completed" => { 669 if let Some(params) = msg.params { 670 if let Ok(completed) = serde_json::from_value::<ItemCompletedParams>(params) { 671 handle_item_completed(&completed, response_tx, ctx, subagent_stack); 672 } 673 } 674 } 675 676 "item/commandExecution/requestApproval" => { 677 tracing::info!( 678 "CMD APPROVAL: id={:?} has_params={}", 679 msg.id, 680 msg.params.is_some() 681 ); 682 if let (Some(rpc_id), Some(params)) = (msg.id, msg.params) { 683 tracing::info!( 684 "CMD APPROVAL params: {}", 685 serde_json::to_string(¶ms).unwrap_or_default() 686 ); 687 match serde_json::from_value::<CommandApprovalParams>(params) { 688 Ok(approval) => { 689 let cmd = approval.command_string(); 690 tracing::info!("CMD APPROVAL deserialized ok: command={}", cmd); 691 return check_approval_or_forward( 692 rpc_id, 693 "Bash", 694 serde_json::json!({ "command": cmd }), 695 response_tx, 696 ctx, 697 ); 698 } 699 Err(e) => { 700 tracing::error!("CMD APPROVAL deser FAILED: {}", e); 701 } 702 } 703 } else { 704 tracing::warn!("CMD APPROVAL missing id or params"); 705 
} 706 } 707 708 "item/fileChange/requestApproval" => { 709 tracing::info!( 710 "FILE APPROVAL: id={:?} has_params={}", 711 msg.id, 712 msg.params.is_some() 713 ); 714 if let (Some(rpc_id), Some(params)) = (msg.id, msg.params) { 715 tracing::info!( 716 "FILE APPROVAL params: {}", 717 serde_json::to_string(¶ms).unwrap_or_default() 718 ); 719 match serde_json::from_value::<FileChangeApprovalParams>(params) { 720 Ok(approval) => { 721 let file_path = approval.file_path.as_deref().unwrap_or("unknown"); 722 let kind_str = approval 723 .kind 724 .as_ref() 725 .and_then(|k| k.get("type").and_then(|t| t.as_str())) 726 .unwrap_or("edit"); 727 728 let (tool_name, tool_input) = match kind_str { 729 "create" => ( 730 "Write", 731 serde_json::json!({ 732 "file_path": file_path, 733 "content": approval.diff.as_deref().unwrap_or(""), 734 }), 735 ), 736 _ => ( 737 "Edit", 738 serde_json::json!({ 739 "file_path": file_path, 740 "old_string": "", 741 "new_string": approval.diff.as_deref().unwrap_or(""), 742 }), 743 ), 744 }; 745 746 tracing::info!( 747 "FILE APPROVAL deserialized ok: tool={} file={}", 748 tool_name, 749 file_path 750 ); 751 return check_approval_or_forward( 752 rpc_id, 753 tool_name, 754 tool_input, 755 response_tx, 756 ctx, 757 ); 758 } 759 Err(e) => { 760 tracing::error!("FILE APPROVAL deser FAILED: {}", e); 761 } 762 } 763 } else { 764 tracing::warn!("FILE APPROVAL missing id or params"); 765 } 766 } 767 768 "thread/tokenUsage/updated" => { 769 if let Some(params) = msg.params { 770 if let Ok(usage) = serde_json::from_value::<TokenUsageParams>(params) { 771 let info = UsageInfo { 772 input_tokens: usage.token_usage.total.input_tokens as u64, 773 output_tokens: usage.token_usage.total.output_tokens as u64, 774 num_turns: *turn_count, 775 cost_usd: None, 776 }; 777 let _ = response_tx.send(DaveApiResponse::QueryComplete(info)); 778 ctx.request_repaint(); 779 } 780 } 781 } 782 783 "turn/completed" => { 784 if let Some(params) = msg.params { 785 if let Ok(completed) = 
serde_json::from_value::<TurnCompletedParams>(params) { 786 if completed.status == "failed" { 787 let err_msg = completed.error.unwrap_or_else(|| "Turn failed".to_string()); 788 let _ = response_tx.send(DaveApiResponse::Failed(err_msg)); 789 } 790 } 791 } 792 return HandleResult::TurnDone; 793 } 794 795 "codex/event/patch_apply_begin" => { 796 // Legacy event carrying full file-change details (paths + diffs). 797 // The V2 `item/completed` for fileChange is sparse, so we extract 798 // the diff from the legacy event and emit ToolResults here. 799 if let Some(params) = &msg.params { 800 if let Some(changes) = params 801 .get("msg") 802 .and_then(|m| m.get("changes")) 803 .and_then(|c| c.as_object()) 804 { 805 for (path, change) in changes { 806 let change_type = change 807 .get("type") 808 .and_then(|t| t.as_str()) 809 .unwrap_or("update"); 810 811 let (tool_name, diff_text) = match change_type { 812 "add" => { 813 let content = 814 change.get("content").and_then(|c| c.as_str()).unwrap_or(""); 815 ("Write", content.to_string()) 816 } 817 "delete" => ("Edit", "(file deleted)".to_string()), 818 _ => { 819 // "update" — has unified_diff 820 let diff = change 821 .get("unified_diff") 822 .and_then(|d| d.as_str()) 823 .unwrap_or(""); 824 ("Edit", diff.to_string()) 825 } 826 }; 827 828 let tool_input = serde_json::json!({ 829 "file_path": path, 830 "diff": diff_text, 831 }); 832 let result_value = serde_json::json!({ "status": "ok" }); 833 let file_update = 834 make_codex_file_update(path, tool_name, change_type, &diff_text); 835 shared::send_tool_result( 836 tool_name, 837 &tool_input, 838 &result_value, 839 file_update, 840 subagent_stack, 841 response_tx, 842 ctx, 843 ); 844 } 845 ctx.request_repaint(); 846 } 847 } 848 } 849 850 "codex/event/error" | "error" => { 851 let err_msg: String = extract_codex_error(&msg); 852 tracing::warn!("Codex error: {}", err_msg); 853 let _ = response_tx.send(DaveApiResponse::Failed(err_msg)); 854 ctx.request_repaint(); 855 } 856 857 
other => { 858 tracing::debug!( 859 "Unhandled codex notification: {} id={:?} params={}", 860 other, 861 msg.id, 862 msg.params 863 .as_ref() 864 .map(|p| serde_json::to_string(p).unwrap_or_default()) 865 .unwrap_or_default() 866 ); 867 } 868 } 869 870 HandleResult::Continue 871 } 872 873 /// Check auto-accept rules. If matched, return `AutoAccepted`. Otherwise 874 /// create a `PendingPermission`, send it to the UI, and return `NeedsApproval` 875 /// with the oneshot receiver. 876 fn check_approval_or_forward( 877 rpc_id: u64, 878 tool_name: &str, 879 tool_input: Value, 880 response_tx: &mpsc::Sender<DaveApiResponse>, 881 ctx: &egui::Context, 882 ) -> HandleResult { 883 if shared::should_auto_accept(tool_name, &tool_input) { 884 return HandleResult::AutoAccepted(rpc_id); 885 } 886 887 match shared::forward_permission_to_ui(tool_name, tool_input, response_tx, ctx) { 888 Some(rx) => HandleResult::NeedsApproval { rpc_id, rx }, 889 // Can't reach UI — auto-accept as fallback 890 None => HandleResult::AutoAccepted(rpc_id), 891 } 892 } 893 894 /// Build a `FileUpdate` from codex file-change data. 895 fn make_codex_file_update( 896 path: &str, 897 tool_name: &str, 898 change_type: &str, 899 diff_text: &str, 900 ) -> Option<FileUpdate> { 901 let update_type = match (tool_name, change_type) { 902 ("Write", _) | (_, "add") | (_, "create") => FileUpdateType::Write { 903 content: diff_text.to_string(), 904 }, 905 _ => FileUpdateType::UnifiedDiff { 906 diff: diff_text.to_string(), 907 }, 908 }; 909 Some(FileUpdate::new(path.to_string(), update_type)) 910 } 911 912 /// Handle a completed item from Codex. 
913 fn handle_item_completed( 914 completed: &ItemCompletedParams, 915 response_tx: &mpsc::Sender<DaveApiResponse>, 916 ctx: &egui::Context, 917 subagent_stack: &mut Vec<String>, 918 ) { 919 match completed.item_type.as_str() { 920 "commandExecution" => { 921 let command = completed.command.clone().unwrap_or_default(); 922 let exit_code = completed.exit_code.unwrap_or(-1); 923 let output = completed.output.clone().unwrap_or_default(); 924 925 let tool_input = serde_json::json!({ "command": command }); 926 let result_value = serde_json::json!({ "output": output, "exit_code": exit_code }); 927 shared::send_tool_result( 928 "Bash", 929 &tool_input, 930 &result_value, 931 None, 932 subagent_stack, 933 response_tx, 934 ctx, 935 ); 936 } 937 938 "fileChange" => { 939 let file_path = completed.file_path.clone().unwrap_or_default(); 940 let diff = completed.diff.clone(); 941 942 let kind_str = completed 943 .kind 944 .as_ref() 945 .and_then(|k| k.get("type").and_then(|t| t.as_str())) 946 .unwrap_or("edit"); 947 948 let tool_name = match kind_str { 949 "create" => "Write", 950 _ => "Edit", 951 }; 952 953 let tool_input = serde_json::json!({ 954 "file_path": file_path, 955 "diff": diff, 956 }); 957 let result_value = serde_json::json!({ "status": "ok" }); 958 let file_update = make_codex_file_update( 959 &file_path, 960 tool_name, 961 kind_str, 962 diff.as_deref().unwrap_or(""), 963 ); 964 shared::send_tool_result( 965 tool_name, 966 &tool_input, 967 &result_value, 968 file_update, 969 subagent_stack, 970 response_tx, 971 ctx, 972 ); 973 } 974 975 "collabAgentToolCall" => { 976 if let Some(item_id) = &completed.item_id { 977 let result_text = completed 978 .result 979 .clone() 980 .unwrap_or_else(|| "completed".to_string()); 981 shared::complete_subagent(item_id, &result_text, subagent_stack, response_tx, ctx); 982 } 983 } 984 985 "contextCompaction" => { 986 let pre_tokens = completed.pre_tokens.unwrap_or(0); 987 let _ = 
response_tx.send(DaveApiResponse::CompactionComplete(CompactionInfo { 988 pre_tokens, 989 })); 990 ctx.request_repaint(); 991 } 992 993 other => { 994 tracing::debug!("Unhandled item/completed type: {}", other); 995 } 996 } 997 } 998 999 // --------------------------------------------------------------------------- 1000 // Codex process spawning and JSON-RPC helpers 1001 // --------------------------------------------------------------------------- 1002 1003 fn spawn_codex(binary: &str, cwd: &Option<PathBuf>) -> Result<Child, std::io::Error> { 1004 let mut cmd = Command::new(binary); 1005 cmd.arg("app-server"); 1006 cmd.stdin(std::process::Stdio::piped()); 1007 cmd.stdout(std::process::Stdio::piped()); 1008 cmd.stderr(std::process::Stdio::piped()); 1009 if let Some(dir) = cwd { 1010 cmd.current_dir(dir); 1011 } 1012 cmd.spawn() 1013 } 1014 1015 /// Send a JSONL request on stdin. 1016 async fn send_request<P: serde::Serialize, W: AsyncWrite + Unpin>( 1017 writer: &mut tokio::io::BufWriter<W>, 1018 req: &RpcRequest<P>, 1019 ) -> Result<(), std::io::Error> { 1020 let mut line = serde_json::to_string(req) 1021 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; 1022 line.push('\n'); 1023 writer.write_all(line.as_bytes()).await?; 1024 writer.flush().await?; 1025 Ok(()) 1026 } 1027 1028 /// Send a JSON-RPC response (for approval requests). 1029 async fn send_rpc_response<W: AsyncWrite + Unpin>( 1030 writer: &mut tokio::io::BufWriter<W>, 1031 id: u64, 1032 result: Value, 1033 ) -> Result<(), std::io::Error> { 1034 let resp = serde_json::json!({ "id": id, "result": result }); 1035 let mut line = serde_json::to_string(&resp) 1036 .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?; 1037 line.push('\n'); 1038 writer.write_all(line.as_bytes()).await?; 1039 writer.flush().await?; 1040 Ok(()) 1041 } 1042 1043 /// Send an approval decision response. 
1044 async fn send_approval_response<W: AsyncWrite + Unpin>( 1045 writer: &mut tokio::io::BufWriter<W>, 1046 rpc_id: u64, 1047 decision: ApprovalDecision, 1048 ) -> Result<(), std::io::Error> { 1049 let result = serde_json::to_value(ApprovalResponse { decision }).unwrap(); 1050 send_rpc_response(writer, rpc_id, result).await 1051 } 1052 1053 /// Perform the `initialize` → `initialized` handshake. 1054 async fn do_init_handshake<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>( 1055 writer: &mut tokio::io::BufWriter<W>, 1056 reader: &mut tokio::io::Lines<R>, 1057 ) -> Result<(), String> { 1058 let req = RpcRequest { 1059 id: Some(1), 1060 method: "initialize", 1061 params: InitializeParams { 1062 client_info: ClientInfo { 1063 name: "dave".to_string(), 1064 version: env!("CARGO_PKG_VERSION").to_string(), 1065 }, 1066 capabilities: serde_json::json!({}), 1067 }, 1068 }; 1069 1070 send_request(writer, &req) 1071 .await 1072 .map_err(|e| format!("Failed to send initialize: {}", e))?; 1073 1074 let resp = read_response_for_id(reader, 1) 1075 .await 1076 .map_err(|e| format!("Failed to read initialize response: {}", e))?; 1077 1078 if let Some(err) = resp.error { 1079 return Err(format!("Initialize error: {}", err.message)); 1080 } 1081 1082 // Send `initialized` notification (no id, no response expected) 1083 let notif: RpcRequest<Value> = RpcRequest { 1084 id: None, 1085 method: "initialized", 1086 params: serde_json::json!({}), 1087 }; 1088 send_request(writer, ¬if) 1089 .await 1090 .map_err(|e| format!("Failed to send initialized: {}", e))?; 1091 1092 Ok(()) 1093 } 1094 1095 /// Send `thread/start` and return the thread ID. 
1096 async fn send_thread_start<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>( 1097 writer: &mut tokio::io::BufWriter<W>, 1098 reader: &mut tokio::io::Lines<R>, 1099 model: Option<&str>, 1100 cwd: Option<&str>, 1101 ) -> Result<String, String> { 1102 let req = RpcRequest { 1103 id: Some(2), 1104 method: "thread/start", 1105 params: ThreadStartParams { 1106 model: model.map(|s| s.to_string()), 1107 cwd: cwd.map(|s| s.to_string()), 1108 approval_policy: Some("on-request".to_string()), 1109 }, 1110 }; 1111 1112 send_request(writer, &req) 1113 .await 1114 .map_err(|e| format!("Failed to send thread/start: {}", e))?; 1115 1116 let resp = read_response_for_id(reader, 2) 1117 .await 1118 .map_err(|e| format!("Failed to read thread/start response: {}", e))?; 1119 1120 if let Some(err) = resp.error { 1121 return Err(format!("thread/start error: {}", err.message)); 1122 } 1123 1124 let result = resp.result.ok_or("No result in thread/start response")?; 1125 let thread_result: ThreadStartResult = serde_json::from_value(result) 1126 .map_err(|e| format!("Failed to parse thread/start result: {}", e))?; 1127 1128 Ok(thread_result.thread.id) 1129 } 1130 1131 /// Send `thread/resume` and return the thread ID. 
1132 async fn send_thread_resume<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>( 1133 writer: &mut tokio::io::BufWriter<W>, 1134 reader: &mut tokio::io::Lines<R>, 1135 thread_id: &str, 1136 ) -> Result<String, String> { 1137 let req = RpcRequest { 1138 id: Some(3), 1139 method: "thread/resume", 1140 params: ThreadResumeParams { 1141 thread_id: thread_id.to_string(), 1142 }, 1143 }; 1144 1145 send_request(writer, &req) 1146 .await 1147 .map_err(|e| format!("Failed to send thread/resume: {}", e))?; 1148 1149 let resp = read_response_for_id(reader, 3) 1150 .await 1151 .map_err(|e| format!("Failed to read thread/resume response: {}", e))?; 1152 1153 if let Some(err) = resp.error { 1154 return Err(format!("thread/resume error: {}", err.message)); 1155 } 1156 1157 Ok(thread_id.to_string()) 1158 } 1159 1160 /// Send `turn/start`. 1161 async fn send_turn_start<W: AsyncWrite + Unpin>( 1162 writer: &mut tokio::io::BufWriter<W>, 1163 req_id: u64, 1164 thread_id: &str, 1165 prompt: &str, 1166 model: Option<&str>, 1167 ) -> Result<(), String> { 1168 let req = RpcRequest { 1169 id: Some(req_id), 1170 method: "turn/start", 1171 params: TurnStartParams { 1172 thread_id: thread_id.to_string(), 1173 input: vec![TurnInput::Text { 1174 text: prompt.to_string(), 1175 }], 1176 model: model.map(|s| s.to_string()), 1177 effort: None, 1178 }, 1179 }; 1180 1181 send_request(writer, &req) 1182 .await 1183 .map_err(|e| format!("Failed to send turn/start: {}", e)) 1184 } 1185 1186 /// Send `turn/interrupt`. 
1187 async fn send_turn_interrupt<W: AsyncWrite + Unpin>( 1188 writer: &mut tokio::io::BufWriter<W>, 1189 req_id: u64, 1190 thread_id: &str, 1191 turn_id: &str, 1192 ) -> Result<(), String> { 1193 let req = RpcRequest { 1194 id: Some(req_id), 1195 method: "turn/interrupt", 1196 params: TurnInterruptParams { 1197 thread_id: thread_id.to_string(), 1198 turn_id: turn_id.to_string(), 1199 }, 1200 }; 1201 1202 send_request(writer, &req) 1203 .await 1204 .map_err(|e| format!("Failed to send turn/interrupt: {}", e)) 1205 } 1206 1207 /// Send `thread/compact/start`. 1208 async fn send_thread_compact<W: AsyncWrite + Unpin>( 1209 writer: &mut tokio::io::BufWriter<W>, 1210 req_id: u64, 1211 thread_id: &str, 1212 ) -> Result<(), String> { 1213 let req = RpcRequest { 1214 id: Some(req_id), 1215 method: "thread/compact/start", 1216 params: ThreadCompactParams { 1217 thread_id: thread_id.to_string(), 1218 }, 1219 }; 1220 1221 send_request(writer, &req) 1222 .await 1223 .map_err(|e| format!("Failed to send thread/compact/start: {}", e)) 1224 } 1225 1226 /// Read lines until we find a response matching the given request id. 1227 /// Non-matching messages (notifications) are logged and skipped. 1228 async fn read_response_for_id<R: AsyncBufRead + Unpin>( 1229 reader: &mut tokio::io::Lines<R>, 1230 expected_id: u64, 1231 ) -> Result<RpcMessage, String> { 1232 loop { 1233 let line = reader 1234 .next_line() 1235 .await 1236 .map_err(|e| format!("IO error: {}", e))? 
1237 .ok_or_else(|| "EOF while waiting for response".to_string())?; 1238 1239 let msg: RpcMessage = serde_json::from_str(&line).map_err(|e| { 1240 format!( 1241 "JSON parse error: {} in: {}", 1242 e, 1243 &line[..line.len().min(200)] 1244 ) 1245 })?; 1246 1247 if msg.id == Some(expected_id) { 1248 return Ok(msg); 1249 } 1250 1251 tracing::trace!( 1252 "Skipping message during handshake (waiting for id={}): method={:?}", 1253 expected_id, 1254 msg.method 1255 ); 1256 } 1257 } 1258 1259 /// Drain pending commands, sending error to any Query commands. 1260 async fn drain_commands_with_error( 1261 command_rx: &mut tokio_mpsc::Receiver<SessionCommand>, 1262 error: &str, 1263 ) { 1264 while let Some(cmd) = command_rx.recv().await { 1265 match &cmd { 1266 SessionCommand::Query { response_tx, .. } 1267 | SessionCommand::Compact { response_tx, .. } => { 1268 let _ = response_tx.send(DaveApiResponse::Failed(error.to_string())); 1269 } 1270 _ => {} 1271 } 1272 if matches!(cmd, SessionCommand::Shutdown) { 1273 break; 1274 } 1275 } 1276 } 1277 1278 // --------------------------------------------------------------------------- 1279 // Public backend 1280 // --------------------------------------------------------------------------- 1281 1282 pub struct CodexBackend { 1283 codex_binary: String, 1284 sessions: DashMap<String, SessionHandle>, 1285 } 1286 1287 impl CodexBackend { 1288 pub fn new(codex_binary: String) -> Self { 1289 Self { 1290 codex_binary, 1291 sessions: DashMap::new(), 1292 } 1293 } 1294 } 1295 1296 impl AiBackend for CodexBackend { 1297 fn stream_request( 1298 &self, 1299 messages: Vec<Message>, 1300 _tools: Arc<HashMap<String, Tool>>, 1301 model: String, 1302 _user_id: String, 1303 session_id: String, 1304 cwd: Option<PathBuf>, 1305 resume_session_id: Option<String>, 1306 ctx: egui::Context, 1307 ) -> ( 1308 mpsc::Receiver<DaveApiResponse>, 1309 Option<tokio::task::JoinHandle<()>>, 1310 ) { 1311 let (response_tx, response_rx) = mpsc::channel(); 1312 1313 let 
prompt = shared::prepare_prompt(&messages, &resume_session_id); 1314 1315 tracing::debug!( 1316 "Codex request: session={}, resumed={}, prompt_len={}", 1317 session_id, 1318 resume_session_id.is_some(), 1319 prompt.len(), 1320 ); 1321 1322 let command_tx = { 1323 let entry = self.sessions.entry(session_id.clone()); 1324 let codex_binary = self.codex_binary.clone(); 1325 let model_clone = model.clone(); 1326 let cwd_clone = cwd.clone(); 1327 let resume_clone = resume_session_id.clone(); 1328 let handle = entry.or_insert_with(|| { 1329 let (command_tx, command_rx) = tokio_mpsc::channel(16); 1330 let sid = session_id.clone(); 1331 tokio::spawn(async move { 1332 session_actor( 1333 sid, 1334 cwd_clone, 1335 codex_binary, 1336 Some(model_clone), 1337 resume_clone, 1338 command_rx, 1339 ) 1340 .await; 1341 }); 1342 SessionHandle { command_tx } 1343 }); 1344 handle.command_tx.clone() 1345 }; 1346 1347 let handle = tokio::spawn(async move { 1348 if let Err(err) = command_tx 1349 .send(SessionCommand::Query { 1350 prompt, 1351 response_tx, 1352 ctx, 1353 }) 1354 .await 1355 { 1356 tracing::error!("Failed to send query to codex session actor: {}", err); 1357 } 1358 }); 1359 1360 (response_rx, Some(handle)) 1361 } 1362 1363 fn cleanup_session(&self, session_id: String) { 1364 if let Some((_, handle)) = self.sessions.remove(&session_id) { 1365 tokio::spawn(async move { 1366 if let Err(err) = handle.command_tx.send(SessionCommand::Shutdown).await { 1367 tracing::warn!("Failed to send shutdown to codex session: {}", err); 1368 } 1369 }); 1370 } 1371 } 1372 1373 fn interrupt_session(&self, session_id: String, ctx: egui::Context) { 1374 if let Some(handle) = self.sessions.get(&session_id) { 1375 let command_tx = handle.command_tx.clone(); 1376 tokio::spawn(async move { 1377 if let Err(err) = command_tx.send(SessionCommand::Interrupt { ctx }).await { 1378 tracing::warn!("Failed to send interrupt to codex session: {}", err); 1379 } 1380 }); 1381 } 1382 } 1383 1384 fn 
set_permission_mode(&self, session_id: String, mode: PermissionMode, ctx: egui::Context) { 1385 if let Some(handle) = self.sessions.get(&session_id) { 1386 let command_tx = handle.command_tx.clone(); 1387 tokio::spawn(async move { 1388 if let Err(err) = command_tx 1389 .send(SessionCommand::SetPermissionMode { mode, ctx }) 1390 .await 1391 { 1392 tracing::warn!( 1393 "Failed to send set_permission_mode to codex session: {}", 1394 err 1395 ); 1396 } 1397 }); 1398 } 1399 } 1400 1401 fn compact_session( 1402 &self, 1403 session_id: String, 1404 ctx: egui::Context, 1405 ) -> Option<mpsc::Receiver<DaveApiResponse>> { 1406 let handle = self.sessions.get(&session_id)?; 1407 let command_tx = handle.command_tx.clone(); 1408 let (response_tx, response_rx) = mpsc::channel(); 1409 tokio::spawn(async move { 1410 if let Err(err) = command_tx 1411 .send(SessionCommand::Compact { response_tx, ctx }) 1412 .await 1413 { 1414 tracing::warn!("Failed to send compact to codex session: {}", err); 1415 } 1416 }); 1417 Some(response_rx) 1418 } 1419 } 1420 1421 #[cfg(test)] 1422 mod tests { 1423 use super::*; 1424 use crate::messages::{AssistantMessage, DaveApiResponse}; 1425 use serde_json::json; 1426 use std::time::Duration; 1427 1428 /// Helper: build an RpcMessage from a method and params JSON 1429 fn notification(method: &str, params: Value) -> RpcMessage { 1430 RpcMessage { 1431 id: None, 1432 method: Some(method.to_string()), 1433 result: None, 1434 error: None, 1435 params: Some(params), 1436 } 1437 } 1438 1439 /// Helper: build an RpcMessage that is a server→client request (has id) 1440 fn server_request(id: u64, method: &str, params: Value) -> RpcMessage { 1441 RpcMessage { 1442 id: Some(id), 1443 method: Some(method.to_string()), 1444 result: None, 1445 error: None, 1446 params: Some(params), 1447 } 1448 } 1449 1450 // ----------------------------------------------------------------------- 1451 // Protocol serde tests 1452 // 
----------------------------------------------------------------------- 1453 1454 #[test] 1455 fn test_rpc_request_serialization() { 1456 let req = RpcRequest { 1457 id: Some(1), 1458 method: "initialize", 1459 params: InitializeParams { 1460 client_info: ClientInfo { 1461 name: "dave".to_string(), 1462 version: "0.1.0".to_string(), 1463 }, 1464 capabilities: json!({}), 1465 }, 1466 }; 1467 let json = serde_json::to_string(&req).unwrap(); 1468 assert!(json.contains("\"id\":1")); 1469 assert!(json.contains("\"method\":\"initialize\"")); 1470 assert!(json.contains("\"clientInfo\"")); 1471 } 1472 1473 #[test] 1474 fn test_rpc_request_notification_omits_id() { 1475 let req: RpcRequest<Value> = RpcRequest { 1476 id: None, 1477 method: "initialized", 1478 params: json!({}), 1479 }; 1480 let json = serde_json::to_string(&req).unwrap(); 1481 assert!(!json.contains("\"id\"")); 1482 } 1483 1484 #[test] 1485 fn test_rpc_message_deserialization_response() { 1486 let json = r#"{"id":1,"result":{"serverInfo":{"name":"codex"}}}"#; 1487 let msg: RpcMessage = serde_json::from_str(json).unwrap(); 1488 assert_eq!(msg.id, Some(1)); 1489 assert!(msg.result.is_some()); 1490 assert!(msg.method.is_none()); 1491 } 1492 1493 #[test] 1494 fn test_rpc_message_deserialization_notification() { 1495 let json = r#"{"method":"item/agentMessage/delta","params":{"delta":"hello"}}"#; 1496 let msg: RpcMessage = serde_json::from_str(json).unwrap(); 1497 assert!(msg.id.is_none()); 1498 assert_eq!(msg.method.as_deref(), Some("item/agentMessage/delta")); 1499 } 1500 1501 #[test] 1502 fn test_thread_start_result_deserialization() { 1503 let json = r#"{"thread":{"id":"thread_abc123"},"model":"gpt-5.2-codex"}"#; 1504 let result: ThreadStartResult = serde_json::from_str(json).unwrap(); 1505 assert_eq!(result.thread.id, "thread_abc123"); 1506 assert_eq!(result.model.as_deref(), Some("gpt-5.2-codex")); 1507 } 1508 1509 #[test] 1510 fn test_approval_response_serialization() { 1511 let resp = ApprovalResponse { 
1512 decision: ApprovalDecision::Accept, 1513 }; 1514 let json = serde_json::to_string(&resp).unwrap(); 1515 assert!(json.contains("\"decision\":\"accept\"")); 1516 1517 let resp = ApprovalResponse { 1518 decision: ApprovalDecision::Decline, 1519 }; 1520 let json = serde_json::to_string(&resp).unwrap(); 1521 assert!(json.contains("\"decision\":\"decline\"")); 1522 } 1523 1524 #[test] 1525 fn test_turn_input_serialization() { 1526 let input = TurnInput::Text { 1527 text: "hello".to_string(), 1528 }; 1529 let json = serde_json::to_string(&input).unwrap(); 1530 assert!(json.contains("\"type\":\"text\"")); 1531 assert!(json.contains("\"text\":\"hello\"")); 1532 } 1533 1534 // ----------------------------------------------------------------------- 1535 // handle_codex_message tests 1536 // ----------------------------------------------------------------------- 1537 1538 #[test] 1539 fn test_handle_delta_sends_token() { 1540 let (tx, rx) = mpsc::channel(); 1541 let ctx = egui::Context::default(); 1542 let mut subagents = Vec::new(); 1543 1544 let msg = notification("item/agentMessage/delta", json!({ "delta": "Hello world" })); 1545 1546 let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0); 1547 assert!(matches!(result, HandleResult::Continue)); 1548 1549 let response = rx.try_recv().unwrap(); 1550 match response { 1551 DaveApiResponse::Token(t) => assert_eq!(t, "Hello world"), 1552 other => panic!("Expected Token, got {:?}", std::mem::discriminant(&other)), 1553 } 1554 } 1555 1556 #[test] 1557 fn test_handle_turn_completed_success() { 1558 let (tx, _rx) = mpsc::channel(); 1559 let ctx = egui::Context::default(); 1560 let mut subagents = Vec::new(); 1561 1562 let msg = notification("turn/completed", json!({ "status": "completed" })); 1563 let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0); 1564 assert!(matches!(result, HandleResult::TurnDone)); 1565 } 1566 1567 #[test] 1568 fn test_handle_turn_completed_failure_sends_error() { 1569 let 
(tx, rx) = mpsc::channel(); 1570 let ctx = egui::Context::default(); 1571 let mut subagents = Vec::new(); 1572 1573 let msg = notification( 1574 "turn/completed", 1575 json!({ "status": "failed", "error": "rate limit exceeded" }), 1576 ); 1577 let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0); 1578 assert!(matches!(result, HandleResult::TurnDone)); 1579 1580 let response = rx.try_recv().unwrap(); 1581 match response { 1582 DaveApiResponse::Failed(err) => assert_eq!(err, "rate limit exceeded"), 1583 other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)), 1584 } 1585 } 1586 1587 #[test] 1588 fn test_handle_response_message_ignored() { 1589 let (tx, rx) = mpsc::channel(); 1590 let ctx = egui::Context::default(); 1591 let mut subagents = Vec::new(); 1592 1593 // A response (has id, no method) — should be ignored 1594 let msg = RpcMessage { 1595 id: Some(42), 1596 method: None, 1597 result: Some(json!({})), 1598 error: None, 1599 params: None, 1600 }; 1601 let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0); 1602 assert!(matches!(result, HandleResult::Continue)); 1603 assert!(rx.try_recv().is_err()); // nothing sent 1604 } 1605 1606 #[test] 1607 fn test_handle_unknown_method_ignored() { 1608 let (tx, rx) = mpsc::channel(); 1609 let ctx = egui::Context::default(); 1610 let mut subagents = Vec::new(); 1611 1612 let msg = notification("some/future/event", json!({})); 1613 let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0); 1614 assert!(matches!(result, HandleResult::Continue)); 1615 assert!(rx.try_recv().is_err()); 1616 } 1617 1618 #[test] 1619 fn test_handle_codex_event_error_sends_failed() { 1620 let (tx, rx) = mpsc::channel(); 1621 let ctx = egui::Context::default(); 1622 let mut subagents = Vec::new(); 1623 1624 let msg = notification( 1625 "codex/event/error", 1626 json!({ "message": "context window exceeded" }), 1627 ); 1628 let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, 
&0); 1629 assert!(matches!(result, HandleResult::Continue)); 1630 1631 let response = rx.try_recv().unwrap(); 1632 match response { 1633 DaveApiResponse::Failed(err) => assert_eq!(err, "context window exceeded"), 1634 other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)), 1635 } 1636 } 1637 1638 #[test] 1639 fn test_handle_error_notification_sends_failed() { 1640 let (tx, rx) = mpsc::channel(); 1641 let ctx = egui::Context::default(); 1642 let mut subagents = Vec::new(); 1643 1644 let msg = notification("error", json!({ "message": "something broke" })); 1645 let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0); 1646 assert!(matches!(result, HandleResult::Continue)); 1647 1648 let response = rx.try_recv().unwrap(); 1649 match response { 1650 DaveApiResponse::Failed(err) => assert_eq!(err, "something broke"), 1651 other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)), 1652 } 1653 } 1654 1655 #[test] 1656 fn test_handle_error_without_message_uses_default() { 1657 let (tx, rx) = mpsc::channel(); 1658 let ctx = egui::Context::default(); 1659 let mut subagents = Vec::new(); 1660 1661 let msg = notification("codex/event/error", json!({})); 1662 let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0); 1663 assert!(matches!(result, HandleResult::Continue)); 1664 1665 let response = rx.try_recv().unwrap(); 1666 match response { 1667 DaveApiResponse::Failed(err) => assert_eq!(err, "Codex error (no details)"), 1668 other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)), 1669 } 1670 } 1671 1672 #[test] 1673 fn test_handle_error_nested_msg_message() { 1674 let (tx, rx) = mpsc::channel(); 1675 let ctx = egui::Context::default(); 1676 let mut subagents = Vec::new(); 1677 1678 // Codex shape: params.msg.message is JSON with a "detail" field 1679 let msg = notification( 1680 "codex/event/error", 1681 json!({ 1682 "id": "1", 1683 "msg": { 1684 "type": "error", 1685 "message": 
"{\"detail\":\"The model is not supported.\"}", 1686 "codex_error_info": "other" 1687 }, 1688 "conversationId": "thread-1" 1689 }), 1690 ); 1691 let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0); 1692 assert!(matches!(result, HandleResult::Continue)); 1693 1694 let response = rx.try_recv().unwrap(); 1695 match response { 1696 DaveApiResponse::Failed(err) => assert_eq!(err, "The model is not supported."), 1697 other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)), 1698 } 1699 } 1700 1701 #[test] 1702 fn test_handle_error_nested_error_object() { 1703 let (tx, rx) = mpsc::channel(); 1704 let ctx = egui::Context::default(); 1705 let mut subagents = Vec::new(); 1706 1707 // Codex shape: params.error is an object with a "message" field 1708 let msg = notification( 1709 "error", 1710 json!({ 1711 "error": { 1712 "message": "{\"detail\":\"Rate limit exceeded.\"}", 1713 "codexErrorInfo": "other", 1714 "additionalDetails": null 1715 }, 1716 "willRetry": false, 1717 "threadId": "thread-1", 1718 "turnId": "1" 1719 }), 1720 ); 1721 let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0); 1722 assert!(matches!(result, HandleResult::Continue)); 1723 1724 let response = rx.try_recv().unwrap(); 1725 match response { 1726 DaveApiResponse::Failed(err) => assert_eq!(err, "Rate limit exceeded."), 1727 other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)), 1728 } 1729 } 1730 1731 #[test] 1732 fn test_handle_subagent_started() { 1733 let (tx, rx) = mpsc::channel(); 1734 let ctx = egui::Context::default(); 1735 let mut subagents = Vec::new(); 1736 1737 let msg = notification( 1738 "item/started", 1739 json!({ 1740 "type": "collabAgentToolCall", 1741 "itemId": "agent-1", 1742 "name": "research agent" 1743 }), 1744 ); 1745 let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0); 1746 assert!(matches!(result, HandleResult::Continue)); 1747 assert_eq!(subagents.len(), 1); 1748 
assert_eq!(subagents[0], "agent-1"); 1749 1750 let response = rx.try_recv().unwrap(); 1751 match response { 1752 DaveApiResponse::SubagentSpawned(info) => { 1753 assert_eq!(info.task_id, "agent-1"); 1754 assert_eq!(info.description, "research agent"); 1755 } 1756 other => panic!( 1757 "Expected SubagentSpawned, got {:?}", 1758 std::mem::discriminant(&other) 1759 ), 1760 } 1761 } 1762 1763 #[test] 1764 fn test_handle_command_approval_auto_accept() { 1765 let (tx, rx) = mpsc::channel(); 1766 let ctx = egui::Context::default(); 1767 let mut subagents = Vec::new(); 1768 1769 // "git status" should be auto-accepted by default rules 1770 let msg = server_request( 1771 99, 1772 "item/commandExecution/requestApproval", 1773 json!({ "command": "git status" }), 1774 ); 1775 let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0); 1776 match result { 1777 HandleResult::AutoAccepted(id) => assert_eq!(id, 99), 1778 other => panic!( 1779 "Expected AutoAccepted, got {:?}", 1780 std::mem::discriminant(&other) 1781 ), 1782 } 1783 // No permission request sent to UI 1784 assert!(rx.try_recv().is_err()); 1785 } 1786 1787 #[test] 1788 fn test_handle_command_approval_needs_ui() { 1789 let (tx, rx) = mpsc::channel(); 1790 let ctx = egui::Context::default(); 1791 let mut subagents = Vec::new(); 1792 1793 // "rm -rf /" should NOT be auto-accepted 1794 let msg = server_request( 1795 100, 1796 "item/commandExecution/requestApproval", 1797 json!({ "command": "rm -rf /" }), 1798 ); 1799 let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0); 1800 match result { 1801 HandleResult::NeedsApproval { rpc_id, .. 
} => assert_eq!(rpc_id, 100), 1802 other => panic!( 1803 "Expected NeedsApproval, got {:?}", 1804 std::mem::discriminant(&other) 1805 ), 1806 } 1807 1808 // Permission request should have been sent to UI 1809 let response = rx.try_recv().unwrap(); 1810 assert!(matches!(response, DaveApiResponse::PermissionRequest(_))); 1811 } 1812 1813 // ----------------------------------------------------------------------- 1814 // handle_item_completed tests 1815 // ----------------------------------------------------------------------- 1816 1817 #[test] 1818 fn test_item_completed_command_execution() { 1819 let (tx, rx) = mpsc::channel(); 1820 let ctx = egui::Context::default(); 1821 let mut subagents = Vec::new(); 1822 1823 let completed = ItemCompletedParams { 1824 item_type: "commandExecution".to_string(), 1825 item_id: None, 1826 command: Some("ls -la".to_string()), 1827 exit_code: Some(0), 1828 output: Some("total 42\n".to_string()), 1829 file_path: None, 1830 diff: None, 1831 kind: None, 1832 result: None, 1833 pre_tokens: None, 1834 content: None, 1835 }; 1836 1837 handle_item_completed(&completed, &tx, &ctx, &mut subagents); 1838 1839 let response = rx.try_recv().unwrap(); 1840 match response { 1841 DaveApiResponse::ToolResult(tool) => { 1842 assert_eq!(tool.tool_name, "Bash"); 1843 assert!(tool.parent_task_id.is_none()); 1844 } 1845 other => panic!( 1846 "Expected ToolResult, got {:?}", 1847 std::mem::discriminant(&other) 1848 ), 1849 } 1850 } 1851 1852 #[test] 1853 fn test_item_completed_file_change_edit() { 1854 let (tx, rx) = mpsc::channel(); 1855 let ctx = egui::Context::default(); 1856 let mut subagents = Vec::new(); 1857 1858 let completed = ItemCompletedParams { 1859 item_type: "fileChange".to_string(), 1860 item_id: None, 1861 command: None, 1862 exit_code: None, 1863 output: None, 1864 file_path: Some("src/main.rs".to_string()), 1865 diff: Some("@@ -1,3 +1,3 @@\n-old\n+new\n context\n".to_string()), 1866 kind: Some(json!({"type": "edit"})), 1867 result: None, 
1868 pre_tokens: None, 1869 content: None, 1870 }; 1871 1872 handle_item_completed(&completed, &tx, &ctx, &mut subagents); 1873 1874 let response = rx.try_recv().unwrap(); 1875 match response { 1876 DaveApiResponse::ToolResult(tool) => { 1877 assert_eq!(tool.tool_name, "Edit"); 1878 } 1879 other => panic!( 1880 "Expected ToolResult, got {:?}", 1881 std::mem::discriminant(&other) 1882 ), 1883 } 1884 } 1885 1886 #[test] 1887 fn test_item_completed_file_change_create() { 1888 let (tx, rx) = mpsc::channel(); 1889 let ctx = egui::Context::default(); 1890 let mut subagents = Vec::new(); 1891 1892 let completed = ItemCompletedParams { 1893 item_type: "fileChange".to_string(), 1894 item_id: None, 1895 command: None, 1896 exit_code: None, 1897 output: None, 1898 file_path: Some("new_file.rs".to_string()), 1899 diff: None, 1900 kind: Some(json!({"type": "create"})), 1901 result: None, 1902 pre_tokens: None, 1903 content: None, 1904 }; 1905 1906 handle_item_completed(&completed, &tx, &ctx, &mut subagents); 1907 1908 let response = rx.try_recv().unwrap(); 1909 match response { 1910 DaveApiResponse::ToolResult(tool) => { 1911 assert_eq!(tool.tool_name, "Write"); 1912 } 1913 other => panic!( 1914 "Expected ToolResult, got {:?}", 1915 std::mem::discriminant(&other) 1916 ), 1917 } 1918 } 1919 1920 #[test] 1921 fn test_item_completed_subagent() { 1922 let (tx, rx) = mpsc::channel(); 1923 let ctx = egui::Context::default(); 1924 let mut subagents = vec!["agent-1".to_string()]; 1925 1926 let completed = ItemCompletedParams { 1927 item_type: "collabAgentToolCall".to_string(), 1928 item_id: Some("agent-1".to_string()), 1929 command: None, 1930 exit_code: None, 1931 output: None, 1932 file_path: None, 1933 diff: None, 1934 kind: None, 1935 result: Some("Found 3 relevant files".to_string()), 1936 pre_tokens: None, 1937 content: None, 1938 }; 1939 1940 handle_item_completed(&completed, &tx, &ctx, &mut subagents); 1941 1942 // Subagent removed from stack 1943 assert!(subagents.is_empty()); 
1944 1945 let response = rx.try_recv().unwrap(); 1946 match response { 1947 DaveApiResponse::SubagentCompleted { task_id, result } => { 1948 assert_eq!(task_id, "agent-1"); 1949 assert_eq!(result, "Found 3 relevant files"); 1950 } 1951 other => panic!( 1952 "Expected SubagentCompleted, got {:?}", 1953 std::mem::discriminant(&other) 1954 ), 1955 } 1956 } 1957 1958 #[test] 1959 fn test_item_completed_compaction() { 1960 let (tx, rx) = mpsc::channel(); 1961 let ctx = egui::Context::default(); 1962 let mut subagents = Vec::new(); 1963 1964 let completed = ItemCompletedParams { 1965 item_type: "contextCompaction".to_string(), 1966 item_id: None, 1967 command: None, 1968 exit_code: None, 1969 output: None, 1970 file_path: None, 1971 diff: None, 1972 kind: None, 1973 result: None, 1974 pre_tokens: Some(50000), 1975 content: None, 1976 }; 1977 1978 handle_item_completed(&completed, &tx, &ctx, &mut subagents); 1979 1980 let response = rx.try_recv().unwrap(); 1981 match response { 1982 DaveApiResponse::CompactionComplete(info) => { 1983 assert_eq!(info.pre_tokens, 50000); 1984 } 1985 other => panic!( 1986 "Expected CompactionComplete, got {:?}", 1987 std::mem::discriminant(&other) 1988 ), 1989 } 1990 } 1991 1992 #[test] 1993 fn test_item_completed_with_parent_subagent() { 1994 let (tx, rx) = mpsc::channel(); 1995 let ctx = egui::Context::default(); 1996 let mut subagents = vec!["parent-agent".to_string()]; 1997 1998 let completed = ItemCompletedParams { 1999 item_type: "commandExecution".to_string(), 2000 item_id: None, 2001 command: Some("cargo test".to_string()), 2002 exit_code: Some(0), 2003 output: Some("ok".to_string()), 2004 file_path: None, 2005 diff: None, 2006 kind: None, 2007 result: None, 2008 pre_tokens: None, 2009 content: None, 2010 }; 2011 2012 handle_item_completed(&completed, &tx, &ctx, &mut subagents); 2013 2014 let response = rx.try_recv().unwrap(); 2015 match response { 2016 DaveApiResponse::ToolResult(tool) => { 2017 assert_eq!(tool.tool_name, "Bash"); 
2018 assert_eq!(tool.parent_task_id.as_deref(), Some("parent-agent")); 2019 } 2020 other => panic!( 2021 "Expected ToolResult, got {:?}", 2022 std::mem::discriminant(&other) 2023 ), 2024 } 2025 } 2026 2027 // ----------------------------------------------------------------------- 2028 // check_approval_or_forward tests 2029 // ----------------------------------------------------------------------- 2030 2031 #[test] 2032 fn test_approval_auto_accept_read_tool() { 2033 let (tx, rx) = mpsc::channel(); 2034 let ctx = egui::Context::default(); 2035 2036 // Glob/Grep/Read are always auto-accepted 2037 let result = check_approval_or_forward(1, "Glob", json!({"pattern": "*.rs"}), &tx, &ctx); 2038 assert!(matches!(result, HandleResult::AutoAccepted(1))); 2039 assert!(rx.try_recv().is_err()); // no UI request 2040 } 2041 2042 #[test] 2043 fn test_approval_forwards_dangerous_command() { 2044 let (tx, rx) = mpsc::channel(); 2045 let ctx = egui::Context::default(); 2046 2047 let result = 2048 check_approval_or_forward(42, "Bash", json!({"command": "sudo rm -rf /"}), &tx, &ctx); 2049 match result { 2050 HandleResult::NeedsApproval { rpc_id, .. 
} => assert_eq!(rpc_id, 42), 2051 other => panic!( 2052 "Expected NeedsApproval, got {:?}", 2053 std::mem::discriminant(&other) 2054 ), 2055 } 2056 2057 // Permission request sent to UI 2058 let response = rx.try_recv().unwrap(); 2059 match response { 2060 DaveApiResponse::PermissionRequest(pending) => { 2061 assert_eq!(pending.request.tool_name, "Bash"); 2062 } 2063 other => panic!( 2064 "Expected PermissionRequest, got {:?}", 2065 std::mem::discriminant(&other) 2066 ), 2067 } 2068 } 2069 2070 // ----------------------------------------------------------------------- 2071 // get_pending_user_messages tests 2072 // ----------------------------------------------------------------------- 2073 2074 #[test] 2075 fn pending_messages_single_user() { 2076 let messages = vec![Message::User("hello".into())]; 2077 assert_eq!(shared::get_pending_user_messages(&messages), "hello"); 2078 } 2079 2080 #[test] 2081 fn pending_messages_multiple_trailing_users() { 2082 let messages = vec![ 2083 Message::User("first".into()), 2084 Message::Assistant(AssistantMessage::from_text("reply".into())), 2085 Message::User("second".into()), 2086 Message::User("third".into()), 2087 Message::User("fourth".into()), 2088 ]; 2089 assert_eq!( 2090 shared::get_pending_user_messages(&messages), 2091 "second\nthird\nfourth" 2092 ); 2093 } 2094 2095 #[test] 2096 fn pending_messages_stops_at_non_user() { 2097 let messages = vec![ 2098 Message::User("old".into()), 2099 Message::User("also old".into()), 2100 Message::Assistant(AssistantMessage::from_text("reply".into())), 2101 Message::User("pending".into()), 2102 ]; 2103 assert_eq!(shared::get_pending_user_messages(&messages), "pending"); 2104 } 2105 2106 #[test] 2107 fn pending_messages_empty_when_last_is_assistant() { 2108 let messages = vec![ 2109 Message::User("hello".into()), 2110 Message::Assistant(AssistantMessage::from_text("reply".into())), 2111 ]; 2112 assert_eq!(shared::get_pending_user_messages(&messages), ""); 2113 } 2114 2115 #[test] 2116 fn 
pending_messages_empty_chat() {
    let messages: Vec<Message> = vec![];
    assert_eq!(shared::get_pending_user_messages(&messages), "");
}

#[test]
fn pending_messages_stops_at_tool_response() {
    let messages = vec![
        Message::User("do something".into()),
        Message::Assistant(AssistantMessage::from_text("ok".into())),
        Message::ToolCalls(vec![crate::tools::ToolCall::invalid(
            "c1".into(),
            Some("Read".into()),
            None,
            "test".into(),
        )]),
        Message::ToolResponse(crate::tools::ToolResponse::error(
            "c1".into(),
            "result".into(),
        )),
        Message::User("queued 1".into()),
        Message::User("queued 2".into()),
    ];
    assert_eq!(
        shared::get_pending_user_messages(&messages),
        "queued 1\nqueued 2"
    );
}

#[test]
fn pending_messages_preserves_order() {
    let messages = vec![
        Message::User("a".into()),
        Message::User("b".into()),
        Message::User("c".into()),
    ];
    assert_eq!(shared::get_pending_user_messages(&messages), "a\nb\nc");
}

// -----------------------------------------------------------------------
// Integration tests — mock Codex server over duplex streams
// -----------------------------------------------------------------------

/// Mock Codex server that speaks JSONL over duplex streams.
struct MockCodex {
    /// Read what the actor writes (actor's "stdin" from mock's perspective).
    reader: tokio::io::Lines<BufReader<tokio::io::DuplexStream>>,
    /// Write what the actor reads (actor's "stdout" from mock's perspective).
    writer: tokio::io::BufWriter<tokio::io::DuplexStream>,
}

impl MockCodex {
    /// Read one JSONL message sent by the actor.
    ///
    /// # Panics
    /// Panics with a descriptive message if reading the pipe fails, if the
    /// actor closes its end before sending a line (EOF), or if the line is not
    /// a valid `RpcMessage`.
    async fn read_message(&mut self) -> RpcMessage {
        let line = self
            .reader
            .next_line()
            .await
            .expect("I/O error while reading a message from the actor")
            .expect("actor closed its stdin before sending the expected message");
        serde_json::from_str(&line).unwrap_or_else(|err| {
            panic!("actor sent an invalid JSON-RPC line {:?}: {}", line, err)
        })
    }

    /// Send a JSONL message to the actor.
    async fn send_line(&mut self, value: &Value) {
        let mut line = serde_json::to_string(value).unwrap();
        line.push('\n');
        self.writer.write_all(line.as_bytes()).await.unwrap();
        self.writer.flush().await.unwrap();
    }

    /// Send a JSON-RPC notification (a message with `method`/`params` and no `id`).
    async fn send_notification(&mut self, method: &str, params: Value) {
        self.send_line(&json!({ "method": method, "params": params }))
            .await;
    }

    /// Handle the `initialize` → `initialized` handshake.
    async fn handle_init(&mut self) {
        let req = self.read_message().await;
        assert_eq!(req.method.as_deref(), Some("initialize"));
        let id = req.id.unwrap();
        self.send_line(&json!({
            "id": id,
            "result": { "serverInfo": { "name": "mock-codex", "version": "0.0.0" } }
        }))
        .await;
        let notif = self.read_message().await;
        assert_eq!(notif.method.as_deref(), Some("initialized"));
    }

    /// Handle `thread/start` and return the thread ID.
    async fn handle_thread_start(&mut self) -> String {
        let req = self.read_message().await;
        assert_eq!(req.method.as_deref(), Some("thread/start"));
        let id = req.id.unwrap();
        let thread_id = "mock-thread-1";
        self.send_line(&json!({
            "id": id,
            "result": { "thread": { "id": thread_id }, "model": "mock-model" }
        }))
        .await;
        thread_id.to_string()
    }

    /// Handle `turn/start` and return the turn ID.
    async fn handle_turn_start(&mut self) -> String {
        let req = self.read_message().await;
        assert_eq!(req.method.as_deref(), Some("turn/start"));
        let id = req.id.unwrap();
        let turn_id = "mock-turn-1";
        self.send_line(&json!({
            "id": id,
            "result": { "turn": { "id": turn_id } }
        }))
        .await;
        turn_id.to_string()
    }

    /// Send an `item/agentMessage/delta` notification.
    async fn send_delta(&mut self, text: &str) {
        self.send_notification("item/agentMessage/delta", json!({ "delta": text }))
            .await;
    }

    /// Send a `turn/completed` notification.
    async fn send_turn_completed(&mut self, status: &str) {
        self.send_notification("turn/completed", json!({ "status": status }))
            .await;
    }

    /// Send an `item/completed` notification.
    async fn send_item_completed(&mut self, params: Value) {
        self.send_notification("item/completed", params).await;
    }

    /// Send an `item/started` notification.
    async fn send_item_started(&mut self, params: Value) {
        self.send_notification("item/started", params).await;
    }

    /// Send an approval request (server→client request with id).
    async fn send_approval_request(&mut self, rpc_id: u64, method: &str, params: Value) {
        self.send_line(&json!({
            "id": rpc_id,
            "method": method,
            "params": params
        }))
        .await;
    }
}

/// Create a mock codex server and spawn the session actor loop.
/// Returns the mock, a command sender, and the actor task handle.
2273 fn setup_integration_test() -> ( 2274 MockCodex, 2275 tokio_mpsc::Sender<SessionCommand>, 2276 tokio::task::JoinHandle<()>, 2277 ) { 2278 // "stdout" channel: mock writes → actor reads 2279 let (mock_stdout_write, actor_stdout_read) = tokio::io::duplex(8192); 2280 // "stdin" channel: actor writes → mock reads 2281 let (actor_stdin_write, mock_stdin_read) = tokio::io::duplex(8192); 2282 2283 let mock = MockCodex { 2284 reader: BufReader::new(mock_stdin_read).lines(), 2285 writer: tokio::io::BufWriter::new(mock_stdout_write), 2286 }; 2287 2288 let actor_writer = tokio::io::BufWriter::new(actor_stdin_write); 2289 let actor_reader = BufReader::new(actor_stdout_read).lines(); 2290 2291 let (command_tx, mut command_rx) = tokio_mpsc::channel(16); 2292 2293 let handle = tokio::spawn(async move { 2294 session_actor_loop( 2295 "test-session", 2296 actor_writer, 2297 actor_reader, 2298 Some("mock-model"), 2299 None, 2300 None, 2301 &mut command_rx, 2302 ) 2303 .await; 2304 }); 2305 2306 (mock, command_tx, handle) 2307 } 2308 2309 /// Send a Query command and return the response receiver. 2310 async fn send_query( 2311 command_tx: &tokio_mpsc::Sender<SessionCommand>, 2312 prompt: &str, 2313 ) -> mpsc::Receiver<DaveApiResponse> { 2314 let (response_tx, response_rx) = mpsc::channel(); 2315 command_tx 2316 .send(SessionCommand::Query { 2317 prompt: prompt.to_string(), 2318 response_tx, 2319 ctx: egui::Context::default(), 2320 }) 2321 .await 2322 .unwrap(); 2323 response_rx 2324 } 2325 2326 /// Collect all responses from the channel. 2327 fn collect_responses(rx: &mpsc::Receiver<DaveApiResponse>) -> Vec<DaveApiResponse> { 2328 rx.try_iter().collect() 2329 } 2330 2331 /// Drain and discard the initial SessionInfo that the first query emits. 
2332 fn drain_session_info(rx: &mpsc::Receiver<DaveApiResponse>) { 2333 let resp = rx 2334 .recv_timeout(Duration::from_secs(5)) 2335 .expect("timed out waiting for SessionInfo"); 2336 assert!( 2337 matches!(resp, DaveApiResponse::SessionInfo(_)), 2338 "Expected SessionInfo as first response, got {:?}", 2339 std::mem::discriminant(&resp) 2340 ); 2341 } 2342 2343 // -- Integration tests -- 2344 2345 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 2346 async fn test_integration_streaming_tokens() { 2347 let (mut mock, command_tx, handle) = setup_integration_test(); 2348 2349 mock.handle_init().await; 2350 mock.handle_thread_start().await; 2351 2352 let response_rx = send_query(&command_tx, "Hello").await; 2353 mock.handle_turn_start().await; 2354 2355 mock.send_delta("Hello").await; 2356 mock.send_delta(" world").await; 2357 mock.send_delta("!").await; 2358 mock.send_turn_completed("completed").await; 2359 2360 // Drop sender — actor finishes processing remaining lines, 2361 // then command_rx.recv() returns None and the loop exits. 
2362 drop(command_tx); 2363 handle.await.unwrap(); 2364 2365 let tokens: Vec<String> = collect_responses(&response_rx) 2366 .into_iter() 2367 .filter_map(|r| match r { 2368 DaveApiResponse::Token(t) => Some(t), 2369 _ => None, 2370 }) 2371 .collect(); 2372 assert_eq!(tokens, vec!["Hello", " world", "!"]); 2373 } 2374 2375 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 2376 async fn test_integration_command_execution() { 2377 let (mut mock, command_tx, handle) = setup_integration_test(); 2378 2379 mock.handle_init().await; 2380 mock.handle_thread_start().await; 2381 2382 let response_rx = send_query(&command_tx, "list files").await; 2383 mock.handle_turn_start().await; 2384 2385 mock.send_item_completed(json!({ 2386 "type": "commandExecution", 2387 "command": "ls -la", 2388 "exitCode": 0, 2389 "output": "total 42\nfoo.rs\n" 2390 })) 2391 .await; 2392 mock.send_turn_completed("completed").await; 2393 2394 drop(command_tx); 2395 handle.await.unwrap(); 2396 2397 let tool_results: Vec<_> = collect_responses(&response_rx) 2398 .into_iter() 2399 .filter_map(|r| match r { 2400 DaveApiResponse::ToolResult(t) => Some(t), 2401 _ => None, 2402 }) 2403 .collect(); 2404 assert_eq!(tool_results.len(), 1); 2405 assert_eq!(tool_results[0].tool_name, "Bash"); 2406 } 2407 2408 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 2409 async fn test_integration_file_change() { 2410 let (mut mock, command_tx, handle) = setup_integration_test(); 2411 2412 mock.handle_init().await; 2413 mock.handle_thread_start().await; 2414 2415 let response_rx = send_query(&command_tx, "edit file").await; 2416 mock.handle_turn_start().await; 2417 2418 mock.send_item_completed(json!({ 2419 "type": "fileChange", 2420 "filePath": "src/main.rs", 2421 "diff": "@@ -1,3 +1,3 @@\n-old\n+new\n context\n", 2422 "kind": { "type": "edit" } 2423 })) 2424 .await; 2425 mock.send_turn_completed("completed").await; 2426 2427 drop(command_tx); 2428 handle.await.unwrap(); 2429 2430 let tool_results: 
Vec<_> = collect_responses(&response_rx) 2431 .into_iter() 2432 .filter_map(|r| match r { 2433 DaveApiResponse::ToolResult(t) => Some(t), 2434 _ => None, 2435 }) 2436 .collect(); 2437 assert_eq!(tool_results.len(), 1); 2438 assert_eq!(tool_results[0].tool_name, "Edit"); 2439 } 2440 2441 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 2442 async fn test_integration_approval_accept() { 2443 let (mut mock, command_tx, handle) = setup_integration_test(); 2444 2445 mock.handle_init().await; 2446 mock.handle_thread_start().await; 2447 2448 let response_rx = send_query(&command_tx, "delete stuff").await; 2449 mock.handle_turn_start().await; 2450 drain_session_info(&response_rx); 2451 2452 // Send a command that won't be auto-accepted 2453 mock.send_approval_request( 2454 42, 2455 "item/commandExecution/requestApproval", 2456 json!({ "command": "rm -rf /tmp/important" }), 2457 ) 2458 .await; 2459 2460 // Actor should forward a PermissionRequest 2461 let resp = response_rx 2462 .recv_timeout(Duration::from_secs(5)) 2463 .expect("timed out waiting for PermissionRequest"); 2464 let pending = match resp { 2465 DaveApiResponse::PermissionRequest(p) => p, 2466 other => panic!( 2467 "Expected PermissionRequest, got {:?}", 2468 std::mem::discriminant(&other) 2469 ), 2470 }; 2471 assert_eq!(pending.request.tool_name, "Bash"); 2472 2473 // Approve it 2474 pending 2475 .response_tx 2476 .send(PermissionResponse::Allow { message: None }) 2477 .unwrap(); 2478 2479 // Mock should receive the acceptance 2480 let approval_msg = mock.read_message().await; 2481 assert_eq!(approval_msg.id, Some(42)); 2482 let result = approval_msg.result.unwrap(); 2483 assert_eq!(result["decision"], "accept"); 2484 2485 mock.send_turn_completed("completed").await; 2486 drop(command_tx); 2487 handle.await.unwrap(); 2488 } 2489 2490 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 2491 async fn test_integration_approval_deny() { 2492 let (mut mock, command_tx, handle) = 
setup_integration_test(); 2493 2494 mock.handle_init().await; 2495 mock.handle_thread_start().await; 2496 2497 let response_rx = send_query(&command_tx, "dangerous").await; 2498 mock.handle_turn_start().await; 2499 drain_session_info(&response_rx); 2500 2501 mock.send_approval_request( 2502 99, 2503 "item/commandExecution/requestApproval", 2504 json!({ "command": "sudo rm -rf /" }), 2505 ) 2506 .await; 2507 2508 let resp = response_rx 2509 .recv_timeout(Duration::from_secs(5)) 2510 .expect("timed out waiting for PermissionRequest"); 2511 let pending = match resp { 2512 DaveApiResponse::PermissionRequest(p) => p, 2513 _ => panic!("Expected PermissionRequest"), 2514 }; 2515 2516 // Deny it 2517 pending 2518 .response_tx 2519 .send(PermissionResponse::Deny { 2520 reason: "too dangerous".to_string(), 2521 }) 2522 .unwrap(); 2523 2524 // Mock should receive the decline 2525 let approval_msg = mock.read_message().await; 2526 assert_eq!(approval_msg.id, Some(99)); 2527 let result = approval_msg.result.unwrap(); 2528 assert_eq!(result["decision"], "decline"); 2529 2530 mock.send_turn_completed("completed").await; 2531 drop(command_tx); 2532 handle.await.unwrap(); 2533 } 2534 2535 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 2536 async fn test_integration_auto_accept() { 2537 let (mut mock, command_tx, handle) = setup_integration_test(); 2538 2539 mock.handle_init().await; 2540 mock.handle_thread_start().await; 2541 2542 let response_rx = send_query(&command_tx, "check status").await; 2543 mock.handle_turn_start().await; 2544 2545 // "git status" should be auto-accepted 2546 mock.send_approval_request( 2547 50, 2548 "item/commandExecution/requestApproval", 2549 json!({ "command": "git status" }), 2550 ) 2551 .await; 2552 2553 // Mock should receive the auto-acceptance immediately (no UI involved) 2554 let approval_msg = mock.read_message().await; 2555 assert_eq!(approval_msg.id, Some(50)); 2556 let result = approval_msg.result.unwrap(); 2557 
assert_eq!(result["decision"], "accept"); 2558 2559 // No PermissionRequest should have been sent 2560 // (the response_rx should be empty or only have non-permission items) 2561 mock.send_turn_completed("completed").await; 2562 2563 drop(command_tx); 2564 handle.await.unwrap(); 2565 2566 let permission_requests: Vec<_> = collect_responses(&response_rx) 2567 .into_iter() 2568 .filter(|r| matches!(r, DaveApiResponse::PermissionRequest(_))) 2569 .collect(); 2570 assert!( 2571 permission_requests.is_empty(), 2572 "Auto-accepted commands should not generate PermissionRequests" 2573 ); 2574 } 2575 2576 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 2577 async fn test_integration_multiple_turns() { 2578 let (mut mock, command_tx, handle) = setup_integration_test(); 2579 2580 mock.handle_init().await; 2581 mock.handle_thread_start().await; 2582 2583 // First turn 2584 let rx1 = send_query(&command_tx, "first").await; 2585 mock.handle_turn_start().await; 2586 drain_session_info(&rx1); 2587 mock.send_delta("reply 1").await; 2588 mock.send_turn_completed("completed").await; 2589 2590 // Wait for the first turn's token to confirm the actor is processing 2591 let resp = rx1 2592 .recv_timeout(Duration::from_secs(5)) 2593 .expect("timed out waiting for first turn token"); 2594 assert!(matches!(resp, DaveApiResponse::Token(_))); 2595 2596 // Brief yield for turn_completed to be processed 2597 tokio::time::sleep(Duration::from_millis(100)).await; 2598 2599 // Second turn 2600 let rx2 = send_query(&command_tx, "second").await; 2601 mock.handle_turn_start().await; 2602 mock.send_delta("reply 2").await; 2603 mock.send_turn_completed("completed").await; 2604 2605 drop(command_tx); 2606 handle.await.unwrap(); 2607 2608 let tokens2: Vec<String> = collect_responses(&rx2) 2609 .into_iter() 2610 .filter_map(|r| match r { 2611 DaveApiResponse::Token(t) => Some(t), 2612 _ => None, 2613 }) 2614 .collect(); 2615 assert_eq!(tokens2, vec!["reply 2"]); 2616 } 2617 2618 
#[tokio::test(flavor = "multi_thread", worker_threads = 2)] 2619 async fn test_integration_subagent_lifecycle() { 2620 let (mut mock, command_tx, handle) = setup_integration_test(); 2621 2622 mock.handle_init().await; 2623 mock.handle_thread_start().await; 2624 2625 let response_rx = send_query(&command_tx, "research").await; 2626 mock.handle_turn_start().await; 2627 2628 // Subagent starts 2629 mock.send_item_started(json!({ 2630 "type": "collabAgentToolCall", 2631 "itemId": "agent-42", 2632 "name": "research agent" 2633 })) 2634 .await; 2635 2636 // Command inside subagent 2637 mock.send_item_completed(json!({ 2638 "type": "commandExecution", 2639 "command": "grep -r pattern .", 2640 "exitCode": 0, 2641 "output": "found 3 matches" 2642 })) 2643 .await; 2644 2645 // Subagent completes 2646 mock.send_item_completed(json!({ 2647 "type": "collabAgentToolCall", 2648 "itemId": "agent-42", 2649 "result": "Found relevant information" 2650 })) 2651 .await; 2652 2653 mock.send_turn_completed("completed").await; 2654 2655 drop(command_tx); 2656 handle.await.unwrap(); 2657 2658 let responses = collect_responses(&response_rx); 2659 2660 // Should have: SubagentSpawned, ToolResult (with parent), SubagentCompleted 2661 let spawned: Vec<_> = responses 2662 .iter() 2663 .filter(|r| matches!(r, DaveApiResponse::SubagentSpawned(_))) 2664 .collect(); 2665 assert_eq!(spawned.len(), 1); 2666 2667 let tool_results: Vec<_> = responses 2668 .iter() 2669 .filter_map(|r| match r { 2670 DaveApiResponse::ToolResult(t) => Some(t), 2671 _ => None, 2672 }) 2673 .collect(); 2674 assert_eq!(tool_results.len(), 1); 2675 assert_eq!(tool_results[0].parent_task_id.as_deref(), Some("agent-42")); 2676 2677 let completed: Vec<_> = responses 2678 .iter() 2679 .filter(|r| matches!(r, DaveApiResponse::SubagentCompleted { .. 
})) 2680 .collect(); 2681 assert_eq!(completed.len(), 1); 2682 } 2683 2684 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 2685 async fn test_integration_shutdown_during_stream() { 2686 let (mut mock, command_tx, handle) = setup_integration_test(); 2687 2688 mock.handle_init().await; 2689 mock.handle_thread_start().await; 2690 2691 let response_rx = send_query(&command_tx, "long task").await; 2692 mock.handle_turn_start().await; 2693 drain_session_info(&response_rx); 2694 2695 mock.send_delta("partial").await; 2696 2697 // Wait for token to arrive before sending Shutdown 2698 let resp = response_rx 2699 .recv_timeout(Duration::from_secs(5)) 2700 .expect("timed out waiting for token"); 2701 assert!( 2702 matches!(&resp, DaveApiResponse::Token(t) if t == "partial"), 2703 "Expected Token(\"partial\")" 2704 ); 2705 2706 // Now shutdown while still inside the turn (no turn_completed sent) 2707 command_tx.send(SessionCommand::Shutdown).await.unwrap(); 2708 handle.await.unwrap(); 2709 } 2710 2711 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 2712 async fn test_integration_process_eof() { 2713 let (mut mock, command_tx, handle) = setup_integration_test(); 2714 2715 mock.handle_init().await; 2716 mock.handle_thread_start().await; 2717 2718 let response_rx = send_query(&command_tx, "hello").await; 2719 mock.handle_turn_start().await; 2720 drain_session_info(&response_rx); 2721 2722 mock.send_delta("partial").await; 2723 2724 // Drop the mock's writer — simulates process exit 2725 drop(mock.writer); 2726 2727 // Actor should detect EOF and send a Failed response 2728 let failed = response_rx 2729 .recv_timeout(Duration::from_secs(5)) 2730 .expect("timed out waiting for response after EOF"); 2731 2732 // First response might be the token, keep reading 2733 let mut got_failed = false; 2734 2735 match failed { 2736 DaveApiResponse::Token(t) => { 2737 assert_eq!(t, "partial"); 2738 } 2739 DaveApiResponse::Failed(_) => got_failed = true, 2740 _ => {} 2741 
} 2742 2743 if !got_failed { 2744 let resp = response_rx 2745 .recv_timeout(Duration::from_secs(5)) 2746 .expect("timed out waiting for Failed after EOF"); 2747 match resp { 2748 DaveApiResponse::Failed(msg) => { 2749 assert!( 2750 msg.contains("exited unexpectedly") || msg.contains("EOF"), 2751 "Unexpected error message: {}", 2752 msg 2753 ); 2754 } 2755 other => panic!( 2756 "Expected Failed after EOF, got {:?}", 2757 std::mem::discriminant(&other) 2758 ), 2759 } 2760 } 2761 2762 // Actor should exit after EOF 2763 command_tx.send(SessionCommand::Shutdown).await.ok(); // might fail if actor already exited 2764 handle.await.unwrap(); 2765 } 2766 2767 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 2768 async fn test_integration_init_failure() { 2769 // "stdout" channel: mock writes → actor reads 2770 let (mock_stdout_write, actor_stdout_read) = tokio::io::duplex(8192); 2771 // "stdin" channel: actor writes → mock reads 2772 let (actor_stdin_write, mock_stdin_read) = tokio::io::duplex(8192); 2773 2774 let mut mock_reader = BufReader::new(mock_stdin_read).lines(); 2775 let mut mock_writer = tokio::io::BufWriter::new(mock_stdout_write); 2776 2777 let actor_writer = tokio::io::BufWriter::new(actor_stdin_write); 2778 let actor_reader = BufReader::new(actor_stdout_read).lines(); 2779 2780 let (command_tx, mut command_rx) = tokio_mpsc::channel(16); 2781 2782 let handle = tokio::spawn(async move { 2783 session_actor_loop( 2784 "test-session", 2785 actor_writer, 2786 actor_reader, 2787 Some("mock-model"), 2788 None, 2789 None, 2790 &mut command_rx, 2791 ) 2792 .await; 2793 }); 2794 2795 // Read the initialize request 2796 let line = mock_reader.next_line().await.unwrap().unwrap(); 2797 let req: RpcMessage = serde_json::from_str(&line).unwrap(); 2798 let id = req.id.unwrap(); 2799 2800 // Send an error response 2801 let error_resp = json!({ 2802 "id": id, 2803 "error": { "code": -1, "message": "mock init failure" } 2804 }); 2805 let mut error_line = 
serde_json::to_string(&error_resp).unwrap(); 2806 error_line.push('\n'); 2807 mock_writer.write_all(error_line.as_bytes()).await.unwrap(); 2808 mock_writer.flush().await.unwrap(); 2809 2810 // The actor should drain commands with error. Send a query and a shutdown. 2811 let (response_tx, response_rx) = mpsc::channel(); 2812 command_tx 2813 .send(SessionCommand::Query { 2814 prompt: "hello".to_string(), 2815 response_tx, 2816 ctx: egui::Context::default(), 2817 }) 2818 .await 2819 .unwrap(); 2820 command_tx.send(SessionCommand::Shutdown).await.unwrap(); 2821 2822 handle.await.unwrap(); 2823 2824 // The query should have received an error 2825 let resp = response_rx 2826 .recv_timeout(Duration::from_secs(5)) 2827 .expect("expected error response after init failure"); 2828 match resp { 2829 DaveApiResponse::Failed(msg) => { 2830 assert!( 2831 msg.contains("init handshake"), 2832 "Expected init handshake error, got: {}", 2833 msg 2834 ); 2835 } 2836 other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)), 2837 } 2838 } 2839 2840 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 2841 async fn test_integration_turn_error() { 2842 let (mut mock, command_tx, handle) = setup_integration_test(); 2843 2844 mock.handle_init().await; 2845 mock.handle_thread_start().await; 2846 2847 let response_rx = send_query(&command_tx, "hello").await; 2848 2849 // Read turn/start request and send an error response 2850 let req = mock.read_message().await; 2851 assert_eq!(req.method.as_deref(), Some("turn/start")); 2852 let id = req.id.unwrap(); 2853 mock.send_line(&json!({ 2854 "id": id, 2855 "error": { "code": -32000, "message": "rate limit exceeded" } 2856 })) 2857 .await; 2858 2859 // Give actor time to process 2860 tokio::time::sleep(Duration::from_millis(100)).await; 2861 2862 command_tx.send(SessionCommand::Shutdown).await.unwrap(); 2863 handle.await.unwrap(); 2864 2865 let responses = collect_responses(&response_rx); 2866 let failures: Vec<_> = 
responses 2867 .iter() 2868 .filter_map(|r| match r { 2869 DaveApiResponse::Failed(msg) => Some(msg.clone()), 2870 _ => None, 2871 }) 2872 .collect(); 2873 assert_eq!(failures.len(), 1); 2874 assert_eq!(failures[0], "rate limit exceeded"); 2875 } 2876 2877 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 2878 async fn test_integration_file_change_approval() { 2879 let (mut mock, command_tx, handle) = setup_integration_test(); 2880 2881 mock.handle_init().await; 2882 mock.handle_thread_start().await; 2883 2884 let response_rx = send_query(&command_tx, "create file").await; 2885 mock.handle_turn_start().await; 2886 drain_session_info(&response_rx); 2887 2888 // File change approval request (create) 2889 mock.send_approval_request( 2890 77, 2891 "item/fileChange/requestApproval", 2892 json!({ 2893 "filePath": "new_file.rs", 2894 "diff": "+fn main() {}", 2895 "kind": { "type": "create" } 2896 }), 2897 ) 2898 .await; 2899 2900 let resp = response_rx 2901 .recv_timeout(Duration::from_secs(5)) 2902 .expect("timed out waiting for PermissionRequest"); 2903 let pending = match resp { 2904 DaveApiResponse::PermissionRequest(p) => p, 2905 other => panic!( 2906 "Expected PermissionRequest, got {:?}", 2907 std::mem::discriminant(&other) 2908 ), 2909 }; 2910 // File create should map to "Write" tool 2911 assert_eq!(pending.request.tool_name, "Write"); 2912 2913 pending 2914 .response_tx 2915 .send(PermissionResponse::Allow { message: None }) 2916 .unwrap(); 2917 2918 let approval_msg = mock.read_message().await; 2919 assert_eq!(approval_msg.id, Some(77)); 2920 assert_eq!(approval_msg.result.unwrap()["decision"], "accept"); 2921 2922 mock.send_turn_completed("completed").await; 2923 drop(command_tx); 2924 handle.await.unwrap(); 2925 } 2926 2927 /// Create a mock codex server with `resume_session_id` set, so the actor 2928 /// sends `thread/resume` instead of `thread/start`. 
2929 fn setup_integration_test_with_resume( 2930 resume_id: &str, 2931 ) -> ( 2932 MockCodex, 2933 tokio_mpsc::Sender<SessionCommand>, 2934 tokio::task::JoinHandle<()>, 2935 ) { 2936 let (mock_stdout_write, actor_stdout_read) = tokio::io::duplex(8192); 2937 let (actor_stdin_write, mock_stdin_read) = tokio::io::duplex(8192); 2938 2939 let mock = MockCodex { 2940 reader: BufReader::new(mock_stdin_read).lines(), 2941 writer: tokio::io::BufWriter::new(mock_stdout_write), 2942 }; 2943 2944 let actor_writer = tokio::io::BufWriter::new(actor_stdin_write); 2945 let actor_reader = BufReader::new(actor_stdout_read).lines(); 2946 2947 let (command_tx, mut command_rx) = tokio_mpsc::channel(16); 2948 let resume_id = resume_id.to_string(); 2949 2950 let handle = tokio::spawn(async move { 2951 session_actor_loop( 2952 "test-session-resume", 2953 actor_writer, 2954 actor_reader, 2955 Some("mock-model"), 2956 None, 2957 Some(&resume_id), 2958 &mut command_rx, 2959 ) 2960 .await; 2961 }); 2962 2963 (mock, command_tx, handle) 2964 } 2965 2966 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 2967 async fn test_integration_interrupt_during_stream() { 2968 let (mut mock, command_tx, handle) = setup_integration_test(); 2969 2970 mock.handle_init().await; 2971 mock.handle_thread_start().await; 2972 2973 let response_rx = send_query(&command_tx, "count to 100").await; 2974 mock.handle_turn_start().await; 2975 drain_session_info(&response_rx); 2976 2977 // Send a few tokens 2978 mock.send_delta("one ").await; 2979 mock.send_delta("two ").await; 2980 2981 // Give actor time to process the tokens 2982 tokio::time::sleep(Duration::from_millis(50)).await; 2983 2984 // Verify we got them 2985 let tok1 = response_rx 2986 .recv_timeout(Duration::from_secs(2)) 2987 .expect("expected token 1"); 2988 assert!(matches!(tok1, DaveApiResponse::Token(ref t) if t == "one ")); 2989 2990 // Send interrupt 2991 command_tx 2992 .send(SessionCommand::Interrupt { 2993 ctx: egui::Context::default(), 
2994 }) 2995 .await 2996 .unwrap(); 2997 2998 // The actor should send turn/interrupt to codex 2999 let interrupt_msg = mock.read_message().await; 3000 assert_eq!(interrupt_msg.method.as_deref(), Some("turn/interrupt")); 3001 3002 // Codex responds with turn/completed after interrupt 3003 mock.send_turn_completed("interrupted").await; 3004 3005 // Actor should be ready for next command now 3006 drop(command_tx); 3007 handle.await.unwrap(); 3008 3009 // Verify we got the tokens before interrupt 3010 let responses = collect_responses(&response_rx); 3011 let tokens: Vec<_> = responses 3012 .iter() 3013 .filter_map(|r| match r { 3014 DaveApiResponse::Token(t) => Some(t.clone()), 3015 _ => None, 3016 }) 3017 .collect(); 3018 assert!(tokens.contains(&"two ".to_string())); 3019 } 3020 3021 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 3022 async fn test_integration_interrupt_during_approval() { 3023 let (mut mock, command_tx, handle) = setup_integration_test(); 3024 3025 mock.handle_init().await; 3026 mock.handle_thread_start().await; 3027 3028 let response_rx = send_query(&command_tx, "run something").await; 3029 mock.handle_turn_start().await; 3030 drain_session_info(&response_rx); 3031 3032 // Send an approval request 3033 mock.send_approval_request( 3034 50, 3035 "item/commandExecution/requestApproval", 3036 json!({ "command": "rm -rf /" }), 3037 ) 3038 .await; 3039 3040 // Wait for the PermissionRequest to arrive at the test 3041 let resp = response_rx 3042 .recv_timeout(Duration::from_secs(5)) 3043 .expect("timed out waiting for PermissionRequest"); 3044 match resp { 3045 DaveApiResponse::PermissionRequest(_pending) => { 3046 // Don't respond to the pending permission — send interrupt instead 3047 } 3048 other => panic!( 3049 "Expected PermissionRequest, got {:?}", 3050 std::mem::discriminant(&other) 3051 ), 3052 } 3053 3054 // Send interrupt while approval is pending 3055 command_tx 3056 .send(SessionCommand::Interrupt { 3057 ctx: 
egui::Context::default(), 3058 }) 3059 .await 3060 .unwrap(); 3061 3062 // Actor should send cancel for the approval 3063 let cancel_msg = mock.read_message().await; 3064 assert_eq!(cancel_msg.id, Some(50)); 3065 assert_eq!(cancel_msg.result.unwrap()["decision"], "cancel"); 3066 3067 // Then send turn/interrupt 3068 let interrupt_msg = mock.read_message().await; 3069 assert_eq!(interrupt_msg.method.as_deref(), Some("turn/interrupt")); 3070 3071 // Codex completes the turn 3072 mock.send_turn_completed("interrupted").await; 3073 3074 drop(command_tx); 3075 handle.await.unwrap(); 3076 } 3077 3078 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 3079 async fn test_integration_query_during_active_turn() { 3080 let (mut mock, command_tx, handle) = setup_integration_test(); 3081 3082 mock.handle_init().await; 3083 mock.handle_thread_start().await; 3084 3085 let response_rx1 = send_query(&command_tx, "first query").await; 3086 mock.handle_turn_start().await; 3087 3088 // Send some tokens so the turn is clearly active 3089 mock.send_delta("working...").await; 3090 3091 // Give actor time to enter the streaming loop 3092 tokio::time::sleep(Duration::from_millis(50)).await; 3093 3094 // Send a second query while the first is still active 3095 let response_rx2 = send_query(&command_tx, "second query").await; 3096 3097 // The second query should be immediately rejected 3098 let rejection = response_rx2 3099 .recv_timeout(Duration::from_secs(5)) 3100 .expect("timed out waiting for rejection"); 3101 match rejection { 3102 DaveApiResponse::Failed(msg) => { 3103 assert_eq!(msg, "Query already in progress"); 3104 } 3105 other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)), 3106 } 3107 3108 // First query continues normally 3109 mock.send_turn_completed("completed").await; 3110 3111 drop(command_tx); 3112 handle.await.unwrap(); 3113 3114 // Verify first query got its token 3115 let responses = collect_responses(&response_rx1); 3116 let tokens: 
Vec<_> = responses 3117 .iter() 3118 .filter_map(|r| match r { 3119 DaveApiResponse::Token(t) => Some(t.clone()), 3120 _ => None, 3121 }) 3122 .collect(); 3123 assert!(tokens.contains(&"working...".to_string())); 3124 } 3125 3126 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 3127 async fn test_integration_thread_resume() { 3128 let (mut mock, command_tx, handle) = 3129 setup_integration_test_with_resume("existing-thread-42"); 3130 3131 // Init handshake is the same 3132 mock.handle_init().await; 3133 3134 // Actor should send thread/resume instead of thread/start 3135 let req = mock.read_message().await; 3136 assert_eq!(req.method.as_deref(), Some("thread/resume")); 3137 let params = req.params.unwrap(); 3138 assert_eq!(params["threadId"], "existing-thread-42"); 3139 3140 // Respond with success 3141 let id = req.id.unwrap(); 3142 mock.send_line(&json!({ 3143 "id": id, 3144 "result": { "thread": { "id": "existing-thread-42" } } 3145 })) 3146 .await; 3147 3148 // Now send a query — should work the same as normal 3149 let response_rx = send_query(&command_tx, "resume prompt").await; 3150 mock.handle_turn_start().await; 3151 mock.send_delta("resumed!").await; 3152 mock.send_turn_completed("completed").await; 3153 3154 drop(command_tx); 3155 handle.await.unwrap(); 3156 3157 let responses = collect_responses(&response_rx); 3158 let tokens: Vec<_> = responses 3159 .iter() 3160 .filter_map(|r| match r { 3161 DaveApiResponse::Token(t) => Some(t.clone()), 3162 _ => None, 3163 }) 3164 .collect(); 3165 assert_eq!(tokens, vec!["resumed!"]); 3166 } 3167 3168 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 3169 async fn test_integration_malformed_jsonl() { 3170 let (mut mock, command_tx, handle) = setup_integration_test(); 3171 3172 mock.handle_init().await; 3173 mock.handle_thread_start().await; 3174 3175 let response_rx = send_query(&command_tx, "test").await; 3176 mock.handle_turn_start().await; 3177 3178 // Send valid token 3179 
mock.send_delta("before").await; 3180 3181 // Send garbage that isn't valid JSON 3182 let mut garbage = "this is not json at all\n".to_string(); 3183 mock.writer.write_all(garbage.as_bytes()).await.unwrap(); 3184 mock.writer.flush().await.unwrap(); 3185 3186 // Send another valid token after the garbage 3187 mock.send_delta("after").await; 3188 3189 // Complete the turn 3190 mock.send_turn_completed("completed").await; 3191 3192 drop(command_tx); 3193 handle.await.unwrap(); 3194 3195 // Both valid tokens should have been received — the garbage line 3196 // should have been skipped with a warning, not crash the actor 3197 let responses = collect_responses(&response_rx); 3198 let tokens: Vec<_> = responses 3199 .iter() 3200 .filter_map(|r| match r { 3201 DaveApiResponse::Token(t) => Some(t.clone()), 3202 _ => None, 3203 }) 3204 .collect(); 3205 assert!( 3206 tokens.contains(&"before".to_string()), 3207 "Missing 'before' token, got: {:?}", 3208 tokens 3209 ); 3210 assert!( 3211 tokens.contains(&"after".to_string()), 3212 "Missing 'after' token after malformed line, got: {:?}", 3213 tokens 3214 ); 3215 } 3216 3217 // ----------------------------------------------------------------------- 3218 // Real-binary integration tests — require `codex` on PATH 3219 // Run with: cargo test -p notedeck_dave -- --ignored 3220 // ----------------------------------------------------------------------- 3221 3222 /// Helper: spawn a real codex app-server process and wire it into 3223 /// `session_actor_loop`. Returns the command sender, response receiver, 3224 /// and join handle. 
3225 fn setup_real_codex_test() -> ( 3226 tokio_mpsc::Sender<SessionCommand>, 3227 mpsc::Receiver<DaveApiResponse>, 3228 tokio::task::JoinHandle<()>, 3229 ) { 3230 let codex_binary = std::env::var("CODEX_BINARY").unwrap_or_else(|_| "codex".to_string()); 3231 3232 let mut child = spawn_codex(&codex_binary, &None) 3233 .expect("Failed to spawn codex app-server — is codex installed?"); 3234 3235 let stdin = child.stdin.take().expect("stdin piped"); 3236 let stdout = child.stdout.take().expect("stdout piped"); 3237 3238 // Drain stderr to prevent pipe deadlock 3239 if let Some(stderr) = child.stderr.take() { 3240 tokio::spawn(async move { 3241 let mut lines = BufReader::new(stderr).lines(); 3242 while let Ok(Some(line)) = lines.next_line().await { 3243 eprintln!("[codex stderr] {}", line); 3244 } 3245 }); 3246 } 3247 3248 let writer = tokio::io::BufWriter::new(stdin); 3249 let reader = BufReader::new(stdout).lines(); 3250 3251 let (command_tx, mut command_rx) = tokio_mpsc::channel(16); 3252 3253 let handle = tokio::spawn(async move { 3254 session_actor_loop( 3255 "real-codex-test", 3256 writer, 3257 reader, 3258 None, // use codex default model 3259 None, // use current directory 3260 None, // no resume 3261 &mut command_rx, 3262 ) 3263 .await; 3264 let _ = child.kill().await; 3265 }); 3266 3267 let (response_tx, response_rx) = mpsc::channel(); 3268 // Send an initial query to trigger handshake + thread start + turn 3269 let command_tx_clone = command_tx.clone(); 3270 let rt_handle = tokio::runtime::Handle::current(); 3271 rt_handle.spawn(async move { 3272 command_tx_clone 3273 .send(SessionCommand::Query { 3274 prompt: "Say exactly: hello world".to_string(), 3275 response_tx, 3276 ctx: egui::Context::default(), 3277 }) 3278 .await 3279 .unwrap(); 3280 }); 3281 3282 (command_tx, response_rx, handle) 3283 } 3284 3285 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 3286 #[ignore] // Requires `codex` binary on PATH 3287 async fn test_real_codex_streaming() { 
3288 let (command_tx, response_rx, handle) = setup_real_codex_test(); 3289 3290 // Wait for at least one token (with a generous timeout for API calls) 3291 let mut got_token = false; 3292 let deadline = std::time::Instant::now() + Duration::from_secs(60); 3293 3294 while std::time::Instant::now() < deadline { 3295 match response_rx.recv_timeout(Duration::from_secs(1)) { 3296 Ok(DaveApiResponse::Token(t)) => { 3297 eprintln!("[test] got token: {:?}", t); 3298 got_token = true; 3299 } 3300 Ok(DaveApiResponse::PermissionRequest(pending)) => { 3301 // Auto-accept any permission requests during test 3302 eprintln!( 3303 "[test] auto-accepting permission: {}", 3304 pending.request.tool_name 3305 ); 3306 let _ = pending 3307 .response_tx 3308 .send(PermissionResponse::Allow { message: None }); 3309 } 3310 Ok(DaveApiResponse::Failed(msg)) => { 3311 panic!("[test] codex turn failed: {}", msg); 3312 } 3313 Ok(other) => { 3314 eprintln!("[test] got response: {:?}", std::mem::discriminant(&other)); 3315 } 3316 Err(mpsc::RecvTimeoutError::Timeout) => { 3317 if got_token { 3318 break; // Got at least one token; stop waiting 3319 } 3320 } 3321 Err(mpsc::RecvTimeoutError::Disconnected) => break, 3322 } 3323 } 3324 3325 assert!( 3326 got_token, 3327 "Expected at least one Token response from real codex" 3328 ); 3329 3330 drop(command_tx); 3331 let _ = tokio::time::timeout(Duration::from_secs(10), handle).await; 3332 } 3333 3334 #[tokio::test(flavor = "multi_thread", worker_threads = 2)] 3335 #[ignore] // Requires `codex` binary on PATH 3336 async fn test_real_codex_turn_completes() { 3337 let (command_tx, response_rx, handle) = setup_real_codex_test(); 3338 3339 // Wait for turn to complete 3340 let mut got_turn_done = false; 3341 let mut got_any_response = false; 3342 let deadline = std::time::Instant::now() + Duration::from_secs(120); 3343 3344 while std::time::Instant::now() < deadline { 3345 match response_rx.recv_timeout(Duration::from_secs(2)) { 3346 
Ok(DaveApiResponse::Token(_)) => { 3347 got_any_response = true; 3348 } 3349 Ok(DaveApiResponse::PermissionRequest(pending)) => { 3350 got_any_response = true; 3351 let _ = pending 3352 .response_tx 3353 .send(PermissionResponse::Allow { message: None }); 3354 } 3355 Ok(DaveApiResponse::Failed(msg)) => { 3356 eprintln!("[test] turn failed: {}", msg); 3357 // A failure is still a "completion" — codex responded 3358 got_turn_done = true; 3359 break; 3360 } 3361 Ok(_) => { 3362 got_any_response = true; 3363 } 3364 Err(mpsc::RecvTimeoutError::Timeout) => { 3365 if got_any_response { 3366 // Responses have stopped coming — turn likely completed 3367 // (turn/completed causes the actor to stop sending 3368 // and wait for the next command) 3369 got_turn_done = true; 3370 break; 3371 } 3372 } 3373 Err(mpsc::RecvTimeoutError::Disconnected) => { 3374 got_turn_done = true; 3375 break; 3376 } 3377 } 3378 } 3379 3380 assert!( 3381 got_turn_done, 3382 "Expected real codex turn to complete within timeout" 3383 ); 3384 3385 drop(command_tx); 3386 let _ = tokio::time::timeout(Duration::from_secs(10), handle).await; 3387 } 3388 }