notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

codex.rs (125158B)


      1 //! Codex backend — orchestrates OpenAI's Codex CLI (`codex app-server`)
      2 //! via its JSON-RPC-over-stdio protocol.
      3 
      4 use super::codex_protocol::*;
      5 use super::shared::{self, SessionCommand, SessionHandle};
      6 use crate::backend::traits::AiBackend;
      7 use crate::file_update::{FileUpdate, FileUpdateType};
      8 use crate::messages::{
      9     CompactionInfo, DaveApiResponse, PermissionResponse, SessionInfo, SubagentInfo, SubagentStatus,
     10     UsageInfo,
     11 };
     12 use crate::tools::Tool;
     13 use crate::Message;
     14 use claude_agent_sdk_rs::PermissionMode;
     15 use dashmap::DashMap;
     16 use serde_json::Value;
     17 use std::collections::HashMap;
     18 use std::path::PathBuf;
     19 use std::sync::mpsc;
     20 use std::sync::Arc;
     21 use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader};
     22 use tokio::process::{Child, Command};
     23 use tokio::sync::mpsc as tokio_mpsc;
     24 use tokio::sync::oneshot;
     25 use uuid::Uuid;
     26 
     27 // ---------------------------------------------------------------------------
     28 // Session actor
     29 // ---------------------------------------------------------------------------
     30 
     31 /// Result of processing a single Codex JSON-RPC message.
     32 enum HandleResult {
     33     /// Normal notification processed, keep reading.
     34     Continue,
     35     /// `turn/completed` received — this turn is done.
     36     TurnDone,
     37     /// Auto-accept matched — send accept for this rpc_id immediately.
     38     AutoAccepted(u64),
     39     /// Needs UI approval — stash the receiver and wait for the user.
     40     NeedsApproval {
     41         rpc_id: u64,
     42         rx: oneshot::Receiver<PermissionResponse>,
     43     },
     44 }
     45 
     46 /// Per-session actor that owns the `codex app-server` child process.
     47 async fn session_actor(
     48     session_id: String,
     49     cwd: Option<PathBuf>,
     50     codex_binary: String,
     51     model: Option<String>,
     52     resume_session_id: Option<String>,
     53     mut command_rx: tokio_mpsc::Receiver<SessionCommand>,
     54 ) {
     55     // Spawn the codex app-server child process
     56     let mut child = match spawn_codex(&codex_binary, &cwd) {
     57         Ok(c) => c,
     58         Err(err) => {
     59             tracing::error!("Session {} failed to spawn codex: {}", session_id, err);
     60             drain_commands_with_error(&mut command_rx, &format!("Failed to spawn codex: {}", err))
     61                 .await;
     62             return;
     63         }
     64     };
     65 
     66     let stdin = child.stdin.take().expect("stdin piped");
     67     let stdout = child.stdout.take().expect("stdout piped");
     68 
     69     // Drain stderr in a background task to prevent pipe deadlock
     70     if let Some(stderr) = child.stderr.take() {
     71         let sid = session_id.clone();
     72         tokio::spawn(async move {
     73             let mut lines = BufReader::new(stderr).lines();
     74             while let Ok(Some(line)) = lines.next_line().await {
     75                 tracing::trace!("Codex stderr [{}]: {}", sid, line);
     76             }
     77         });
     78     }
     79 
     80     let writer = tokio::io::BufWriter::new(stdin);
     81     let reader = BufReader::new(stdout).lines();
     82     let cwd_str = cwd.as_ref().map(|p| p.to_string_lossy().into_owned());
     83 
     84     session_actor_loop(
     85         &session_id,
     86         writer,
     87         reader,
     88         model.as_deref(),
     89         cwd_str.as_deref(),
     90         resume_session_id.as_deref(),
     91         &mut command_rx,
     92     )
     93     .await;
     94 
     95     let _ = child.kill().await;
     96     tracing::debug!("Session {} actor exited", session_id);
     97 }
     98 
     99 /// Core session loop, generic over I/O for testability.
    100 ///
    101 /// Performs the init handshake, thread start/resume, and main command loop.
    102 /// Returns when the session is shut down or an unrecoverable error occurs.
    103 /// The caller is responsible for process lifecycle (spawn, kill).
    104 async fn session_actor_loop<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>(
    105     session_id: &str,
    106     mut writer: tokio::io::BufWriter<W>,
    107     mut reader: tokio::io::Lines<R>,
    108     model: Option<&str>,
    109     cwd: Option<&str>,
    110     resume_session_id: Option<&str>,
    111     command_rx: &mut tokio_mpsc::Receiver<SessionCommand>,
    112 ) {
    113     // ---- init handshake ----
    114     if let Err(err) = do_init_handshake(&mut writer, &mut reader).await {
    115         tracing::error!("Session {} init handshake failed: {}", session_id, err);
    116         drain_commands_with_error(command_rx, &format!("Codex init handshake failed: {}", err))
    117             .await;
    118         return;
    119     }
    120 
    121     // ---- thread start / resume ----
    122     let thread_id = if let Some(tid) = resume_session_id {
    123         match send_thread_resume(&mut writer, &mut reader, tid).await {
    124             Ok(id) => id,
    125             Err(err) => {
    126                 tracing::error!("Session {} thread/resume failed: {}", session_id, err);
    127                 drain_commands_with_error(
    128                     command_rx,
    129                     &format!("Codex thread/resume failed: {}", err),
    130                 )
    131                 .await;
    132                 return;
    133             }
    134         }
    135     } else {
    136         match send_thread_start(&mut writer, &mut reader, model, cwd).await {
    137             Ok(id) => id,
    138             Err(err) => {
    139                 tracing::error!("Session {} thread/start failed: {}", session_id, err);
    140                 drain_commands_with_error(
    141                     command_rx,
    142                     &format!("Codex thread/start failed: {}", err),
    143                 )
    144                 .await;
    145                 return;
    146             }
    147         }
    148     };
    149 
    150     tracing::info!(
    151         "Session {} connected to codex, thread_id={}",
    152         session_id,
    153         thread_id
    154     );
    155 
    156     // ---- main command loop ----
    157     let mut request_counter: u64 = 10; // start after init IDs
    158     let mut current_turn_id: Option<String> = None;
    159     let mut sent_session_info = false;
    160     let mut turn_count: u32 = 0;
    161 
    162     while let Some(cmd) = command_rx.recv().await {
    163         match cmd {
    164             SessionCommand::Query {
    165                 prompt,
    166                 response_tx,
    167                 ctx,
    168             } => {
    169                 // Emit session info on the first query so Nostr replication can start
    170                 if !sent_session_info {
    171                     sent_session_info = true;
    172                     let info = SessionInfo {
    173                         model: model.map(|s| s.to_string()),
    174                         claude_session_id: Some(thread_id.clone()),
    175                         ..Default::default()
    176                     };
    177                     let _ = response_tx.send(DaveApiResponse::SessionInfo(info));
    178                     ctx.request_repaint();
    179                 }
    180 
    181                 // Send turn/start
    182                 turn_count += 1;
    183                 request_counter += 1;
    184                 let turn_req_id = request_counter;
    185                 if let Err(err) =
    186                     send_turn_start(&mut writer, turn_req_id, &thread_id, &prompt, model).await
    187                 {
    188                     tracing::error!("Session {} turn/start failed: {}", session_id, err);
    189                     let _ = response_tx.send(DaveApiResponse::Failed(err.to_string()));
    190                     continue;
    191                 }
    192 
    193                 // Read the turn/start response
    194                 match read_response_for_id(&mut reader, turn_req_id).await {
    195                     Ok(msg) => {
    196                         if let Some(err) = msg.error {
    197                             tracing::error!(
    198                                 "Session {} turn/start error: {}",
    199                                 session_id,
    200                                 err.message
    201                             );
    202                             let _ = response_tx.send(DaveApiResponse::Failed(err.message));
    203                             continue;
    204                         }
    205                         if let Some(result) = &msg.result {
    206                             current_turn_id = result
    207                                 .get("turn")
    208                                 .and_then(|t| t.get("id"))
    209                                 .and_then(|v| v.as_str())
    210                                 .map(|s| s.to_string());
    211                         }
    212                     }
    213                     Err(err) => {
    214                         tracing::error!(
    215                             "Session {} failed reading turn/start response: {}",
    216                             session_id,
    217                             err
    218                         );
    219                         let _ = response_tx.send(DaveApiResponse::Failed(err.to_string()));
    220                         continue;
    221                     }
    222                 }
    223 
    224                 // Stream notifications until turn/completed
    225                 let mut subagent_stack: Vec<String> = Vec::new();
    226                 let mut turn_done = false;
    227                 let mut pending_approval: Option<(u64, oneshot::Receiver<PermissionResponse>)> =
    228                     None;
    229 
    230                 while !turn_done {
    231                     if let Some((rpc_id, mut rx)) = pending_approval.take() {
    232                         // ---- approval-wait state ----
    233                         // Codex is blocked waiting for our response, so no new
    234                         // lines will arrive. Select between the UI response and
    235                         // commands (interrupt / shutdown).
    236                         tokio::select! {
    237                             biased;
    238 
    239                             Some(cmd) = command_rx.recv() => {
    240                                 match cmd {
    241                                     SessionCommand::Interrupt { ctx: int_ctx } => {
    242                                         tracing::debug!("Session {} interrupted during approval", session_id);
    243                                         // Cancel the approval and interrupt the turn
    244                                         let _ = send_approval_response(&mut writer, rpc_id, ApprovalDecision::Cancel).await;
    245                                         if let Some(ref tid) = current_turn_id {
    246                                             request_counter += 1;
    247                                             let _ = send_turn_interrupt(&mut writer, request_counter, &thread_id, tid).await;
    248                                         }
    249                                         int_ctx.request_repaint();
    250                                         // Don't restore pending — it's been cancelled
    251                                     }
    252                                     SessionCommand::Shutdown => {
    253                                         tracing::debug!("Session {} shutting down during approval", session_id);
    254                                         return;
    255                                     }
    256                                     SessionCommand::Query { response_tx: new_tx, .. } => {
    257                                         let _ = new_tx.send(DaveApiResponse::Failed(
    258                                             "Query already in progress".to_string(),
    259                                         ));
    260                                         // Restore the pending approval — still waiting
    261                                         pending_approval = Some((rpc_id, rx));
    262                                     }
    263                                     SessionCommand::SetPermissionMode { ctx: mode_ctx, .. } => {
    264                                         mode_ctx.request_repaint();
    265                                         pending_approval = Some((rpc_id, rx));
    266                                     }
    267                                     SessionCommand::Compact { response_tx: compact_tx, .. } => {
    268                                         let _ = compact_tx.send(DaveApiResponse::Failed(
    269                                             "Cannot compact during active turn".to_string(),
    270                                         ));
    271                                         pending_approval = Some((rpc_id, rx));
    272                                     }
    273                                 }
    274                             }
    275 
    276                             result = &mut rx => {
    277                                 let decision = match result {
    278                                     Ok(PermissionResponse::Allow { .. }) => ApprovalDecision::Accept,
    279                                     Ok(PermissionResponse::Deny { .. }) => ApprovalDecision::Decline,
    280                                     Err(_) => ApprovalDecision::Cancel,
    281                                 };
    282                                 let _ = send_approval_response(&mut writer, rpc_id, decision).await;
    283                             }
    284                         }
    285                     } else {
    286                         // ---- normal streaming state ----
    287                         tokio::select! {
    288                             biased;
    289 
    290                             Some(cmd) = command_rx.recv() => {
    291                                 match cmd {
    292                                     SessionCommand::Interrupt { ctx: int_ctx } => {
    293                                         tracing::debug!("Session {} interrupted", session_id);
    294                                         if let Some(ref tid) = current_turn_id {
    295                                             request_counter += 1;
    296                                             let _ = send_turn_interrupt(&mut writer, request_counter, &thread_id, tid).await;
    297                                         }
    298                                         int_ctx.request_repaint();
    299                                     }
    300                                     SessionCommand::Query { response_tx: new_tx, .. } => {
    301                                         let _ = new_tx.send(DaveApiResponse::Failed(
    302                                             "Query already in progress".to_string(),
    303                                         ));
    304                                     }
    305                                     SessionCommand::SetPermissionMode { mode, ctx: mode_ctx } => {
    306                                         tracing::debug!(
    307                                             "Session {} ignoring permission mode {:?} (not supported by Codex)",
    308                                             session_id, mode
    309                                         );
    310                                         mode_ctx.request_repaint();
    311                                     }
    312                                     SessionCommand::Compact { response_tx: compact_tx, .. } => {
    313                                         let _ = compact_tx.send(DaveApiResponse::Failed(
    314                                             "Cannot compact during active turn".to_string(),
    315                                         ));
    316                                     }
    317                                     SessionCommand::Shutdown => {
    318                                         tracing::debug!("Session {} shutting down during query", session_id);
    319                                         return;
    320                                     }
    321                                 }
    322                             }
    323 
    324                             line_result = reader.next_line() => {
    325                                 match line_result {
    326                                     Ok(Some(line)) => {
    327                                         let msg: RpcMessage = match serde_json::from_str(&line) {
    328                                             Ok(m) => m,
    329                                             Err(err) => {
    330                                                 tracing::warn!("Codex parse error: {} in: {}", err, &line[..line.len().min(200)]);
    331                                                 continue;
    332                                             }
    333                                         };
    334 
    335                                         match handle_codex_message(
    336                                             msg,
    337                                             &response_tx,
    338                                             &ctx,
    339                                             &mut subagent_stack,
    340                                             &turn_count,
    341                                         ) {
    342                                             HandleResult::Continue => {}
    343                                             HandleResult::TurnDone => {
    344                                                 turn_done = true;
    345                                             }
    346                                             HandleResult::AutoAccepted(rpc_id) => {
    347                                                 let _ = send_approval_response(
    348                                                     &mut writer, rpc_id, ApprovalDecision::Accept,
    349                                                 ).await;
    350                                             }
    351                                             HandleResult::NeedsApproval { rpc_id, rx } => {
    352                                                 pending_approval = Some((rpc_id, rx));
    353                                             }
    354                                         }
    355                                     }
    356                                     Ok(None) => {
    357                                         tracing::error!("Session {} codex process exited unexpectedly", session_id);
    358                                         let _ = response_tx.send(DaveApiResponse::Failed(
    359                                             "Codex process exited unexpectedly".to_string(),
    360                                         ));
    361                                         turn_done = true;
    362                                     }
    363                                     Err(err) => {
    364                                         tracing::error!("Session {} read error: {}", session_id, err);
    365                                         let _ = response_tx.send(DaveApiResponse::Failed(err.to_string()));
    366                                         turn_done = true;
    367                                     }
    368                                 }
    369                             }
    370                         }
    371                     }
    372                 }
    373 
    374                 current_turn_id = None;
    375                 // Drop the response channel so the main loop sees a
    376                 // Disconnected signal and finalizes the assistant message
    377                 // (builds kind-1988 event for Nostr replication).
    378                 drop(response_tx);
    379                 tracing::debug!("Turn complete for session {}", session_id);
    380             }
    381             SessionCommand::Interrupt { ctx } => {
    382                 ctx.request_repaint();
    383             }
    384             SessionCommand::SetPermissionMode { mode, ctx } => {
    385                 tracing::debug!(
    386                     "Session {} ignoring permission mode {:?} (not supported by Codex)",
    387                     session_id,
    388                     mode
    389                 );
    390                 ctx.request_repaint();
    391             }
    392             SessionCommand::Compact { response_tx, ctx } => {
    393                 request_counter += 1;
    394                 let compact_req_id = request_counter;
    395 
    396                 // Send thread/compact/start RPC
    397                 if let Err(err) = send_thread_compact(&mut writer, compact_req_id, &thread_id).await
    398                 {
    399                     tracing::error!(
    400                         "Session {} thread/compact/start failed: {}",
    401                         session_id,
    402                         err
    403                     );
    404                     let _ = response_tx.send(DaveApiResponse::Failed(err));
    405                     ctx.request_repaint();
    406                     continue;
    407                 }
    408 
    409                 // Read the RPC response (empty {})
    410                 match read_response_for_id(&mut reader, compact_req_id).await {
    411                     Ok(msg) => {
    412                         if let Some(err) = msg.error {
    413                             tracing::error!(
    414                                 "Session {} thread/compact/start error: {}",
    415                                 session_id,
    416                                 err.message
    417                             );
    418                             let _ = response_tx.send(DaveApiResponse::Failed(err.message));
    419                             ctx.request_repaint();
    420                             continue;
    421                         }
    422                     }
    423                     Err(err) => {
    424                         tracing::error!(
    425                             "Session {} failed reading compact response: {}",
    426                             session_id,
    427                             err
    428                         );
    429                         let _ = response_tx.send(DaveApiResponse::Failed(err));
    430                         ctx.request_repaint();
    431                         continue;
    432                     }
    433                 }
    434 
    435                 // Compact accepted — stream notifications until item/completed
    436                 let _ = response_tx.send(DaveApiResponse::CompactionStarted);
    437                 ctx.request_repaint();
    438 
    439                 loop {
    440                     tokio::select! {
    441                         biased;
    442 
    443                         Some(cmd) = command_rx.recv() => {
    444                             match cmd {
    445                                 SessionCommand::Shutdown => {
    446                                     tracing::debug!("Session {} shutting down during compact", session_id);
    447                                     return;
    448                                 }
    449                                 _ => {
    450                                     // Ignore other commands during compaction
    451                                 }
    452                             }
    453                         }
    454 
    455                         line_result = reader.next_line() => {
    456                             match line_result {
    457                                 Ok(Some(line)) => {
    458                                     let msg: RpcMessage = match serde_json::from_str(&line) {
    459                                         Ok(m) => m,
    460                                         Err(err) => {
    461                                             tracing::warn!("Codex parse error during compact: {}", err);
    462                                             continue;
    463                                         }
    464                                     };
    465 
    466                                     // Look for item/completed with contextCompaction
    467                                     if msg.method.as_deref() == Some("item/completed") {
    468                                         if let Some(ref params) = msg.params {
    469                                             let item_type = params.get("type")
    470                                                 .and_then(|v| v.as_str())
    471                                                 .unwrap_or("");
    472                                             if item_type == "contextCompaction" {
    473                                                 let pre_tokens = params.get("preTokens")
    474                                                     .and_then(|v| v.as_u64())
    475                                                     .unwrap_or(0);
    476                                                 let _ = response_tx.send(DaveApiResponse::CompactionComplete(
    477                                                     CompactionInfo { pre_tokens },
    478                                                 ));
    479                                                 ctx.request_repaint();
    480                                                 break;
    481                                             }
    482                                         }
    483                                     }
    484                                 }
    485                                 Ok(None) => {
    486                                     tracing::error!("Session {} codex process exited during compact", session_id);
    487                                     let _ = response_tx.send(DaveApiResponse::Failed(
    488                                         "Codex process exited during compaction".to_string(),
    489                                     ));
    490                                     break;
    491                                 }
    492                                 Err(err) => {
    493                                     tracing::error!("Session {} read error during compact: {}", session_id, err);
    494                                     let _ = response_tx.send(DaveApiResponse::Failed(err.to_string()));
    495                                     break;
    496                                 }
    497                             }
    498                         }
    499                     }
    500                 }
    501 
    502                 // Drop response channel to signal completion
    503                 drop(response_tx);
    504                 tracing::debug!("Compaction complete for session {}", session_id);
    505             }
    506             SessionCommand::Shutdown => {
    507                 tracing::debug!("Session {} shutting down", session_id);
    508                 break;
    509             }
    510         }
    511     }
    512 }
    513 
    514 // ---------------------------------------------------------------------------
    515 // Codex message handling (synchronous — no writer needed)
    516 // ---------------------------------------------------------------------------
    517 
    518 /// Extract a human-readable error message from a Codex error notification.
    519 ///
    520 /// Codex sends errors in several different shapes:
    521 ///   - `params.message` (string)
    522 ///   - `params.msg.message` (nested JSON string containing `{"detail":"..."}`)
    523 ///   - `params.error.message` (object with a `message` field)
    524 ///   - top-level `msg.error.message` (RPC error envelope)
    525 fn extract_codex_error(msg: &RpcMessage) -> String {
    526     if let Some(params) = &msg.params {
    527         // params.message (string)
    528         if let Some(s) = params.get("message").and_then(|m| m.as_str()) {
    529             return s.to_string();
    530         }
    531         // params.msg.message (nested — may itself be JSON like {"detail":"..."})
    532         if let Some(inner) = params
    533             .get("msg")
    534             .and_then(|m| m.get("message"))
    535             .and_then(|m| m.as_str())
    536         {
    537             // Try to unwrap a JSON {"detail":"..."} wrapper
    538             if let Ok(v) = serde_json::from_str::<serde_json::Value>(inner) {
    539                 if let Some(detail) = v.get("detail").and_then(|d| d.as_str()) {
    540                     return detail.to_string();
    541                 }
    542             }
    543             return inner.to_string();
    544         }
    545         // params.error.message (error is an object, not a string)
    546         if let Some(inner) = params
    547             .get("error")
    548             .and_then(|e| e.get("message"))
    549             .and_then(|m| m.as_str())
    550         {
    551             if let Ok(v) = serde_json::from_str::<serde_json::Value>(inner) {
    552                 if let Some(detail) = v.get("detail").and_then(|d| d.as_str()) {
    553                     return detail.to_string();
    554                 }
    555             }
    556             return inner.to_string();
    557         }
    558         // params.error (plain string)
    559         if let Some(s) = params.get("error").and_then(|e| e.as_str()) {
    560             return s.to_string();
    561         }
    562     }
    563     // Top-level RPC error envelope
    564     if let Some(rpc_err) = &msg.error {
    565         return rpc_err.message.clone();
    566     }
    567     // Last resort — dump raw params
    568     if let Some(p) = &msg.params {
    569         tracing::debug!("Codex error unknown shape: {}", p);
    570     }
    571     "Codex error (no details)".to_string()
    572 }
    573 
    574 /// Process a single incoming Codex JSON-RPC message. Returns a `HandleResult`
    575 /// indicating what the caller should do next (continue, finish turn, or handle
    576 /// an approval).
    577 fn handle_codex_message(
    578     msg: RpcMessage,
    579     response_tx: &mpsc::Sender<DaveApiResponse>,
    580     ctx: &egui::Context,
    581     subagent_stack: &mut Vec<String>,
    582     turn_count: &u32,
    583 ) -> HandleResult {
    584     let method = match &msg.method {
    585         Some(m) => m.as_str(),
    586         None => {
    587             tracing::debug!("codex msg with no method (response): id={:?}", msg.id);
    588             return HandleResult::Continue;
    589         }
    590     };
    591 
    592     tracing::debug!(
    593         "codex msg: method={} id={:?} has_params={}",
    594         method,
    595         msg.id,
    596         msg.params.is_some()
    597     );
    598 
    599     match method {
    600         "item/agentMessage/delta" => {
    601             if let Some(params) = msg.params {
    602                 if let Ok(delta) = serde_json::from_value::<AgentMessageDeltaParams>(params) {
    603                     let _ = response_tx.send(DaveApiResponse::Token(delta.delta));
    604                     ctx.request_repaint();
    605                 }
    606             }
    607         }
    608 
    609         "item/started" => {
    610             if let Some(params) = msg.params {
    611                 if let Ok(started) = serde_json::from_value::<ItemStartedParams>(params) {
    612                     match started.item_type.as_str() {
    613                         "collabAgentToolCall" => {
    614                             let item_id = started
    615                                 .item_id
    616                                 .unwrap_or_else(|| Uuid::new_v4().to_string());
    617                             subagent_stack.push(item_id.clone());
    618                             let info = SubagentInfo {
    619                                 task_id: item_id,
    620                                 description: started.name.unwrap_or_else(|| "agent".to_string()),
    621                                 subagent_type: "codex-agent".to_string(),
    622                                 status: SubagentStatus::Running,
    623                                 output: String::new(),
    624                                 max_output_size: 4000,
    625                                 tool_results: Vec::new(),
    626                             };
    627                             let _ = response_tx.send(DaveApiResponse::SubagentSpawned(info));
    628                             ctx.request_repaint();
    629                         }
    630                         "commandExecution" => {
    631                             let cmd = started.command.unwrap_or_default();
    632                             let tool_input = serde_json::json!({ "command": cmd });
    633                             let result_value = serde_json::json!({});
    634                             shared::send_tool_result(
    635                                 "Bash",
    636                                 &tool_input,
    637                                 &result_value,
    638                                 None,
    639                                 subagent_stack,
    640                                 response_tx,
    641                                 ctx,
    642                             );
    643                         }
    644                         "fileChange" => {
    645                             let path = started.file_path.unwrap_or_default();
    646                             let tool_input = serde_json::json!({ "file_path": path });
    647                             let result_value = serde_json::json!({});
    648                             shared::send_tool_result(
    649                                 "Edit",
    650                                 &tool_input,
    651                                 &result_value,
    652                                 None,
    653                                 subagent_stack,
    654                                 response_tx,
    655                                 ctx,
    656                             );
    657                         }
    658                         "contextCompaction" => {
    659                             let _ = response_tx.send(DaveApiResponse::CompactionStarted);
    660                             ctx.request_repaint();
    661                         }
    662                         _ => {}
    663                     }
    664                 }
    665             }
    666         }
    667 
    668         "item/completed" => {
    669             if let Some(params) = msg.params {
    670                 if let Ok(completed) = serde_json::from_value::<ItemCompletedParams>(params) {
    671                     handle_item_completed(&completed, response_tx, ctx, subagent_stack);
    672                 }
    673             }
    674         }
    675 
    676         "item/commandExecution/requestApproval" => {
    677             tracing::info!(
    678                 "CMD APPROVAL: id={:?} has_params={}",
    679                 msg.id,
    680                 msg.params.is_some()
    681             );
    682             if let (Some(rpc_id), Some(params)) = (msg.id, msg.params) {
    683                 tracing::info!(
    684                     "CMD APPROVAL params: {}",
    685                     serde_json::to_string(&params).unwrap_or_default()
    686                 );
    687                 match serde_json::from_value::<CommandApprovalParams>(params) {
    688                     Ok(approval) => {
    689                         let cmd = approval.command_string();
    690                         tracing::info!("CMD APPROVAL deserialized ok: command={}", cmd);
    691                         return check_approval_or_forward(
    692                             rpc_id,
    693                             "Bash",
    694                             serde_json::json!({ "command": cmd }),
    695                             response_tx,
    696                             ctx,
    697                         );
    698                     }
    699                     Err(e) => {
    700                         tracing::error!("CMD APPROVAL deser FAILED: {}", e);
    701                     }
    702                 }
    703             } else {
    704                 tracing::warn!("CMD APPROVAL missing id or params");
    705             }
    706         }
    707 
    708         "item/fileChange/requestApproval" => {
    709             tracing::info!(
    710                 "FILE APPROVAL: id={:?} has_params={}",
    711                 msg.id,
    712                 msg.params.is_some()
    713             );
    714             if let (Some(rpc_id), Some(params)) = (msg.id, msg.params) {
    715                 tracing::info!(
    716                     "FILE APPROVAL params: {}",
    717                     serde_json::to_string(&params).unwrap_or_default()
    718                 );
    719                 match serde_json::from_value::<FileChangeApprovalParams>(params) {
    720                     Ok(approval) => {
    721                         let file_path = approval.file_path.as_deref().unwrap_or("unknown");
    722                         let kind_str = approval
    723                             .kind
    724                             .as_ref()
    725                             .and_then(|k| k.get("type").and_then(|t| t.as_str()))
    726                             .unwrap_or("edit");
    727 
    728                         let (tool_name, tool_input) = match kind_str {
    729                             "create" => (
    730                                 "Write",
    731                                 serde_json::json!({
    732                                     "file_path": file_path,
    733                                     "content": approval.diff.as_deref().unwrap_or(""),
    734                                 }),
    735                             ),
    736                             _ => (
    737                                 "Edit",
    738                                 serde_json::json!({
    739                                     "file_path": file_path,
    740                                     "old_string": "",
    741                                     "new_string": approval.diff.as_deref().unwrap_or(""),
    742                                 }),
    743                             ),
    744                         };
    745 
    746                         tracing::info!(
    747                             "FILE APPROVAL deserialized ok: tool={} file={}",
    748                             tool_name,
    749                             file_path
    750                         );
    751                         return check_approval_or_forward(
    752                             rpc_id,
    753                             tool_name,
    754                             tool_input,
    755                             response_tx,
    756                             ctx,
    757                         );
    758                     }
    759                     Err(e) => {
    760                         tracing::error!("FILE APPROVAL deser FAILED: {}", e);
    761                     }
    762                 }
    763             } else {
    764                 tracing::warn!("FILE APPROVAL missing id or params");
    765             }
    766         }
    767 
    768         "thread/tokenUsage/updated" => {
    769             if let Some(params) = msg.params {
    770                 if let Ok(usage) = serde_json::from_value::<TokenUsageParams>(params) {
    771                     let info = UsageInfo {
    772                         input_tokens: usage.token_usage.total.input_tokens as u64,
    773                         output_tokens: usage.token_usage.total.output_tokens as u64,
    774                         num_turns: *turn_count,
    775                         cost_usd: None,
    776                     };
    777                     let _ = response_tx.send(DaveApiResponse::QueryComplete(info));
    778                     ctx.request_repaint();
    779                 }
    780             }
    781         }
    782 
    783         "turn/completed" => {
    784             if let Some(params) = msg.params {
    785                 if let Ok(completed) = serde_json::from_value::<TurnCompletedParams>(params) {
    786                     if completed.status == "failed" {
    787                         let err_msg = completed.error.unwrap_or_else(|| "Turn failed".to_string());
    788                         let _ = response_tx.send(DaveApiResponse::Failed(err_msg));
    789                     }
    790                 }
    791             }
    792             return HandleResult::TurnDone;
    793         }
    794 
    795         "codex/event/patch_apply_begin" => {
    796             // Legacy event carrying full file-change details (paths + diffs).
    797             // The V2 `item/completed` for fileChange is sparse, so we extract
    798             // the diff from the legacy event and emit ToolResults here.
    799             if let Some(params) = &msg.params {
    800                 if let Some(changes) = params
    801                     .get("msg")
    802                     .and_then(|m| m.get("changes"))
    803                     .and_then(|c| c.as_object())
    804                 {
    805                     for (path, change) in changes {
    806                         let change_type = change
    807                             .get("type")
    808                             .and_then(|t| t.as_str())
    809                             .unwrap_or("update");
    810 
    811                         let (tool_name, diff_text) = match change_type {
    812                             "add" => {
    813                                 let content =
    814                                     change.get("content").and_then(|c| c.as_str()).unwrap_or("");
    815                                 ("Write", content.to_string())
    816                             }
    817                             "delete" => ("Edit", "(file deleted)".to_string()),
    818                             _ => {
    819                                 // "update" — has unified_diff
    820                                 let diff = change
    821                                     .get("unified_diff")
    822                                     .and_then(|d| d.as_str())
    823                                     .unwrap_or("");
    824                                 ("Edit", diff.to_string())
    825                             }
    826                         };
    827 
    828                         let tool_input = serde_json::json!({
    829                             "file_path": path,
    830                             "diff": diff_text,
    831                         });
    832                         let result_value = serde_json::json!({ "status": "ok" });
    833                         let file_update =
    834                             make_codex_file_update(path, tool_name, change_type, &diff_text);
    835                         shared::send_tool_result(
    836                             tool_name,
    837                             &tool_input,
    838                             &result_value,
    839                             file_update,
    840                             subagent_stack,
    841                             response_tx,
    842                             ctx,
    843                         );
    844                     }
    845                     ctx.request_repaint();
    846                 }
    847             }
    848         }
    849 
    850         "codex/event/error" | "error" => {
    851             let err_msg: String = extract_codex_error(&msg);
    852             tracing::warn!("Codex error: {}", err_msg);
    853             let _ = response_tx.send(DaveApiResponse::Failed(err_msg));
    854             ctx.request_repaint();
    855         }
    856 
    857         other => {
    858             tracing::debug!(
    859                 "Unhandled codex notification: {} id={:?} params={}",
    860                 other,
    861                 msg.id,
    862                 msg.params
    863                     .as_ref()
    864                     .map(|p| serde_json::to_string(p).unwrap_or_default())
    865                     .unwrap_or_default()
    866             );
    867         }
    868     }
    869 
    870     HandleResult::Continue
    871 }
    872 
    873 /// Check auto-accept rules. If matched, return `AutoAccepted`. Otherwise
    874 /// create a `PendingPermission`, send it to the UI, and return `NeedsApproval`
    875 /// with the oneshot receiver.
    876 fn check_approval_or_forward(
    877     rpc_id: u64,
    878     tool_name: &str,
    879     tool_input: Value,
    880     response_tx: &mpsc::Sender<DaveApiResponse>,
    881     ctx: &egui::Context,
    882 ) -> HandleResult {
    883     if shared::should_auto_accept(tool_name, &tool_input) {
    884         return HandleResult::AutoAccepted(rpc_id);
    885     }
    886 
    887     match shared::forward_permission_to_ui(tool_name, tool_input, response_tx, ctx) {
    888         Some(rx) => HandleResult::NeedsApproval { rpc_id, rx },
    889         // Can't reach UI — auto-accept as fallback
    890         None => HandleResult::AutoAccepted(rpc_id),
    891     }
    892 }
    893 
    894 /// Build a `FileUpdate` from codex file-change data.
    895 fn make_codex_file_update(
    896     path: &str,
    897     tool_name: &str,
    898     change_type: &str,
    899     diff_text: &str,
    900 ) -> Option<FileUpdate> {
    901     let update_type = match (tool_name, change_type) {
    902         ("Write", _) | (_, "add") | (_, "create") => FileUpdateType::Write {
    903             content: diff_text.to_string(),
    904         },
    905         _ => FileUpdateType::UnifiedDiff {
    906             diff: diff_text.to_string(),
    907         },
    908     };
    909     Some(FileUpdate::new(path.to_string(), update_type))
    910 }
    911 
    912 /// Handle a completed item from Codex.
    913 fn handle_item_completed(
    914     completed: &ItemCompletedParams,
    915     response_tx: &mpsc::Sender<DaveApiResponse>,
    916     ctx: &egui::Context,
    917     subagent_stack: &mut Vec<String>,
    918 ) {
    919     match completed.item_type.as_str() {
    920         "commandExecution" => {
    921             let command = completed.command.clone().unwrap_or_default();
    922             let exit_code = completed.exit_code.unwrap_or(-1);
    923             let output = completed.output.clone().unwrap_or_default();
    924 
    925             let tool_input = serde_json::json!({ "command": command });
    926             let result_value = serde_json::json!({ "output": output, "exit_code": exit_code });
    927             shared::send_tool_result(
    928                 "Bash",
    929                 &tool_input,
    930                 &result_value,
    931                 None,
    932                 subagent_stack,
    933                 response_tx,
    934                 ctx,
    935             );
    936         }
    937 
    938         "fileChange" => {
    939             let file_path = completed.file_path.clone().unwrap_or_default();
    940             let diff = completed.diff.clone();
    941 
    942             let kind_str = completed
    943                 .kind
    944                 .as_ref()
    945                 .and_then(|k| k.get("type").and_then(|t| t.as_str()))
    946                 .unwrap_or("edit");
    947 
    948             let tool_name = match kind_str {
    949                 "create" => "Write",
    950                 _ => "Edit",
    951             };
    952 
    953             let tool_input = serde_json::json!({
    954                 "file_path": file_path,
    955                 "diff": diff,
    956             });
    957             let result_value = serde_json::json!({ "status": "ok" });
    958             let file_update = make_codex_file_update(
    959                 &file_path,
    960                 tool_name,
    961                 kind_str,
    962                 diff.as_deref().unwrap_or(""),
    963             );
    964             shared::send_tool_result(
    965                 tool_name,
    966                 &tool_input,
    967                 &result_value,
    968                 file_update,
    969                 subagent_stack,
    970                 response_tx,
    971                 ctx,
    972             );
    973         }
    974 
    975         "collabAgentToolCall" => {
    976             if let Some(item_id) = &completed.item_id {
    977                 let result_text = completed
    978                     .result
    979                     .clone()
    980                     .unwrap_or_else(|| "completed".to_string());
    981                 shared::complete_subagent(item_id, &result_text, subagent_stack, response_tx, ctx);
    982             }
    983         }
    984 
    985         "contextCompaction" => {
    986             let pre_tokens = completed.pre_tokens.unwrap_or(0);
    987             let _ = response_tx.send(DaveApiResponse::CompactionComplete(CompactionInfo {
    988                 pre_tokens,
    989             }));
    990             ctx.request_repaint();
    991         }
    992 
    993         other => {
    994             tracing::debug!("Unhandled item/completed type: {}", other);
    995         }
    996     }
    997 }
    998 
    999 // ---------------------------------------------------------------------------
   1000 // Codex process spawning and JSON-RPC helpers
   1001 // ---------------------------------------------------------------------------
   1002 
   1003 fn spawn_codex(binary: &str, cwd: &Option<PathBuf>) -> Result<Child, std::io::Error> {
   1004     let mut cmd = Command::new(binary);
   1005     cmd.arg("app-server");
   1006     cmd.stdin(std::process::Stdio::piped());
   1007     cmd.stdout(std::process::Stdio::piped());
   1008     cmd.stderr(std::process::Stdio::piped());
   1009     if let Some(dir) = cwd {
   1010         cmd.current_dir(dir);
   1011     }
   1012     cmd.spawn()
   1013 }
   1014 
   1015 /// Send a JSONL request on stdin.
   1016 async fn send_request<P: serde::Serialize, W: AsyncWrite + Unpin>(
   1017     writer: &mut tokio::io::BufWriter<W>,
   1018     req: &RpcRequest<P>,
   1019 ) -> Result<(), std::io::Error> {
   1020     let mut line = serde_json::to_string(req)
   1021         .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
   1022     line.push('\n');
   1023     writer.write_all(line.as_bytes()).await?;
   1024     writer.flush().await?;
   1025     Ok(())
   1026 }
   1027 
   1028 /// Send a JSON-RPC response (for approval requests).
   1029 async fn send_rpc_response<W: AsyncWrite + Unpin>(
   1030     writer: &mut tokio::io::BufWriter<W>,
   1031     id: u64,
   1032     result: Value,
   1033 ) -> Result<(), std::io::Error> {
   1034     let resp = serde_json::json!({ "id": id, "result": result });
   1035     let mut line = serde_json::to_string(&resp)
   1036         .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
   1037     line.push('\n');
   1038     writer.write_all(line.as_bytes()).await?;
   1039     writer.flush().await?;
   1040     Ok(())
   1041 }
   1042 
   1043 /// Send an approval decision response.
   1044 async fn send_approval_response<W: AsyncWrite + Unpin>(
   1045     writer: &mut tokio::io::BufWriter<W>,
   1046     rpc_id: u64,
   1047     decision: ApprovalDecision,
   1048 ) -> Result<(), std::io::Error> {
   1049     let result = serde_json::to_value(ApprovalResponse { decision }).unwrap();
   1050     send_rpc_response(writer, rpc_id, result).await
   1051 }
   1052 
   1053 /// Perform the `initialize` → `initialized` handshake.
   1054 async fn do_init_handshake<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>(
   1055     writer: &mut tokio::io::BufWriter<W>,
   1056     reader: &mut tokio::io::Lines<R>,
   1057 ) -> Result<(), String> {
   1058     let req = RpcRequest {
   1059         id: Some(1),
   1060         method: "initialize",
   1061         params: InitializeParams {
   1062             client_info: ClientInfo {
   1063                 name: "dave".to_string(),
   1064                 version: env!("CARGO_PKG_VERSION").to_string(),
   1065             },
   1066             capabilities: serde_json::json!({}),
   1067         },
   1068     };
   1069 
   1070     send_request(writer, &req)
   1071         .await
   1072         .map_err(|e| format!("Failed to send initialize: {}", e))?;
   1073 
   1074     let resp = read_response_for_id(reader, 1)
   1075         .await
   1076         .map_err(|e| format!("Failed to read initialize response: {}", e))?;
   1077 
   1078     if let Some(err) = resp.error {
   1079         return Err(format!("Initialize error: {}", err.message));
   1080     }
   1081 
   1082     // Send `initialized` notification (no id, no response expected)
   1083     let notif: RpcRequest<Value> = RpcRequest {
   1084         id: None,
   1085         method: "initialized",
   1086         params: serde_json::json!({}),
   1087     };
   1088     send_request(writer, &notif)
   1089         .await
   1090         .map_err(|e| format!("Failed to send initialized: {}", e))?;
   1091 
   1092     Ok(())
   1093 }
   1094 
   1095 /// Send `thread/start` and return the thread ID.
   1096 async fn send_thread_start<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>(
   1097     writer: &mut tokio::io::BufWriter<W>,
   1098     reader: &mut tokio::io::Lines<R>,
   1099     model: Option<&str>,
   1100     cwd: Option<&str>,
   1101 ) -> Result<String, String> {
   1102     let req = RpcRequest {
   1103         id: Some(2),
   1104         method: "thread/start",
   1105         params: ThreadStartParams {
   1106             model: model.map(|s| s.to_string()),
   1107             cwd: cwd.map(|s| s.to_string()),
   1108             approval_policy: Some("on-request".to_string()),
   1109         },
   1110     };
   1111 
   1112     send_request(writer, &req)
   1113         .await
   1114         .map_err(|e| format!("Failed to send thread/start: {}", e))?;
   1115 
   1116     let resp = read_response_for_id(reader, 2)
   1117         .await
   1118         .map_err(|e| format!("Failed to read thread/start response: {}", e))?;
   1119 
   1120     if let Some(err) = resp.error {
   1121         return Err(format!("thread/start error: {}", err.message));
   1122     }
   1123 
   1124     let result = resp.result.ok_or("No result in thread/start response")?;
   1125     let thread_result: ThreadStartResult = serde_json::from_value(result)
   1126         .map_err(|e| format!("Failed to parse thread/start result: {}", e))?;
   1127 
   1128     Ok(thread_result.thread.id)
   1129 }
   1130 
   1131 /// Send `thread/resume` and return the thread ID.
   1132 async fn send_thread_resume<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>(
   1133     writer: &mut tokio::io::BufWriter<W>,
   1134     reader: &mut tokio::io::Lines<R>,
   1135     thread_id: &str,
   1136 ) -> Result<String, String> {
   1137     let req = RpcRequest {
   1138         id: Some(3),
   1139         method: "thread/resume",
   1140         params: ThreadResumeParams {
   1141             thread_id: thread_id.to_string(),
   1142         },
   1143     };
   1144 
   1145     send_request(writer, &req)
   1146         .await
   1147         .map_err(|e| format!("Failed to send thread/resume: {}", e))?;
   1148 
   1149     let resp = read_response_for_id(reader, 3)
   1150         .await
   1151         .map_err(|e| format!("Failed to read thread/resume response: {}", e))?;
   1152 
   1153     if let Some(err) = resp.error {
   1154         return Err(format!("thread/resume error: {}", err.message));
   1155     }
   1156 
   1157     Ok(thread_id.to_string())
   1158 }
   1159 
   1160 /// Send `turn/start`.
   1161 async fn send_turn_start<W: AsyncWrite + Unpin>(
   1162     writer: &mut tokio::io::BufWriter<W>,
   1163     req_id: u64,
   1164     thread_id: &str,
   1165     prompt: &str,
   1166     model: Option<&str>,
   1167 ) -> Result<(), String> {
   1168     let req = RpcRequest {
   1169         id: Some(req_id),
   1170         method: "turn/start",
   1171         params: TurnStartParams {
   1172             thread_id: thread_id.to_string(),
   1173             input: vec![TurnInput::Text {
   1174                 text: prompt.to_string(),
   1175             }],
   1176             model: model.map(|s| s.to_string()),
   1177             effort: None,
   1178         },
   1179     };
   1180 
   1181     send_request(writer, &req)
   1182         .await
   1183         .map_err(|e| format!("Failed to send turn/start: {}", e))
   1184 }
   1185 
   1186 /// Send `turn/interrupt`.
   1187 async fn send_turn_interrupt<W: AsyncWrite + Unpin>(
   1188     writer: &mut tokio::io::BufWriter<W>,
   1189     req_id: u64,
   1190     thread_id: &str,
   1191     turn_id: &str,
   1192 ) -> Result<(), String> {
   1193     let req = RpcRequest {
   1194         id: Some(req_id),
   1195         method: "turn/interrupt",
   1196         params: TurnInterruptParams {
   1197             thread_id: thread_id.to_string(),
   1198             turn_id: turn_id.to_string(),
   1199         },
   1200     };
   1201 
   1202     send_request(writer, &req)
   1203         .await
   1204         .map_err(|e| format!("Failed to send turn/interrupt: {}", e))
   1205 }
   1206 
   1207 /// Send `thread/compact/start`.
   1208 async fn send_thread_compact<W: AsyncWrite + Unpin>(
   1209     writer: &mut tokio::io::BufWriter<W>,
   1210     req_id: u64,
   1211     thread_id: &str,
   1212 ) -> Result<(), String> {
   1213     let req = RpcRequest {
   1214         id: Some(req_id),
   1215         method: "thread/compact/start",
   1216         params: ThreadCompactParams {
   1217             thread_id: thread_id.to_string(),
   1218         },
   1219     };
   1220 
   1221     send_request(writer, &req)
   1222         .await
   1223         .map_err(|e| format!("Failed to send thread/compact/start: {}", e))
   1224 }
   1225 
   1226 /// Read lines until we find a response matching the given request id.
   1227 /// Non-matching messages (notifications) are logged and skipped.
   1228 async fn read_response_for_id<R: AsyncBufRead + Unpin>(
   1229     reader: &mut tokio::io::Lines<R>,
   1230     expected_id: u64,
   1231 ) -> Result<RpcMessage, String> {
   1232     loop {
   1233         let line = reader
   1234             .next_line()
   1235             .await
   1236             .map_err(|e| format!("IO error: {}", e))?
   1237             .ok_or_else(|| "EOF while waiting for response".to_string())?;
   1238 
   1239         let msg: RpcMessage = serde_json::from_str(&line).map_err(|e| {
   1240             format!(
   1241                 "JSON parse error: {} in: {}",
   1242                 e,
   1243                 &line[..line.len().min(200)]
   1244             )
   1245         })?;
   1246 
   1247         if msg.id == Some(expected_id) {
   1248             return Ok(msg);
   1249         }
   1250 
   1251         tracing::trace!(
   1252             "Skipping message during handshake (waiting for id={}): method={:?}",
   1253             expected_id,
   1254             msg.method
   1255         );
   1256     }
   1257 }
   1258 
   1259 /// Drain pending commands, sending error to any Query commands.
   1260 async fn drain_commands_with_error(
   1261     command_rx: &mut tokio_mpsc::Receiver<SessionCommand>,
   1262     error: &str,
   1263 ) {
   1264     while let Some(cmd) = command_rx.recv().await {
   1265         match &cmd {
   1266             SessionCommand::Query { response_tx, .. }
   1267             | SessionCommand::Compact { response_tx, .. } => {
   1268                 let _ = response_tx.send(DaveApiResponse::Failed(error.to_string()));
   1269             }
   1270             _ => {}
   1271         }
   1272         if matches!(cmd, SessionCommand::Shutdown) {
   1273             break;
   1274         }
   1275     }
   1276 }
   1277 
   1278 // ---------------------------------------------------------------------------
   1279 // Public backend
   1280 // ---------------------------------------------------------------------------
   1281 
   1282 pub struct CodexBackend {
   1283     codex_binary: String,
   1284     sessions: DashMap<String, SessionHandle>,
   1285 }
   1286 
   1287 impl CodexBackend {
   1288     pub fn new(codex_binary: String) -> Self {
   1289         Self {
   1290             codex_binary,
   1291             sessions: DashMap::new(),
   1292         }
   1293     }
   1294 }
   1295 
   1296 impl AiBackend for CodexBackend {
   1297     fn stream_request(
   1298         &self,
   1299         messages: Vec<Message>,
   1300         _tools: Arc<HashMap<String, Tool>>,
   1301         model: String,
   1302         _user_id: String,
   1303         session_id: String,
   1304         cwd: Option<PathBuf>,
   1305         resume_session_id: Option<String>,
   1306         ctx: egui::Context,
   1307     ) -> (
   1308         mpsc::Receiver<DaveApiResponse>,
   1309         Option<tokio::task::JoinHandle<()>>,
   1310     ) {
   1311         let (response_tx, response_rx) = mpsc::channel();
   1312 
   1313         let prompt = shared::prepare_prompt(&messages, &resume_session_id);
   1314 
   1315         tracing::debug!(
   1316             "Codex request: session={}, resumed={}, prompt_len={}",
   1317             session_id,
   1318             resume_session_id.is_some(),
   1319             prompt.len(),
   1320         );
   1321 
   1322         let command_tx = {
   1323             let entry = self.sessions.entry(session_id.clone());
   1324             let codex_binary = self.codex_binary.clone();
   1325             let model_clone = model.clone();
   1326             let cwd_clone = cwd.clone();
   1327             let resume_clone = resume_session_id.clone();
   1328             let handle = entry.or_insert_with(|| {
   1329                 let (command_tx, command_rx) = tokio_mpsc::channel(16);
   1330                 let sid = session_id.clone();
   1331                 tokio::spawn(async move {
   1332                     session_actor(
   1333                         sid,
   1334                         cwd_clone,
   1335                         codex_binary,
   1336                         Some(model_clone),
   1337                         resume_clone,
   1338                         command_rx,
   1339                     )
   1340                     .await;
   1341                 });
   1342                 SessionHandle { command_tx }
   1343             });
   1344             handle.command_tx.clone()
   1345         };
   1346 
   1347         let handle = tokio::spawn(async move {
   1348             if let Err(err) = command_tx
   1349                 .send(SessionCommand::Query {
   1350                     prompt,
   1351                     response_tx,
   1352                     ctx,
   1353                 })
   1354                 .await
   1355             {
   1356                 tracing::error!("Failed to send query to codex session actor: {}", err);
   1357             }
   1358         });
   1359 
   1360         (response_rx, Some(handle))
   1361     }
   1362 
   1363     fn cleanup_session(&self, session_id: String) {
   1364         if let Some((_, handle)) = self.sessions.remove(&session_id) {
   1365             tokio::spawn(async move {
   1366                 if let Err(err) = handle.command_tx.send(SessionCommand::Shutdown).await {
   1367                     tracing::warn!("Failed to send shutdown to codex session: {}", err);
   1368                 }
   1369             });
   1370         }
   1371     }
   1372 
   1373     fn interrupt_session(&self, session_id: String, ctx: egui::Context) {
   1374         if let Some(handle) = self.sessions.get(&session_id) {
   1375             let command_tx = handle.command_tx.clone();
   1376             tokio::spawn(async move {
   1377                 if let Err(err) = command_tx.send(SessionCommand::Interrupt { ctx }).await {
   1378                     tracing::warn!("Failed to send interrupt to codex session: {}", err);
   1379                 }
   1380             });
   1381         }
   1382     }
   1383 
   1384     fn set_permission_mode(&self, session_id: String, mode: PermissionMode, ctx: egui::Context) {
   1385         if let Some(handle) = self.sessions.get(&session_id) {
   1386             let command_tx = handle.command_tx.clone();
   1387             tokio::spawn(async move {
   1388                 if let Err(err) = command_tx
   1389                     .send(SessionCommand::SetPermissionMode { mode, ctx })
   1390                     .await
   1391                 {
   1392                     tracing::warn!(
   1393                         "Failed to send set_permission_mode to codex session: {}",
   1394                         err
   1395                     );
   1396                 }
   1397             });
   1398         }
   1399     }
   1400 
   1401     fn compact_session(
   1402         &self,
   1403         session_id: String,
   1404         ctx: egui::Context,
   1405     ) -> Option<mpsc::Receiver<DaveApiResponse>> {
   1406         let handle = self.sessions.get(&session_id)?;
   1407         let command_tx = handle.command_tx.clone();
   1408         let (response_tx, response_rx) = mpsc::channel();
   1409         tokio::spawn(async move {
   1410             if let Err(err) = command_tx
   1411                 .send(SessionCommand::Compact { response_tx, ctx })
   1412                 .await
   1413             {
   1414                 tracing::warn!("Failed to send compact to codex session: {}", err);
   1415             }
   1416         });
   1417         Some(response_rx)
   1418     }
   1419 }
   1420 
   1421 #[cfg(test)]
   1422 mod tests {
   1423     use super::*;
   1424     use crate::messages::{AssistantMessage, DaveApiResponse};
   1425     use serde_json::json;
   1426     use std::time::Duration;
   1427 
   1428     /// Helper: build an RpcMessage from a method and params JSON
   1429     fn notification(method: &str, params: Value) -> RpcMessage {
   1430         RpcMessage {
   1431             id: None,
   1432             method: Some(method.to_string()),
   1433             result: None,
   1434             error: None,
   1435             params: Some(params),
   1436         }
   1437     }
   1438 
   1439     /// Helper: build an RpcMessage that is a server→client request (has id)
   1440     fn server_request(id: u64, method: &str, params: Value) -> RpcMessage {
   1441         RpcMessage {
   1442             id: Some(id),
   1443             method: Some(method.to_string()),
   1444             result: None,
   1445             error: None,
   1446             params: Some(params),
   1447         }
   1448     }
   1449 
   1450     // -----------------------------------------------------------------------
   1451     // Protocol serde tests
   1452     // -----------------------------------------------------------------------
   1453 
   1454     #[test]
   1455     fn test_rpc_request_serialization() {
   1456         let req = RpcRequest {
   1457             id: Some(1),
   1458             method: "initialize",
   1459             params: InitializeParams {
   1460                 client_info: ClientInfo {
   1461                     name: "dave".to_string(),
   1462                     version: "0.1.0".to_string(),
   1463                 },
   1464                 capabilities: json!({}),
   1465             },
   1466         };
   1467         let json = serde_json::to_string(&req).unwrap();
   1468         assert!(json.contains("\"id\":1"));
   1469         assert!(json.contains("\"method\":\"initialize\""));
   1470         assert!(json.contains("\"clientInfo\""));
   1471     }
   1472 
   1473     #[test]
   1474     fn test_rpc_request_notification_omits_id() {
   1475         let req: RpcRequest<Value> = RpcRequest {
   1476             id: None,
   1477             method: "initialized",
   1478             params: json!({}),
   1479         };
   1480         let json = serde_json::to_string(&req).unwrap();
   1481         assert!(!json.contains("\"id\""));
   1482     }
   1483 
   1484     #[test]
   1485     fn test_rpc_message_deserialization_response() {
   1486         let json = r#"{"id":1,"result":{"serverInfo":{"name":"codex"}}}"#;
   1487         let msg: RpcMessage = serde_json::from_str(json).unwrap();
   1488         assert_eq!(msg.id, Some(1));
   1489         assert!(msg.result.is_some());
   1490         assert!(msg.method.is_none());
   1491     }
   1492 
   1493     #[test]
   1494     fn test_rpc_message_deserialization_notification() {
   1495         let json = r#"{"method":"item/agentMessage/delta","params":{"delta":"hello"}}"#;
   1496         let msg: RpcMessage = serde_json::from_str(json).unwrap();
   1497         assert!(msg.id.is_none());
   1498         assert_eq!(msg.method.as_deref(), Some("item/agentMessage/delta"));
   1499     }
   1500 
   1501     #[test]
   1502     fn test_thread_start_result_deserialization() {
   1503         let json = r#"{"thread":{"id":"thread_abc123"},"model":"gpt-5.2-codex"}"#;
   1504         let result: ThreadStartResult = serde_json::from_str(json).unwrap();
   1505         assert_eq!(result.thread.id, "thread_abc123");
   1506         assert_eq!(result.model.as_deref(), Some("gpt-5.2-codex"));
   1507     }
   1508 
   1509     #[test]
   1510     fn test_approval_response_serialization() {
   1511         let resp = ApprovalResponse {
   1512             decision: ApprovalDecision::Accept,
   1513         };
   1514         let json = serde_json::to_string(&resp).unwrap();
   1515         assert!(json.contains("\"decision\":\"accept\""));
   1516 
   1517         let resp = ApprovalResponse {
   1518             decision: ApprovalDecision::Decline,
   1519         };
   1520         let json = serde_json::to_string(&resp).unwrap();
   1521         assert!(json.contains("\"decision\":\"decline\""));
   1522     }
   1523 
   1524     #[test]
   1525     fn test_turn_input_serialization() {
   1526         let input = TurnInput::Text {
   1527             text: "hello".to_string(),
   1528         };
   1529         let json = serde_json::to_string(&input).unwrap();
   1530         assert!(json.contains("\"type\":\"text\""));
   1531         assert!(json.contains("\"text\":\"hello\""));
   1532     }
   1533 
   1534     // -----------------------------------------------------------------------
   1535     // handle_codex_message tests
   1536     // -----------------------------------------------------------------------
   1537 
   1538     #[test]
   1539     fn test_handle_delta_sends_token() {
   1540         let (tx, rx) = mpsc::channel();
   1541         let ctx = egui::Context::default();
   1542         let mut subagents = Vec::new();
   1543 
   1544         let msg = notification("item/agentMessage/delta", json!({ "delta": "Hello world" }));
   1545 
   1546         let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
   1547         assert!(matches!(result, HandleResult::Continue));
   1548 
   1549         let response = rx.try_recv().unwrap();
   1550         match response {
   1551             DaveApiResponse::Token(t) => assert_eq!(t, "Hello world"),
   1552             other => panic!("Expected Token, got {:?}", std::mem::discriminant(&other)),
   1553         }
   1554     }
   1555 
   1556     #[test]
   1557     fn test_handle_turn_completed_success() {
   1558         let (tx, _rx) = mpsc::channel();
   1559         let ctx = egui::Context::default();
   1560         let mut subagents = Vec::new();
   1561 
   1562         let msg = notification("turn/completed", json!({ "status": "completed" }));
   1563         let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
   1564         assert!(matches!(result, HandleResult::TurnDone));
   1565     }
   1566 
   1567     #[test]
   1568     fn test_handle_turn_completed_failure_sends_error() {
   1569         let (tx, rx) = mpsc::channel();
   1570         let ctx = egui::Context::default();
   1571         let mut subagents = Vec::new();
   1572 
   1573         let msg = notification(
   1574             "turn/completed",
   1575             json!({ "status": "failed", "error": "rate limit exceeded" }),
   1576         );
   1577         let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
   1578         assert!(matches!(result, HandleResult::TurnDone));
   1579 
   1580         let response = rx.try_recv().unwrap();
   1581         match response {
   1582             DaveApiResponse::Failed(err) => assert_eq!(err, "rate limit exceeded"),
   1583             other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)),
   1584         }
   1585     }
   1586 
   1587     #[test]
   1588     fn test_handle_response_message_ignored() {
   1589         let (tx, rx) = mpsc::channel();
   1590         let ctx = egui::Context::default();
   1591         let mut subagents = Vec::new();
   1592 
   1593         // A response (has id, no method) — should be ignored
   1594         let msg = RpcMessage {
   1595             id: Some(42),
   1596             method: None,
   1597             result: Some(json!({})),
   1598             error: None,
   1599             params: None,
   1600         };
   1601         let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
   1602         assert!(matches!(result, HandleResult::Continue));
   1603         assert!(rx.try_recv().is_err()); // nothing sent
   1604     }
   1605 
   1606     #[test]
   1607     fn test_handle_unknown_method_ignored() {
   1608         let (tx, rx) = mpsc::channel();
   1609         let ctx = egui::Context::default();
   1610         let mut subagents = Vec::new();
   1611 
   1612         let msg = notification("some/future/event", json!({}));
   1613         let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
   1614         assert!(matches!(result, HandleResult::Continue));
   1615         assert!(rx.try_recv().is_err());
   1616     }
   1617 
   1618     #[test]
   1619     fn test_handle_codex_event_error_sends_failed() {
   1620         let (tx, rx) = mpsc::channel();
   1621         let ctx = egui::Context::default();
   1622         let mut subagents = Vec::new();
   1623 
   1624         let msg = notification(
   1625             "codex/event/error",
   1626             json!({ "message": "context window exceeded" }),
   1627         );
   1628         let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
   1629         assert!(matches!(result, HandleResult::Continue));
   1630 
   1631         let response = rx.try_recv().unwrap();
   1632         match response {
   1633             DaveApiResponse::Failed(err) => assert_eq!(err, "context window exceeded"),
   1634             other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)),
   1635         }
   1636     }
   1637 
   1638     #[test]
   1639     fn test_handle_error_notification_sends_failed() {
   1640         let (tx, rx) = mpsc::channel();
   1641         let ctx = egui::Context::default();
   1642         let mut subagents = Vec::new();
   1643 
   1644         let msg = notification("error", json!({ "message": "something broke" }));
   1645         let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
   1646         assert!(matches!(result, HandleResult::Continue));
   1647 
   1648         let response = rx.try_recv().unwrap();
   1649         match response {
   1650             DaveApiResponse::Failed(err) => assert_eq!(err, "something broke"),
   1651             other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)),
   1652         }
   1653     }
   1654 
   1655     #[test]
   1656     fn test_handle_error_without_message_uses_default() {
   1657         let (tx, rx) = mpsc::channel();
   1658         let ctx = egui::Context::default();
   1659         let mut subagents = Vec::new();
   1660 
   1661         let msg = notification("codex/event/error", json!({}));
   1662         let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
   1663         assert!(matches!(result, HandleResult::Continue));
   1664 
   1665         let response = rx.try_recv().unwrap();
   1666         match response {
   1667             DaveApiResponse::Failed(err) => assert_eq!(err, "Codex error (no details)"),
   1668             other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)),
   1669         }
   1670     }
   1671 
   1672     #[test]
   1673     fn test_handle_error_nested_msg_message() {
   1674         let (tx, rx) = mpsc::channel();
   1675         let ctx = egui::Context::default();
   1676         let mut subagents = Vec::new();
   1677 
   1678         // Codex shape: params.msg.message is JSON with a "detail" field
   1679         let msg = notification(
   1680             "codex/event/error",
   1681             json!({
   1682                 "id": "1",
   1683                 "msg": {
   1684                     "type": "error",
   1685                     "message": "{\"detail\":\"The model is not supported.\"}",
   1686                     "codex_error_info": "other"
   1687                 },
   1688                 "conversationId": "thread-1"
   1689             }),
   1690         );
   1691         let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
   1692         assert!(matches!(result, HandleResult::Continue));
   1693 
   1694         let response = rx.try_recv().unwrap();
   1695         match response {
   1696             DaveApiResponse::Failed(err) => assert_eq!(err, "The model is not supported."),
   1697             other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)),
   1698         }
   1699     }
   1700 
   1701     #[test]
   1702     fn test_handle_error_nested_error_object() {
   1703         let (tx, rx) = mpsc::channel();
   1704         let ctx = egui::Context::default();
   1705         let mut subagents = Vec::new();
   1706 
   1707         // Codex shape: params.error is an object with a "message" field
   1708         let msg = notification(
   1709             "error",
   1710             json!({
   1711                 "error": {
   1712                     "message": "{\"detail\":\"Rate limit exceeded.\"}",
   1713                     "codexErrorInfo": "other",
   1714                     "additionalDetails": null
   1715                 },
   1716                 "willRetry": false,
   1717                 "threadId": "thread-1",
   1718                 "turnId": "1"
   1719             }),
   1720         );
   1721         let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
   1722         assert!(matches!(result, HandleResult::Continue));
   1723 
   1724         let response = rx.try_recv().unwrap();
   1725         match response {
   1726             DaveApiResponse::Failed(err) => assert_eq!(err, "Rate limit exceeded."),
   1727             other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)),
   1728         }
   1729     }
   1730 
   1731     #[test]
   1732     fn test_handle_subagent_started() {
   1733         let (tx, rx) = mpsc::channel();
   1734         let ctx = egui::Context::default();
   1735         let mut subagents = Vec::new();
   1736 
   1737         let msg = notification(
   1738             "item/started",
   1739             json!({
   1740                 "type": "collabAgentToolCall",
   1741                 "itemId": "agent-1",
   1742                 "name": "research agent"
   1743             }),
   1744         );
   1745         let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
   1746         assert!(matches!(result, HandleResult::Continue));
   1747         assert_eq!(subagents.len(), 1);
   1748         assert_eq!(subagents[0], "agent-1");
   1749 
   1750         let response = rx.try_recv().unwrap();
   1751         match response {
   1752             DaveApiResponse::SubagentSpawned(info) => {
   1753                 assert_eq!(info.task_id, "agent-1");
   1754                 assert_eq!(info.description, "research agent");
   1755             }
   1756             other => panic!(
   1757                 "Expected SubagentSpawned, got {:?}",
   1758                 std::mem::discriminant(&other)
   1759             ),
   1760         }
   1761     }
   1762 
   1763     #[test]
   1764     fn test_handle_command_approval_auto_accept() {
   1765         let (tx, rx) = mpsc::channel();
   1766         let ctx = egui::Context::default();
   1767         let mut subagents = Vec::new();
   1768 
   1769         // "git status" should be auto-accepted by default rules
   1770         let msg = server_request(
   1771             99,
   1772             "item/commandExecution/requestApproval",
   1773             json!({ "command": "git status" }),
   1774         );
   1775         let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
   1776         match result {
   1777             HandleResult::AutoAccepted(id) => assert_eq!(id, 99),
   1778             other => panic!(
   1779                 "Expected AutoAccepted, got {:?}",
   1780                 std::mem::discriminant(&other)
   1781             ),
   1782         }
   1783         // No permission request sent to UI
   1784         assert!(rx.try_recv().is_err());
   1785     }
   1786 
   1787     #[test]
   1788     fn test_handle_command_approval_needs_ui() {
   1789         let (tx, rx) = mpsc::channel();
   1790         let ctx = egui::Context::default();
   1791         let mut subagents = Vec::new();
   1792 
   1793         // "rm -rf /" should NOT be auto-accepted
   1794         let msg = server_request(
   1795             100,
   1796             "item/commandExecution/requestApproval",
   1797             json!({ "command": "rm -rf /" }),
   1798         );
   1799         let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
   1800         match result {
   1801             HandleResult::NeedsApproval { rpc_id, .. } => assert_eq!(rpc_id, 100),
   1802             other => panic!(
   1803                 "Expected NeedsApproval, got {:?}",
   1804                 std::mem::discriminant(&other)
   1805             ),
   1806         }
   1807 
   1808         // Permission request should have been sent to UI
   1809         let response = rx.try_recv().unwrap();
   1810         assert!(matches!(response, DaveApiResponse::PermissionRequest(_)));
   1811     }
   1812 
   1813     // -----------------------------------------------------------------------
   1814     // handle_item_completed tests
   1815     // -----------------------------------------------------------------------
   1816 
   1817     #[test]
   1818     fn test_item_completed_command_execution() {
   1819         let (tx, rx) = mpsc::channel();
   1820         let ctx = egui::Context::default();
   1821         let mut subagents = Vec::new();
   1822 
   1823         let completed = ItemCompletedParams {
   1824             item_type: "commandExecution".to_string(),
   1825             item_id: None,
   1826             command: Some("ls -la".to_string()),
   1827             exit_code: Some(0),
   1828             output: Some("total 42\n".to_string()),
   1829             file_path: None,
   1830             diff: None,
   1831             kind: None,
   1832             result: None,
   1833             pre_tokens: None,
   1834             content: None,
   1835         };
   1836 
   1837         handle_item_completed(&completed, &tx, &ctx, &mut subagents);
   1838 
   1839         let response = rx.try_recv().unwrap();
   1840         match response {
   1841             DaveApiResponse::ToolResult(tool) => {
   1842                 assert_eq!(tool.tool_name, "Bash");
   1843                 assert!(tool.parent_task_id.is_none());
   1844             }
   1845             other => panic!(
   1846                 "Expected ToolResult, got {:?}",
   1847                 std::mem::discriminant(&other)
   1848             ),
   1849         }
   1850     }
   1851 
   1852     #[test]
   1853     fn test_item_completed_file_change_edit() {
   1854         let (tx, rx) = mpsc::channel();
   1855         let ctx = egui::Context::default();
   1856         let mut subagents = Vec::new();
   1857 
   1858         let completed = ItemCompletedParams {
   1859             item_type: "fileChange".to_string(),
   1860             item_id: None,
   1861             command: None,
   1862             exit_code: None,
   1863             output: None,
   1864             file_path: Some("src/main.rs".to_string()),
   1865             diff: Some("@@ -1,3 +1,3 @@\n-old\n+new\n context\n".to_string()),
   1866             kind: Some(json!({"type": "edit"})),
   1867             result: None,
   1868             pre_tokens: None,
   1869             content: None,
   1870         };
   1871 
   1872         handle_item_completed(&completed, &tx, &ctx, &mut subagents);
   1873 
   1874         let response = rx.try_recv().unwrap();
   1875         match response {
   1876             DaveApiResponse::ToolResult(tool) => {
   1877                 assert_eq!(tool.tool_name, "Edit");
   1878             }
   1879             other => panic!(
   1880                 "Expected ToolResult, got {:?}",
   1881                 std::mem::discriminant(&other)
   1882             ),
   1883         }
   1884     }
   1885 
   1886     #[test]
   1887     fn test_item_completed_file_change_create() {
   1888         let (tx, rx) = mpsc::channel();
   1889         let ctx = egui::Context::default();
   1890         let mut subagents = Vec::new();
   1891 
   1892         let completed = ItemCompletedParams {
   1893             item_type: "fileChange".to_string(),
   1894             item_id: None,
   1895             command: None,
   1896             exit_code: None,
   1897             output: None,
   1898             file_path: Some("new_file.rs".to_string()),
   1899             diff: None,
   1900             kind: Some(json!({"type": "create"})),
   1901             result: None,
   1902             pre_tokens: None,
   1903             content: None,
   1904         };
   1905 
   1906         handle_item_completed(&completed, &tx, &ctx, &mut subagents);
   1907 
   1908         let response = rx.try_recv().unwrap();
   1909         match response {
   1910             DaveApiResponse::ToolResult(tool) => {
   1911                 assert_eq!(tool.tool_name, "Write");
   1912             }
   1913             other => panic!(
   1914                 "Expected ToolResult, got {:?}",
   1915                 std::mem::discriminant(&other)
   1916             ),
   1917         }
   1918     }
   1919 
   1920     #[test]
   1921     fn test_item_completed_subagent() {
   1922         let (tx, rx) = mpsc::channel();
   1923         let ctx = egui::Context::default();
   1924         let mut subagents = vec!["agent-1".to_string()];
   1925 
   1926         let completed = ItemCompletedParams {
   1927             item_type: "collabAgentToolCall".to_string(),
   1928             item_id: Some("agent-1".to_string()),
   1929             command: None,
   1930             exit_code: None,
   1931             output: None,
   1932             file_path: None,
   1933             diff: None,
   1934             kind: None,
   1935             result: Some("Found 3 relevant files".to_string()),
   1936             pre_tokens: None,
   1937             content: None,
   1938         };
   1939 
   1940         handle_item_completed(&completed, &tx, &ctx, &mut subagents);
   1941 
   1942         // Subagent removed from stack
   1943         assert!(subagents.is_empty());
   1944 
   1945         let response = rx.try_recv().unwrap();
   1946         match response {
   1947             DaveApiResponse::SubagentCompleted { task_id, result } => {
   1948                 assert_eq!(task_id, "agent-1");
   1949                 assert_eq!(result, "Found 3 relevant files");
   1950             }
   1951             other => panic!(
   1952                 "Expected SubagentCompleted, got {:?}",
   1953                 std::mem::discriminant(&other)
   1954             ),
   1955         }
   1956     }
   1957 
   1958     #[test]
   1959     fn test_item_completed_compaction() {
   1960         let (tx, rx) = mpsc::channel();
   1961         let ctx = egui::Context::default();
   1962         let mut subagents = Vec::new();
   1963 
   1964         let completed = ItemCompletedParams {
   1965             item_type: "contextCompaction".to_string(),
   1966             item_id: None,
   1967             command: None,
   1968             exit_code: None,
   1969             output: None,
   1970             file_path: None,
   1971             diff: None,
   1972             kind: None,
   1973             result: None,
   1974             pre_tokens: Some(50000),
   1975             content: None,
   1976         };
   1977 
   1978         handle_item_completed(&completed, &tx, &ctx, &mut subagents);
   1979 
   1980         let response = rx.try_recv().unwrap();
   1981         match response {
   1982             DaveApiResponse::CompactionComplete(info) => {
   1983                 assert_eq!(info.pre_tokens, 50000);
   1984             }
   1985             other => panic!(
   1986                 "Expected CompactionComplete, got {:?}",
   1987                 std::mem::discriminant(&other)
   1988             ),
   1989         }
   1990     }
   1991 
   1992     #[test]
   1993     fn test_item_completed_with_parent_subagent() {
   1994         let (tx, rx) = mpsc::channel();
   1995         let ctx = egui::Context::default();
   1996         let mut subagents = vec!["parent-agent".to_string()];
   1997 
   1998         let completed = ItemCompletedParams {
   1999             item_type: "commandExecution".to_string(),
   2000             item_id: None,
   2001             command: Some("cargo test".to_string()),
   2002             exit_code: Some(0),
   2003             output: Some("ok".to_string()),
   2004             file_path: None,
   2005             diff: None,
   2006             kind: None,
   2007             result: None,
   2008             pre_tokens: None,
   2009             content: None,
   2010         };
   2011 
   2012         handle_item_completed(&completed, &tx, &ctx, &mut subagents);
   2013 
   2014         let response = rx.try_recv().unwrap();
   2015         match response {
   2016             DaveApiResponse::ToolResult(tool) => {
   2017                 assert_eq!(tool.tool_name, "Bash");
   2018                 assert_eq!(tool.parent_task_id.as_deref(), Some("parent-agent"));
   2019             }
   2020             other => panic!(
   2021                 "Expected ToolResult, got {:?}",
   2022                 std::mem::discriminant(&other)
   2023             ),
   2024         }
   2025     }
   2026 
   2027     // -----------------------------------------------------------------------
   2028     // check_approval_or_forward tests
   2029     // -----------------------------------------------------------------------
   2030 
   2031     #[test]
   2032     fn test_approval_auto_accept_read_tool() {
   2033         let (tx, rx) = mpsc::channel();
   2034         let ctx = egui::Context::default();
   2035 
   2036         // Glob/Grep/Read are always auto-accepted
   2037         let result = check_approval_or_forward(1, "Glob", json!({"pattern": "*.rs"}), &tx, &ctx);
   2038         assert!(matches!(result, HandleResult::AutoAccepted(1)));
   2039         assert!(rx.try_recv().is_err()); // no UI request
   2040     }
   2041 
   2042     #[test]
   2043     fn test_approval_forwards_dangerous_command() {
   2044         let (tx, rx) = mpsc::channel();
   2045         let ctx = egui::Context::default();
   2046 
   2047         let result =
   2048             check_approval_or_forward(42, "Bash", json!({"command": "sudo rm -rf /"}), &tx, &ctx);
   2049         match result {
   2050             HandleResult::NeedsApproval { rpc_id, .. } => assert_eq!(rpc_id, 42),
   2051             other => panic!(
   2052                 "Expected NeedsApproval, got {:?}",
   2053                 std::mem::discriminant(&other)
   2054             ),
   2055         }
   2056 
   2057         // Permission request sent to UI
   2058         let response = rx.try_recv().unwrap();
   2059         match response {
   2060             DaveApiResponse::PermissionRequest(pending) => {
   2061                 assert_eq!(pending.request.tool_name, "Bash");
   2062             }
   2063             other => panic!(
   2064                 "Expected PermissionRequest, got {:?}",
   2065                 std::mem::discriminant(&other)
   2066             ),
   2067         }
   2068     }
   2069 
   2070     // -----------------------------------------------------------------------
   2071     // get_pending_user_messages tests
   2072     // -----------------------------------------------------------------------
   2073 
   2074     #[test]
   2075     fn pending_messages_single_user() {
   2076         let messages = vec![Message::User("hello".into())];
   2077         assert_eq!(shared::get_pending_user_messages(&messages), "hello");
   2078     }
   2079 
   2080     #[test]
   2081     fn pending_messages_multiple_trailing_users() {
   2082         let messages = vec![
   2083             Message::User("first".into()),
   2084             Message::Assistant(AssistantMessage::from_text("reply".into())),
   2085             Message::User("second".into()),
   2086             Message::User("third".into()),
   2087             Message::User("fourth".into()),
   2088         ];
   2089         assert_eq!(
   2090             shared::get_pending_user_messages(&messages),
   2091             "second\nthird\nfourth"
   2092         );
   2093     }
   2094 
   2095     #[test]
   2096     fn pending_messages_stops_at_non_user() {
   2097         let messages = vec![
   2098             Message::User("old".into()),
   2099             Message::User("also old".into()),
   2100             Message::Assistant(AssistantMessage::from_text("reply".into())),
   2101             Message::User("pending".into()),
   2102         ];
   2103         assert_eq!(shared::get_pending_user_messages(&messages), "pending");
   2104     }
   2105 
   2106     #[test]
   2107     fn pending_messages_empty_when_last_is_assistant() {
   2108         let messages = vec![
   2109             Message::User("hello".into()),
   2110             Message::Assistant(AssistantMessage::from_text("reply".into())),
   2111         ];
   2112         assert_eq!(shared::get_pending_user_messages(&messages), "");
   2113     }
   2114 
   2115     #[test]
   2116     fn pending_messages_empty_chat() {
   2117         let messages: Vec<Message> = vec![];
   2118         assert_eq!(shared::get_pending_user_messages(&messages), "");
   2119     }
   2120 
   2121     #[test]
   2122     fn pending_messages_stops_at_tool_response() {
   2123         let messages = vec![
   2124             Message::User("do something".into()),
   2125             Message::Assistant(AssistantMessage::from_text("ok".into())),
   2126             Message::ToolCalls(vec![crate::tools::ToolCall::invalid(
   2127                 "c1".into(),
   2128                 Some("Read".into()),
   2129                 None,
   2130                 "test".into(),
   2131             )]),
   2132             Message::ToolResponse(crate::tools::ToolResponse::error(
   2133                 "c1".into(),
   2134                 "result".into(),
   2135             )),
   2136             Message::User("queued 1".into()),
   2137             Message::User("queued 2".into()),
   2138         ];
   2139         assert_eq!(
   2140             shared::get_pending_user_messages(&messages),
   2141             "queued 1\nqueued 2"
   2142         );
   2143     }
   2144 
   2145     #[test]
   2146     fn pending_messages_preserves_order() {
   2147         let messages = vec![
   2148             Message::User("a".into()),
   2149             Message::User("b".into()),
   2150             Message::User("c".into()),
   2151         ];
   2152         assert_eq!(shared::get_pending_user_messages(&messages), "a\nb\nc");
   2153     }
   2154 
   2155     // -----------------------------------------------------------------------
   2156     // Integration tests — mock Codex server over duplex streams
   2157     // -----------------------------------------------------------------------
   2158 
    /// Mock Codex server that speaks JSONL over duplex streams.
    ///
    /// Stands in for the `codex app-server` child process: each integration
    /// test scripts the JSON-RPC exchange by hand through the two pipes below.
    struct MockCodex {
        /// Read what the actor writes (actor's "stdin" from mock's perspective).
        /// Yields one JSONL line per JSON-RPC message.
        reader: tokio::io::Lines<BufReader<tokio::io::DuplexStream>>,
        /// Write what the actor reads (actor's "stdout" from mock's perspective).
        /// Dropping it closes the pipe, which tests use to simulate the
        /// process exiting (the actor then sees EOF).
        writer: tokio::io::BufWriter<tokio::io::DuplexStream>,
    }
   2166 
   2167     impl MockCodex {
   2168         /// Read one JSONL message sent by the actor.
   2169         async fn read_message(&mut self) -> RpcMessage {
   2170             let line = self.reader.next_line().await.unwrap().unwrap();
   2171             serde_json::from_str(&line).unwrap()
   2172         }
   2173 
   2174         /// Send a JSONL message to the actor.
   2175         async fn send_line(&mut self, value: &Value) {
   2176             let mut line = serde_json::to_string(value).unwrap();
   2177             line.push('\n');
   2178             self.writer.write_all(line.as_bytes()).await.unwrap();
   2179             self.writer.flush().await.unwrap();
   2180         }
   2181 
   2182         /// Handle the `initialize` → `initialized` handshake.
   2183         async fn handle_init(&mut self) {
   2184             let req = self.read_message().await;
   2185             assert_eq!(req.method.as_deref(), Some("initialize"));
   2186             let id = req.id.unwrap();
   2187             self.send_line(&json!({
   2188                 "id": id,
   2189                 "result": { "serverInfo": { "name": "mock-codex", "version": "0.0.0" } }
   2190             }))
   2191             .await;
   2192             let notif = self.read_message().await;
   2193             assert_eq!(notif.method.as_deref(), Some("initialized"));
   2194         }
   2195 
   2196         /// Handle `thread/start` and return the thread ID.
   2197         async fn handle_thread_start(&mut self) -> String {
   2198             let req = self.read_message().await;
   2199             assert_eq!(req.method.as_deref(), Some("thread/start"));
   2200             let id = req.id.unwrap();
   2201             let thread_id = "mock-thread-1";
   2202             self.send_line(&json!({
   2203                 "id": id,
   2204                 "result": { "thread": { "id": thread_id }, "model": "mock-model" }
   2205             }))
   2206             .await;
   2207             thread_id.to_string()
   2208         }
   2209 
   2210         /// Handle `turn/start` and return the turn ID.
   2211         async fn handle_turn_start(&mut self) -> String {
   2212             let req = self.read_message().await;
   2213             assert_eq!(req.method.as_deref(), Some("turn/start"));
   2214             let id = req.id.unwrap();
   2215             let turn_id = "mock-turn-1";
   2216             self.send_line(&json!({
   2217                 "id": id,
   2218                 "result": { "turn": { "id": turn_id } }
   2219             }))
   2220             .await;
   2221             turn_id.to_string()
   2222         }
   2223 
   2224         /// Send an `item/agentMessage/delta` notification.
   2225         async fn send_delta(&mut self, text: &str) {
   2226             self.send_line(&json!({
   2227                 "method": "item/agentMessage/delta",
   2228                 "params": { "delta": text }
   2229             }))
   2230             .await;
   2231         }
   2232 
   2233         /// Send a `turn/completed` notification.
   2234         async fn send_turn_completed(&mut self, status: &str) {
   2235             self.send_line(&json!({
   2236                 "method": "turn/completed",
   2237                 "params": { "status": status }
   2238             }))
   2239             .await;
   2240         }
   2241 
   2242         /// Send an `item/completed` notification.
   2243         async fn send_item_completed(&mut self, params: Value) {
   2244             self.send_line(&json!({
   2245                 "method": "item/completed",
   2246                 "params": params
   2247             }))
   2248             .await;
   2249         }
   2250 
   2251         /// Send an `item/started` notification.
   2252         async fn send_item_started(&mut self, params: Value) {
   2253             self.send_line(&json!({
   2254                 "method": "item/started",
   2255                 "params": params
   2256             }))
   2257             .await;
   2258         }
   2259 
   2260         /// Send an approval request (server→client request with id).
   2261         async fn send_approval_request(&mut self, rpc_id: u64, method: &str, params: Value) {
   2262             self.send_line(&json!({
   2263                 "id": rpc_id,
   2264                 "method": method,
   2265                 "params": params
   2266             }))
   2267             .await;
   2268         }
   2269     }
   2270 
   2271     /// Create a mock codex server and spawn the session actor loop.
   2272     /// Returns the mock, a command sender, and the actor task handle.
   2273     fn setup_integration_test() -> (
   2274         MockCodex,
   2275         tokio_mpsc::Sender<SessionCommand>,
   2276         tokio::task::JoinHandle<()>,
   2277     ) {
   2278         // "stdout" channel: mock writes → actor reads
   2279         let (mock_stdout_write, actor_stdout_read) = tokio::io::duplex(8192);
   2280         // "stdin" channel: actor writes → mock reads
   2281         let (actor_stdin_write, mock_stdin_read) = tokio::io::duplex(8192);
   2282 
   2283         let mock = MockCodex {
   2284             reader: BufReader::new(mock_stdin_read).lines(),
   2285             writer: tokio::io::BufWriter::new(mock_stdout_write),
   2286         };
   2287 
   2288         let actor_writer = tokio::io::BufWriter::new(actor_stdin_write);
   2289         let actor_reader = BufReader::new(actor_stdout_read).lines();
   2290 
   2291         let (command_tx, mut command_rx) = tokio_mpsc::channel(16);
   2292 
   2293         let handle = tokio::spawn(async move {
   2294             session_actor_loop(
   2295                 "test-session",
   2296                 actor_writer,
   2297                 actor_reader,
   2298                 Some("mock-model"),
   2299                 None,
   2300                 None,
   2301                 &mut command_rx,
   2302             )
   2303             .await;
   2304         });
   2305 
   2306         (mock, command_tx, handle)
   2307     }
   2308 
   2309     /// Send a Query command and return the response receiver.
   2310     async fn send_query(
   2311         command_tx: &tokio_mpsc::Sender<SessionCommand>,
   2312         prompt: &str,
   2313     ) -> mpsc::Receiver<DaveApiResponse> {
   2314         let (response_tx, response_rx) = mpsc::channel();
   2315         command_tx
   2316             .send(SessionCommand::Query {
   2317                 prompt: prompt.to_string(),
   2318                 response_tx,
   2319                 ctx: egui::Context::default(),
   2320             })
   2321             .await
   2322             .unwrap();
   2323         response_rx
   2324     }
   2325 
   2326     /// Collect all responses from the channel.
   2327     fn collect_responses(rx: &mpsc::Receiver<DaveApiResponse>) -> Vec<DaveApiResponse> {
   2328         rx.try_iter().collect()
   2329     }
   2330 
   2331     /// Drain and discard the initial SessionInfo that the first query emits.
   2332     fn drain_session_info(rx: &mpsc::Receiver<DaveApiResponse>) {
   2333         let resp = rx
   2334             .recv_timeout(Duration::from_secs(5))
   2335             .expect("timed out waiting for SessionInfo");
   2336         assert!(
   2337             matches!(resp, DaveApiResponse::SessionInfo(_)),
   2338             "Expected SessionInfo as first response, got {:?}",
   2339             std::mem::discriminant(&resp)
   2340         );
   2341     }
   2342 
   2343     // -- Integration tests --
   2344 
   2345     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   2346     async fn test_integration_streaming_tokens() {
   2347         let (mut mock, command_tx, handle) = setup_integration_test();
   2348 
   2349         mock.handle_init().await;
   2350         mock.handle_thread_start().await;
   2351 
   2352         let response_rx = send_query(&command_tx, "Hello").await;
   2353         mock.handle_turn_start().await;
   2354 
   2355         mock.send_delta("Hello").await;
   2356         mock.send_delta(" world").await;
   2357         mock.send_delta("!").await;
   2358         mock.send_turn_completed("completed").await;
   2359 
   2360         // Drop sender — actor finishes processing remaining lines,
   2361         // then command_rx.recv() returns None and the loop exits.
   2362         drop(command_tx);
   2363         handle.await.unwrap();
   2364 
   2365         let tokens: Vec<String> = collect_responses(&response_rx)
   2366             .into_iter()
   2367             .filter_map(|r| match r {
   2368                 DaveApiResponse::Token(t) => Some(t),
   2369                 _ => None,
   2370             })
   2371             .collect();
   2372         assert_eq!(tokens, vec!["Hello", " world", "!"]);
   2373     }
   2374 
   2375     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   2376     async fn test_integration_command_execution() {
   2377         let (mut mock, command_tx, handle) = setup_integration_test();
   2378 
   2379         mock.handle_init().await;
   2380         mock.handle_thread_start().await;
   2381 
   2382         let response_rx = send_query(&command_tx, "list files").await;
   2383         mock.handle_turn_start().await;
   2384 
   2385         mock.send_item_completed(json!({
   2386             "type": "commandExecution",
   2387             "command": "ls -la",
   2388             "exitCode": 0,
   2389             "output": "total 42\nfoo.rs\n"
   2390         }))
   2391         .await;
   2392         mock.send_turn_completed("completed").await;
   2393 
   2394         drop(command_tx);
   2395         handle.await.unwrap();
   2396 
   2397         let tool_results: Vec<_> = collect_responses(&response_rx)
   2398             .into_iter()
   2399             .filter_map(|r| match r {
   2400                 DaveApiResponse::ToolResult(t) => Some(t),
   2401                 _ => None,
   2402             })
   2403             .collect();
   2404         assert_eq!(tool_results.len(), 1);
   2405         assert_eq!(tool_results[0].tool_name, "Bash");
   2406     }
   2407 
   2408     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   2409     async fn test_integration_file_change() {
   2410         let (mut mock, command_tx, handle) = setup_integration_test();
   2411 
   2412         mock.handle_init().await;
   2413         mock.handle_thread_start().await;
   2414 
   2415         let response_rx = send_query(&command_tx, "edit file").await;
   2416         mock.handle_turn_start().await;
   2417 
   2418         mock.send_item_completed(json!({
   2419             "type": "fileChange",
   2420             "filePath": "src/main.rs",
   2421             "diff": "@@ -1,3 +1,3 @@\n-old\n+new\n context\n",
   2422             "kind": { "type": "edit" }
   2423         }))
   2424         .await;
   2425         mock.send_turn_completed("completed").await;
   2426 
   2427         drop(command_tx);
   2428         handle.await.unwrap();
   2429 
   2430         let tool_results: Vec<_> = collect_responses(&response_rx)
   2431             .into_iter()
   2432             .filter_map(|r| match r {
   2433                 DaveApiResponse::ToolResult(t) => Some(t),
   2434                 _ => None,
   2435             })
   2436             .collect();
   2437         assert_eq!(tool_results.len(), 1);
   2438         assert_eq!(tool_results[0].tool_name, "Edit");
   2439     }
   2440 
   2441     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   2442     async fn test_integration_approval_accept() {
   2443         let (mut mock, command_tx, handle) = setup_integration_test();
   2444 
   2445         mock.handle_init().await;
   2446         mock.handle_thread_start().await;
   2447 
   2448         let response_rx = send_query(&command_tx, "delete stuff").await;
   2449         mock.handle_turn_start().await;
   2450         drain_session_info(&response_rx);
   2451 
   2452         // Send a command that won't be auto-accepted
   2453         mock.send_approval_request(
   2454             42,
   2455             "item/commandExecution/requestApproval",
   2456             json!({ "command": "rm -rf /tmp/important" }),
   2457         )
   2458         .await;
   2459 
   2460         // Actor should forward a PermissionRequest
   2461         let resp = response_rx
   2462             .recv_timeout(Duration::from_secs(5))
   2463             .expect("timed out waiting for PermissionRequest");
   2464         let pending = match resp {
   2465             DaveApiResponse::PermissionRequest(p) => p,
   2466             other => panic!(
   2467                 "Expected PermissionRequest, got {:?}",
   2468                 std::mem::discriminant(&other)
   2469             ),
   2470         };
   2471         assert_eq!(pending.request.tool_name, "Bash");
   2472 
   2473         // Approve it
   2474         pending
   2475             .response_tx
   2476             .send(PermissionResponse::Allow { message: None })
   2477             .unwrap();
   2478 
   2479         // Mock should receive the acceptance
   2480         let approval_msg = mock.read_message().await;
   2481         assert_eq!(approval_msg.id, Some(42));
   2482         let result = approval_msg.result.unwrap();
   2483         assert_eq!(result["decision"], "accept");
   2484 
   2485         mock.send_turn_completed("completed").await;
   2486         drop(command_tx);
   2487         handle.await.unwrap();
   2488     }
   2489 
   2490     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   2491     async fn test_integration_approval_deny() {
   2492         let (mut mock, command_tx, handle) = setup_integration_test();
   2493 
   2494         mock.handle_init().await;
   2495         mock.handle_thread_start().await;
   2496 
   2497         let response_rx = send_query(&command_tx, "dangerous").await;
   2498         mock.handle_turn_start().await;
   2499         drain_session_info(&response_rx);
   2500 
   2501         mock.send_approval_request(
   2502             99,
   2503             "item/commandExecution/requestApproval",
   2504             json!({ "command": "sudo rm -rf /" }),
   2505         )
   2506         .await;
   2507 
   2508         let resp = response_rx
   2509             .recv_timeout(Duration::from_secs(5))
   2510             .expect("timed out waiting for PermissionRequest");
   2511         let pending = match resp {
   2512             DaveApiResponse::PermissionRequest(p) => p,
   2513             _ => panic!("Expected PermissionRequest"),
   2514         };
   2515 
   2516         // Deny it
   2517         pending
   2518             .response_tx
   2519             .send(PermissionResponse::Deny {
   2520                 reason: "too dangerous".to_string(),
   2521             })
   2522             .unwrap();
   2523 
   2524         // Mock should receive the decline
   2525         let approval_msg = mock.read_message().await;
   2526         assert_eq!(approval_msg.id, Some(99));
   2527         let result = approval_msg.result.unwrap();
   2528         assert_eq!(result["decision"], "decline");
   2529 
   2530         mock.send_turn_completed("completed").await;
   2531         drop(command_tx);
   2532         handle.await.unwrap();
   2533     }
   2534 
   2535     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   2536     async fn test_integration_auto_accept() {
   2537         let (mut mock, command_tx, handle) = setup_integration_test();
   2538 
   2539         mock.handle_init().await;
   2540         mock.handle_thread_start().await;
   2541 
   2542         let response_rx = send_query(&command_tx, "check status").await;
   2543         mock.handle_turn_start().await;
   2544 
   2545         // "git status" should be auto-accepted
   2546         mock.send_approval_request(
   2547             50,
   2548             "item/commandExecution/requestApproval",
   2549             json!({ "command": "git status" }),
   2550         )
   2551         .await;
   2552 
   2553         // Mock should receive the auto-acceptance immediately (no UI involved)
   2554         let approval_msg = mock.read_message().await;
   2555         assert_eq!(approval_msg.id, Some(50));
   2556         let result = approval_msg.result.unwrap();
   2557         assert_eq!(result["decision"], "accept");
   2558 
   2559         // No PermissionRequest should have been sent
   2560         // (the response_rx should be empty or only have non-permission items)
   2561         mock.send_turn_completed("completed").await;
   2562 
   2563         drop(command_tx);
   2564         handle.await.unwrap();
   2565 
   2566         let permission_requests: Vec<_> = collect_responses(&response_rx)
   2567             .into_iter()
   2568             .filter(|r| matches!(r, DaveApiResponse::PermissionRequest(_)))
   2569             .collect();
   2570         assert!(
   2571             permission_requests.is_empty(),
   2572             "Auto-accepted commands should not generate PermissionRequests"
   2573         );
   2574     }
   2575 
   2576     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   2577     async fn test_integration_multiple_turns() {
   2578         let (mut mock, command_tx, handle) = setup_integration_test();
   2579 
   2580         mock.handle_init().await;
   2581         mock.handle_thread_start().await;
   2582 
   2583         // First turn
   2584         let rx1 = send_query(&command_tx, "first").await;
   2585         mock.handle_turn_start().await;
   2586         drain_session_info(&rx1);
   2587         mock.send_delta("reply 1").await;
   2588         mock.send_turn_completed("completed").await;
   2589 
   2590         // Wait for the first turn's token to confirm the actor is processing
   2591         let resp = rx1
   2592             .recv_timeout(Duration::from_secs(5))
   2593             .expect("timed out waiting for first turn token");
   2594         assert!(matches!(resp, DaveApiResponse::Token(_)));
   2595 
   2596         // Brief yield for turn_completed to be processed
   2597         tokio::time::sleep(Duration::from_millis(100)).await;
   2598 
   2599         // Second turn
   2600         let rx2 = send_query(&command_tx, "second").await;
   2601         mock.handle_turn_start().await;
   2602         mock.send_delta("reply 2").await;
   2603         mock.send_turn_completed("completed").await;
   2604 
   2605         drop(command_tx);
   2606         handle.await.unwrap();
   2607 
   2608         let tokens2: Vec<String> = collect_responses(&rx2)
   2609             .into_iter()
   2610             .filter_map(|r| match r {
   2611                 DaveApiResponse::Token(t) => Some(t),
   2612                 _ => None,
   2613             })
   2614             .collect();
   2615         assert_eq!(tokens2, vec!["reply 2"]);
   2616     }
   2617 
   2618     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   2619     async fn test_integration_subagent_lifecycle() {
   2620         let (mut mock, command_tx, handle) = setup_integration_test();
   2621 
   2622         mock.handle_init().await;
   2623         mock.handle_thread_start().await;
   2624 
   2625         let response_rx = send_query(&command_tx, "research").await;
   2626         mock.handle_turn_start().await;
   2627 
   2628         // Subagent starts
   2629         mock.send_item_started(json!({
   2630             "type": "collabAgentToolCall",
   2631             "itemId": "agent-42",
   2632             "name": "research agent"
   2633         }))
   2634         .await;
   2635 
   2636         // Command inside subagent
   2637         mock.send_item_completed(json!({
   2638             "type": "commandExecution",
   2639             "command": "grep -r pattern .",
   2640             "exitCode": 0,
   2641             "output": "found 3 matches"
   2642         }))
   2643         .await;
   2644 
   2645         // Subagent completes
   2646         mock.send_item_completed(json!({
   2647             "type": "collabAgentToolCall",
   2648             "itemId": "agent-42",
   2649             "result": "Found relevant information"
   2650         }))
   2651         .await;
   2652 
   2653         mock.send_turn_completed("completed").await;
   2654 
   2655         drop(command_tx);
   2656         handle.await.unwrap();
   2657 
   2658         let responses = collect_responses(&response_rx);
   2659 
   2660         // Should have: SubagentSpawned, ToolResult (with parent), SubagentCompleted
   2661         let spawned: Vec<_> = responses
   2662             .iter()
   2663             .filter(|r| matches!(r, DaveApiResponse::SubagentSpawned(_)))
   2664             .collect();
   2665         assert_eq!(spawned.len(), 1);
   2666 
   2667         let tool_results: Vec<_> = responses
   2668             .iter()
   2669             .filter_map(|r| match r {
   2670                 DaveApiResponse::ToolResult(t) => Some(t),
   2671                 _ => None,
   2672             })
   2673             .collect();
   2674         assert_eq!(tool_results.len(), 1);
   2675         assert_eq!(tool_results[0].parent_task_id.as_deref(), Some("agent-42"));
   2676 
   2677         let completed: Vec<_> = responses
   2678             .iter()
   2679             .filter(|r| matches!(r, DaveApiResponse::SubagentCompleted { .. }))
   2680             .collect();
   2681         assert_eq!(completed.len(), 1);
   2682     }
   2683 
   2684     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   2685     async fn test_integration_shutdown_during_stream() {
   2686         let (mut mock, command_tx, handle) = setup_integration_test();
   2687 
   2688         mock.handle_init().await;
   2689         mock.handle_thread_start().await;
   2690 
   2691         let response_rx = send_query(&command_tx, "long task").await;
   2692         mock.handle_turn_start().await;
   2693         drain_session_info(&response_rx);
   2694 
   2695         mock.send_delta("partial").await;
   2696 
   2697         // Wait for token to arrive before sending Shutdown
   2698         let resp = response_rx
   2699             .recv_timeout(Duration::from_secs(5))
   2700             .expect("timed out waiting for token");
   2701         assert!(
   2702             matches!(&resp, DaveApiResponse::Token(t) if t == "partial"),
   2703             "Expected Token(\"partial\")"
   2704         );
   2705 
   2706         // Now shutdown while still inside the turn (no turn_completed sent)
   2707         command_tx.send(SessionCommand::Shutdown).await.unwrap();
   2708         handle.await.unwrap();
   2709     }
   2710 
   2711     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   2712     async fn test_integration_process_eof() {
   2713         let (mut mock, command_tx, handle) = setup_integration_test();
   2714 
   2715         mock.handle_init().await;
   2716         mock.handle_thread_start().await;
   2717 
   2718         let response_rx = send_query(&command_tx, "hello").await;
   2719         mock.handle_turn_start().await;
   2720         drain_session_info(&response_rx);
   2721 
   2722         mock.send_delta("partial").await;
   2723 
   2724         // Drop the mock's writer — simulates process exit
   2725         drop(mock.writer);
   2726 
   2727         // Actor should detect EOF and send a Failed response
   2728         let failed = response_rx
   2729             .recv_timeout(Duration::from_secs(5))
   2730             .expect("timed out waiting for response after EOF");
   2731 
   2732         // First response might be the token, keep reading
   2733         let mut got_failed = false;
   2734 
   2735         match failed {
   2736             DaveApiResponse::Token(t) => {
   2737                 assert_eq!(t, "partial");
   2738             }
   2739             DaveApiResponse::Failed(_) => got_failed = true,
   2740             _ => {}
   2741         }
   2742 
   2743         if !got_failed {
   2744             let resp = response_rx
   2745                 .recv_timeout(Duration::from_secs(5))
   2746                 .expect("timed out waiting for Failed after EOF");
   2747             match resp {
   2748                 DaveApiResponse::Failed(msg) => {
   2749                     assert!(
   2750                         msg.contains("exited unexpectedly") || msg.contains("EOF"),
   2751                         "Unexpected error message: {}",
   2752                         msg
   2753                     );
   2754                 }
   2755                 other => panic!(
   2756                     "Expected Failed after EOF, got {:?}",
   2757                     std::mem::discriminant(&other)
   2758                 ),
   2759             }
   2760         }
   2761 
   2762         // Actor should exit after EOF
   2763         command_tx.send(SessionCommand::Shutdown).await.ok(); // might fail if actor already exited
   2764         handle.await.unwrap();
   2765     }
   2766 
   2767     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   2768     async fn test_integration_init_failure() {
   2769         // "stdout" channel: mock writes → actor reads
   2770         let (mock_stdout_write, actor_stdout_read) = tokio::io::duplex(8192);
   2771         // "stdin" channel: actor writes → mock reads
   2772         let (actor_stdin_write, mock_stdin_read) = tokio::io::duplex(8192);
   2773 
   2774         let mut mock_reader = BufReader::new(mock_stdin_read).lines();
   2775         let mut mock_writer = tokio::io::BufWriter::new(mock_stdout_write);
   2776 
   2777         let actor_writer = tokio::io::BufWriter::new(actor_stdin_write);
   2778         let actor_reader = BufReader::new(actor_stdout_read).lines();
   2779 
   2780         let (command_tx, mut command_rx) = tokio_mpsc::channel(16);
   2781 
   2782         let handle = tokio::spawn(async move {
   2783             session_actor_loop(
   2784                 "test-session",
   2785                 actor_writer,
   2786                 actor_reader,
   2787                 Some("mock-model"),
   2788                 None,
   2789                 None,
   2790                 &mut command_rx,
   2791             )
   2792             .await;
   2793         });
   2794 
   2795         // Read the initialize request
   2796         let line = mock_reader.next_line().await.unwrap().unwrap();
   2797         let req: RpcMessage = serde_json::from_str(&line).unwrap();
   2798         let id = req.id.unwrap();
   2799 
   2800         // Send an error response
   2801         let error_resp = json!({
   2802             "id": id,
   2803             "error": { "code": -1, "message": "mock init failure" }
   2804         });
   2805         let mut error_line = serde_json::to_string(&error_resp).unwrap();
   2806         error_line.push('\n');
   2807         mock_writer.write_all(error_line.as_bytes()).await.unwrap();
   2808         mock_writer.flush().await.unwrap();
   2809 
   2810         // The actor should drain commands with error. Send a query and a shutdown.
   2811         let (response_tx, response_rx) = mpsc::channel();
   2812         command_tx
   2813             .send(SessionCommand::Query {
   2814                 prompt: "hello".to_string(),
   2815                 response_tx,
   2816                 ctx: egui::Context::default(),
   2817             })
   2818             .await
   2819             .unwrap();
   2820         command_tx.send(SessionCommand::Shutdown).await.unwrap();
   2821 
   2822         handle.await.unwrap();
   2823 
   2824         // The query should have received an error
   2825         let resp = response_rx
   2826             .recv_timeout(Duration::from_secs(5))
   2827             .expect("expected error response after init failure");
   2828         match resp {
   2829             DaveApiResponse::Failed(msg) => {
   2830                 assert!(
   2831                     msg.contains("init handshake"),
   2832                     "Expected init handshake error, got: {}",
   2833                     msg
   2834                 );
   2835             }
   2836             other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)),
   2837         }
   2838     }
   2839 
   2840     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   2841     async fn test_integration_turn_error() {
   2842         let (mut mock, command_tx, handle) = setup_integration_test();
   2843 
   2844         mock.handle_init().await;
   2845         mock.handle_thread_start().await;
   2846 
   2847         let response_rx = send_query(&command_tx, "hello").await;
   2848 
   2849         // Read turn/start request and send an error response
   2850         let req = mock.read_message().await;
   2851         assert_eq!(req.method.as_deref(), Some("turn/start"));
   2852         let id = req.id.unwrap();
   2853         mock.send_line(&json!({
   2854             "id": id,
   2855             "error": { "code": -32000, "message": "rate limit exceeded" }
   2856         }))
   2857         .await;
   2858 
   2859         // Give actor time to process
   2860         tokio::time::sleep(Duration::from_millis(100)).await;
   2861 
   2862         command_tx.send(SessionCommand::Shutdown).await.unwrap();
   2863         handle.await.unwrap();
   2864 
   2865         let responses = collect_responses(&response_rx);
   2866         let failures: Vec<_> = responses
   2867             .iter()
   2868             .filter_map(|r| match r {
   2869                 DaveApiResponse::Failed(msg) => Some(msg.clone()),
   2870                 _ => None,
   2871             })
   2872             .collect();
   2873         assert_eq!(failures.len(), 1);
   2874         assert_eq!(failures[0], "rate limit exceeded");
   2875     }
   2876 
   2877     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   2878     async fn test_integration_file_change_approval() {
   2879         let (mut mock, command_tx, handle) = setup_integration_test();
   2880 
   2881         mock.handle_init().await;
   2882         mock.handle_thread_start().await;
   2883 
   2884         let response_rx = send_query(&command_tx, "create file").await;
   2885         mock.handle_turn_start().await;
   2886         drain_session_info(&response_rx);
   2887 
   2888         // File change approval request (create)
   2889         mock.send_approval_request(
   2890             77,
   2891             "item/fileChange/requestApproval",
   2892             json!({
   2893                 "filePath": "new_file.rs",
   2894                 "diff": "+fn main() {}",
   2895                 "kind": { "type": "create" }
   2896             }),
   2897         )
   2898         .await;
   2899 
   2900         let resp = response_rx
   2901             .recv_timeout(Duration::from_secs(5))
   2902             .expect("timed out waiting for PermissionRequest");
   2903         let pending = match resp {
   2904             DaveApiResponse::PermissionRequest(p) => p,
   2905             other => panic!(
   2906                 "Expected PermissionRequest, got {:?}",
   2907                 std::mem::discriminant(&other)
   2908             ),
   2909         };
   2910         // File create should map to "Write" tool
   2911         assert_eq!(pending.request.tool_name, "Write");
   2912 
   2913         pending
   2914             .response_tx
   2915             .send(PermissionResponse::Allow { message: None })
   2916             .unwrap();
   2917 
   2918         let approval_msg = mock.read_message().await;
   2919         assert_eq!(approval_msg.id, Some(77));
   2920         assert_eq!(approval_msg.result.unwrap()["decision"], "accept");
   2921 
   2922         mock.send_turn_completed("completed").await;
   2923         drop(command_tx);
   2924         handle.await.unwrap();
   2925     }
   2926 
   2927     /// Create a mock codex server with `resume_session_id` set, so the actor
   2928     /// sends `thread/resume` instead of `thread/start`.
   2929     fn setup_integration_test_with_resume(
   2930         resume_id: &str,
   2931     ) -> (
   2932         MockCodex,
   2933         tokio_mpsc::Sender<SessionCommand>,
   2934         tokio::task::JoinHandle<()>,
   2935     ) {
   2936         let (mock_stdout_write, actor_stdout_read) = tokio::io::duplex(8192);
   2937         let (actor_stdin_write, mock_stdin_read) = tokio::io::duplex(8192);
   2938 
   2939         let mock = MockCodex {
   2940             reader: BufReader::new(mock_stdin_read).lines(),
   2941             writer: tokio::io::BufWriter::new(mock_stdout_write),
   2942         };
   2943 
   2944         let actor_writer = tokio::io::BufWriter::new(actor_stdin_write);
   2945         let actor_reader = BufReader::new(actor_stdout_read).lines();
   2946 
   2947         let (command_tx, mut command_rx) = tokio_mpsc::channel(16);
   2948         let resume_id = resume_id.to_string();
   2949 
   2950         let handle = tokio::spawn(async move {
   2951             session_actor_loop(
   2952                 "test-session-resume",
   2953                 actor_writer,
   2954                 actor_reader,
   2955                 Some("mock-model"),
   2956                 None,
   2957                 Some(&resume_id),
   2958                 &mut command_rx,
   2959             )
   2960             .await;
   2961         });
   2962 
   2963         (mock, command_tx, handle)
   2964     }
   2965 
   2966     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   2967     async fn test_integration_interrupt_during_stream() {
   2968         let (mut mock, command_tx, handle) = setup_integration_test();
   2969 
   2970         mock.handle_init().await;
   2971         mock.handle_thread_start().await;
   2972 
   2973         let response_rx = send_query(&command_tx, "count to 100").await;
   2974         mock.handle_turn_start().await;
   2975         drain_session_info(&response_rx);
   2976 
   2977         // Send a few tokens
   2978         mock.send_delta("one ").await;
   2979         mock.send_delta("two ").await;
   2980 
   2981         // Give actor time to process the tokens
   2982         tokio::time::sleep(Duration::from_millis(50)).await;
   2983 
   2984         // Verify we got them
   2985         let tok1 = response_rx
   2986             .recv_timeout(Duration::from_secs(2))
   2987             .expect("expected token 1");
   2988         assert!(matches!(tok1, DaveApiResponse::Token(ref t) if t == "one "));
   2989 
   2990         // Send interrupt
   2991         command_tx
   2992             .send(SessionCommand::Interrupt {
   2993                 ctx: egui::Context::default(),
   2994             })
   2995             .await
   2996             .unwrap();
   2997 
   2998         // The actor should send turn/interrupt to codex
   2999         let interrupt_msg = mock.read_message().await;
   3000         assert_eq!(interrupt_msg.method.as_deref(), Some("turn/interrupt"));
   3001 
   3002         // Codex responds with turn/completed after interrupt
   3003         mock.send_turn_completed("interrupted").await;
   3004 
   3005         // Actor should be ready for next command now
   3006         drop(command_tx);
   3007         handle.await.unwrap();
   3008 
   3009         // Verify we got the tokens before interrupt
   3010         let responses = collect_responses(&response_rx);
   3011         let tokens: Vec<_> = responses
   3012             .iter()
   3013             .filter_map(|r| match r {
   3014                 DaveApiResponse::Token(t) => Some(t.clone()),
   3015                 _ => None,
   3016             })
   3017             .collect();
   3018         assert!(tokens.contains(&"two ".to_string()));
   3019     }
   3020 
   3021     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   3022     async fn test_integration_interrupt_during_approval() {
   3023         let (mut mock, command_tx, handle) = setup_integration_test();
   3024 
   3025         mock.handle_init().await;
   3026         mock.handle_thread_start().await;
   3027 
   3028         let response_rx = send_query(&command_tx, "run something").await;
   3029         mock.handle_turn_start().await;
   3030         drain_session_info(&response_rx);
   3031 
   3032         // Send an approval request
   3033         mock.send_approval_request(
   3034             50,
   3035             "item/commandExecution/requestApproval",
   3036             json!({ "command": "rm -rf /" }),
   3037         )
   3038         .await;
   3039 
   3040         // Wait for the PermissionRequest to arrive at the test
   3041         let resp = response_rx
   3042             .recv_timeout(Duration::from_secs(5))
   3043             .expect("timed out waiting for PermissionRequest");
   3044         match resp {
   3045             DaveApiResponse::PermissionRequest(_pending) => {
   3046                 // Don't respond to the pending permission — send interrupt instead
   3047             }
   3048             other => panic!(
   3049                 "Expected PermissionRequest, got {:?}",
   3050                 std::mem::discriminant(&other)
   3051             ),
   3052         }
   3053 
   3054         // Send interrupt while approval is pending
   3055         command_tx
   3056             .send(SessionCommand::Interrupt {
   3057                 ctx: egui::Context::default(),
   3058             })
   3059             .await
   3060             .unwrap();
   3061 
   3062         // Actor should send cancel for the approval
   3063         let cancel_msg = mock.read_message().await;
   3064         assert_eq!(cancel_msg.id, Some(50));
   3065         assert_eq!(cancel_msg.result.unwrap()["decision"], "cancel");
   3066 
   3067         // Then send turn/interrupt
   3068         let interrupt_msg = mock.read_message().await;
   3069         assert_eq!(interrupt_msg.method.as_deref(), Some("turn/interrupt"));
   3070 
   3071         // Codex completes the turn
   3072         mock.send_turn_completed("interrupted").await;
   3073 
   3074         drop(command_tx);
   3075         handle.await.unwrap();
   3076     }
   3077 
   3078     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   3079     async fn test_integration_query_during_active_turn() {
   3080         let (mut mock, command_tx, handle) = setup_integration_test();
   3081 
   3082         mock.handle_init().await;
   3083         mock.handle_thread_start().await;
   3084 
   3085         let response_rx1 = send_query(&command_tx, "first query").await;
   3086         mock.handle_turn_start().await;
   3087 
   3088         // Send some tokens so the turn is clearly active
   3089         mock.send_delta("working...").await;
   3090 
   3091         // Give actor time to enter the streaming loop
   3092         tokio::time::sleep(Duration::from_millis(50)).await;
   3093 
   3094         // Send a second query while the first is still active
   3095         let response_rx2 = send_query(&command_tx, "second query").await;
   3096 
   3097         // The second query should be immediately rejected
   3098         let rejection = response_rx2
   3099             .recv_timeout(Duration::from_secs(5))
   3100             .expect("timed out waiting for rejection");
   3101         match rejection {
   3102             DaveApiResponse::Failed(msg) => {
   3103                 assert_eq!(msg, "Query already in progress");
   3104             }
   3105             other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)),
   3106         }
   3107 
   3108         // First query continues normally
   3109         mock.send_turn_completed("completed").await;
   3110 
   3111         drop(command_tx);
   3112         handle.await.unwrap();
   3113 
   3114         // Verify first query got its token
   3115         let responses = collect_responses(&response_rx1);
   3116         let tokens: Vec<_> = responses
   3117             .iter()
   3118             .filter_map(|r| match r {
   3119                 DaveApiResponse::Token(t) => Some(t.clone()),
   3120                 _ => None,
   3121             })
   3122             .collect();
   3123         assert!(tokens.contains(&"working...".to_string()));
   3124     }
   3125 
   3126     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   3127     async fn test_integration_thread_resume() {
   3128         let (mut mock, command_tx, handle) =
   3129             setup_integration_test_with_resume("existing-thread-42");
   3130 
   3131         // Init handshake is the same
   3132         mock.handle_init().await;
   3133 
   3134         // Actor should send thread/resume instead of thread/start
   3135         let req = mock.read_message().await;
   3136         assert_eq!(req.method.as_deref(), Some("thread/resume"));
   3137         let params = req.params.unwrap();
   3138         assert_eq!(params["threadId"], "existing-thread-42");
   3139 
   3140         // Respond with success
   3141         let id = req.id.unwrap();
   3142         mock.send_line(&json!({
   3143             "id": id,
   3144             "result": { "thread": { "id": "existing-thread-42" } }
   3145         }))
   3146         .await;
   3147 
   3148         // Now send a query — should work the same as normal
   3149         let response_rx = send_query(&command_tx, "resume prompt").await;
   3150         mock.handle_turn_start().await;
   3151         mock.send_delta("resumed!").await;
   3152         mock.send_turn_completed("completed").await;
   3153 
   3154         drop(command_tx);
   3155         handle.await.unwrap();
   3156 
   3157         let responses = collect_responses(&response_rx);
   3158         let tokens: Vec<_> = responses
   3159             .iter()
   3160             .filter_map(|r| match r {
   3161                 DaveApiResponse::Token(t) => Some(t.clone()),
   3162                 _ => None,
   3163             })
   3164             .collect();
   3165         assert_eq!(tokens, vec!["resumed!"]);
   3166     }
   3167 
   3168     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   3169     async fn test_integration_malformed_jsonl() {
   3170         let (mut mock, command_tx, handle) = setup_integration_test();
   3171 
   3172         mock.handle_init().await;
   3173         mock.handle_thread_start().await;
   3174 
   3175         let response_rx = send_query(&command_tx, "test").await;
   3176         mock.handle_turn_start().await;
   3177 
   3178         // Send valid token
   3179         mock.send_delta("before").await;
   3180 
   3181         // Send garbage that isn't valid JSON
   3182         let mut garbage = "this is not json at all\n".to_string();
   3183         mock.writer.write_all(garbage.as_bytes()).await.unwrap();
   3184         mock.writer.flush().await.unwrap();
   3185 
   3186         // Send another valid token after the garbage
   3187         mock.send_delta("after").await;
   3188 
   3189         // Complete the turn
   3190         mock.send_turn_completed("completed").await;
   3191 
   3192         drop(command_tx);
   3193         handle.await.unwrap();
   3194 
   3195         // Both valid tokens should have been received — the garbage line
   3196         // should have been skipped with a warning, not crash the actor
   3197         let responses = collect_responses(&response_rx);
   3198         let tokens: Vec<_> = responses
   3199             .iter()
   3200             .filter_map(|r| match r {
   3201                 DaveApiResponse::Token(t) => Some(t.clone()),
   3202                 _ => None,
   3203             })
   3204             .collect();
   3205         assert!(
   3206             tokens.contains(&"before".to_string()),
   3207             "Missing 'before' token, got: {:?}",
   3208             tokens
   3209         );
   3210         assert!(
   3211             tokens.contains(&"after".to_string()),
   3212             "Missing 'after' token after malformed line, got: {:?}",
   3213             tokens
   3214         );
   3215     }
   3216 
   3217     // -----------------------------------------------------------------------
   3218     // Real-binary integration tests — require `codex` on PATH
   3219     // Run with: cargo test -p notedeck_dave -- --ignored
   3220     // -----------------------------------------------------------------------
   3221 
   3222     /// Helper: spawn a real codex app-server process and wire it into
   3223     /// `session_actor_loop`. Returns the command sender, response receiver,
   3224     /// and join handle.
   3225     fn setup_real_codex_test() -> (
   3226         tokio_mpsc::Sender<SessionCommand>,
   3227         mpsc::Receiver<DaveApiResponse>,
   3228         tokio::task::JoinHandle<()>,
   3229     ) {
   3230         let codex_binary = std::env::var("CODEX_BINARY").unwrap_or_else(|_| "codex".to_string());
   3231 
   3232         let mut child = spawn_codex(&codex_binary, &None)
   3233             .expect("Failed to spawn codex app-server — is codex installed?");
   3234 
   3235         let stdin = child.stdin.take().expect("stdin piped");
   3236         let stdout = child.stdout.take().expect("stdout piped");
   3237 
   3238         // Drain stderr to prevent pipe deadlock
   3239         if let Some(stderr) = child.stderr.take() {
   3240             tokio::spawn(async move {
   3241                 let mut lines = BufReader::new(stderr).lines();
   3242                 while let Ok(Some(line)) = lines.next_line().await {
   3243                     eprintln!("[codex stderr] {}", line);
   3244                 }
   3245             });
   3246         }
   3247 
   3248         let writer = tokio::io::BufWriter::new(stdin);
   3249         let reader = BufReader::new(stdout).lines();
   3250 
   3251         let (command_tx, mut command_rx) = tokio_mpsc::channel(16);
   3252 
   3253         let handle = tokio::spawn(async move {
   3254             session_actor_loop(
   3255                 "real-codex-test",
   3256                 writer,
   3257                 reader,
   3258                 None, // use codex default model
   3259                 None, // use current directory
   3260                 None, // no resume
   3261                 &mut command_rx,
   3262             )
   3263             .await;
   3264             let _ = child.kill().await;
   3265         });
   3266 
   3267         let (response_tx, response_rx) = mpsc::channel();
   3268         // Send an initial query to trigger handshake + thread start + turn
   3269         let command_tx_clone = command_tx.clone();
   3270         let rt_handle = tokio::runtime::Handle::current();
   3271         rt_handle.spawn(async move {
   3272             command_tx_clone
   3273                 .send(SessionCommand::Query {
   3274                     prompt: "Say exactly: hello world".to_string(),
   3275                     response_tx,
   3276                     ctx: egui::Context::default(),
   3277                 })
   3278                 .await
   3279                 .unwrap();
   3280         });
   3281 
   3282         (command_tx, response_rx, handle)
   3283     }
   3284 
   3285     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   3286     #[ignore] // Requires `codex` binary on PATH
   3287     async fn test_real_codex_streaming() {
   3288         let (command_tx, response_rx, handle) = setup_real_codex_test();
   3289 
   3290         // Wait for at least one token (with a generous timeout for API calls)
   3291         let mut got_token = false;
   3292         let deadline = std::time::Instant::now() + Duration::from_secs(60);
   3293 
   3294         while std::time::Instant::now() < deadline {
   3295             match response_rx.recv_timeout(Duration::from_secs(1)) {
   3296                 Ok(DaveApiResponse::Token(t)) => {
   3297                     eprintln!("[test] got token: {:?}", t);
   3298                     got_token = true;
   3299                 }
   3300                 Ok(DaveApiResponse::PermissionRequest(pending)) => {
   3301                     // Auto-accept any permission requests during test
   3302                     eprintln!(
   3303                         "[test] auto-accepting permission: {}",
   3304                         pending.request.tool_name
   3305                     );
   3306                     let _ = pending
   3307                         .response_tx
   3308                         .send(PermissionResponse::Allow { message: None });
   3309                 }
   3310                 Ok(DaveApiResponse::Failed(msg)) => {
   3311                     panic!("[test] codex turn failed: {}", msg);
   3312                 }
   3313                 Ok(other) => {
   3314                     eprintln!("[test] got response: {:?}", std::mem::discriminant(&other));
   3315                 }
   3316                 Err(mpsc::RecvTimeoutError::Timeout) => {
   3317                     if got_token {
   3318                         break; // Got at least one token; stop waiting
   3319                     }
   3320                 }
   3321                 Err(mpsc::RecvTimeoutError::Disconnected) => break,
   3322             }
   3323         }
   3324 
   3325         assert!(
   3326             got_token,
   3327             "Expected at least one Token response from real codex"
   3328         );
   3329 
   3330         drop(command_tx);
   3331         let _ = tokio::time::timeout(Duration::from_secs(10), handle).await;
   3332     }
   3333 
   3334     #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
   3335     #[ignore] // Requires `codex` binary on PATH
   3336     async fn test_real_codex_turn_completes() {
   3337         let (command_tx, response_rx, handle) = setup_real_codex_test();
   3338 
   3339         // Wait for turn to complete
   3340         let mut got_turn_done = false;
   3341         let mut got_any_response = false;
   3342         let deadline = std::time::Instant::now() + Duration::from_secs(120);
   3343 
   3344         while std::time::Instant::now() < deadline {
   3345             match response_rx.recv_timeout(Duration::from_secs(2)) {
   3346                 Ok(DaveApiResponse::Token(_)) => {
   3347                     got_any_response = true;
   3348                 }
   3349                 Ok(DaveApiResponse::PermissionRequest(pending)) => {
   3350                     got_any_response = true;
   3351                     let _ = pending
   3352                         .response_tx
   3353                         .send(PermissionResponse::Allow { message: None });
   3354                 }
   3355                 Ok(DaveApiResponse::Failed(msg)) => {
   3356                     eprintln!("[test] turn failed: {}", msg);
   3357                     // A failure is still a "completion" — codex responded
   3358                     got_turn_done = true;
   3359                     break;
   3360                 }
   3361                 Ok(_) => {
   3362                     got_any_response = true;
   3363                 }
   3364                 Err(mpsc::RecvTimeoutError::Timeout) => {
   3365                     if got_any_response {
   3366                         // Responses have stopped coming — turn likely completed
   3367                         // (turn/completed causes the actor to stop sending
   3368                         //  and wait for the next command)
   3369                         got_turn_done = true;
   3370                         break;
   3371                     }
   3372                 }
   3373                 Err(mpsc::RecvTimeoutError::Disconnected) => {
   3374                     got_turn_done = true;
   3375                     break;
   3376                 }
   3377             }
   3378         }
   3379 
   3380         assert!(
   3381             got_turn_done,
   3382             "Expected real codex turn to complete within timeout"
   3383         );
   3384 
   3385         drop(command_tx);
   3386         let _ = tokio::time::timeout(Duration::from_secs(10), handle).await;
   3387     }
   3388 }