commit 29b0a37a0f5132fda3bd51033892f50d3e04c1cc
parent 24747d7bdc38ba1feb67ada5faed070d5842bd72
Author: William Casarin <jb55@jb55.com>
Date: Tue, 24 Feb 2026 12:22:20 -0800
Merge codex backend, context bar, and nostrverse object editing
dave:
- Add Codex backend via app-server JSON-RPC protocol with
actor-per-session pattern, non-blocking approval flow, and
binary-on-PATH auto-detection for backend selection
- Add integration tests for Codex (18 tests covering streaming,
approvals, subagents, interrupts, error handling)
- Use hostname matching for local vs remote session detection
instead of cwd path existence (fixes cross-host misclassification)
- Add context window usage bar to status bar with green→yellow→red
fill as context fills
- Use seq tag as tiebreaker for message ordering within the same
second (fixes non-deterministic order during streaming)
nostrverse:
- Add click-to-select and drag-to-move objects in 3D viewport
with ray-AABB picking and ground plane unprojection
- Add outline shader (inverted hull method) for selected objects
- Add grid snap with configurable size (G key toggle)
- Add parented object dragging with surface clamping and breakaway
- Add object duplication (Ctrl+D)
- Add drag-to-snap for placing objects on surfaces (TopOf child)
- Add R key rotation controls with inspector Rot Y editor
- Persist rotation as human-readable Euler degrees in protoverse
- Add rotation snap when grid snap is enabled (default 15°)
- Fix f32→f64 precision noise in serialized numbers
- Fix subscription-based demo room loading after ingest
Diffstat:
25 files changed, 4359 insertions(+), 65 deletions(-)
diff --git a/crates/notedeck_dave/src/backend/claude.rs b/crates/notedeck_dave/src/backend/claude.rs
@@ -479,6 +479,30 @@ async fn session_actor(
.unwrap_or_else(|| "Unknown error".to_string());
let _ = response_tx.send(DaveApiResponse::Failed(error_text));
}
+
+ // Extract usage metrics
+ let (input_tokens, output_tokens) = result_msg
+ .usage
+ .as_ref()
+ .map(|u| {
+ let inp = u.get("input_tokens")
+ .and_then(|v| v.as_u64())
+ .unwrap_or(0);
+ let out = u.get("output_tokens")
+ .and_then(|v| v.as_u64())
+ .unwrap_or(0);
+ (inp, out)
+ })
+ .unwrap_or((0, 0));
+
+ let usage_info = crate::messages::UsageInfo {
+ input_tokens,
+ output_tokens,
+ cost_usd: result_msg.total_cost_usd,
+ num_turns: result_msg.num_turns,
+ };
+ let _ = response_tx.send(DaveApiResponse::QueryComplete(usage_info));
+
stream_done = true;
}
ClaudeMessage::User(user_msg) => {
diff --git a/crates/notedeck_dave/src/backend/codex.rs b/crates/notedeck_dave/src/backend/codex.rs
@@ -0,0 +1,2848 @@
+//! Codex backend — orchestrates OpenAI's Codex CLI (`codex app-server`)
+//! via its JSON-RPC-over-stdio protocol.
+
+use super::codex_protocol::*;
+use super::tool_summary::{format_tool_summary, truncate_output};
+use crate::auto_accept::AutoAcceptRules;
+use crate::backend::traits::AiBackend;
+use crate::messages::{
+ CompactionInfo, DaveApiResponse, ExecutedTool, PendingPermission, PermissionRequest,
+ PermissionResponse, SubagentInfo, SubagentStatus,
+};
+use crate::tools::Tool;
+use crate::Message;
+use claude_agent_sdk_rs::PermissionMode;
+use dashmap::DashMap;
+use serde_json::Value;
+use std::collections::HashMap;
+use std::path::PathBuf;
+use std::sync::mpsc;
+use std::sync::Arc;
+use tokio::io::{AsyncBufRead, AsyncBufReadExt, AsyncWrite, AsyncWriteExt, BufReader};
+use tokio::process::{Child, Command};
+use tokio::sync::mpsc as tokio_mpsc;
+use tokio::sync::oneshot;
+use uuid::Uuid;
+
+// ---------------------------------------------------------------------------
+// Session actor
+// ---------------------------------------------------------------------------
+
+/// Commands sent to a Codex session actor.
+enum SessionCommand {
+    /// Start a turn with `prompt`; streamed responses go to `response_tx`
+    /// and `ctx` is repainted as events arrive.
+    Query {
+        prompt: String,
+        response_tx: mpsc::Sender<DaveApiResponse>,
+        ctx: egui::Context,
+    },
+    /// Interrupt the in-flight turn (no-op when idle).
+    Interrupt {
+        ctx: egui::Context,
+    },
+    /// Request a permission-mode change; the actor logs and ignores it
+    /// because Codex has no equivalent setting (see the command loop).
+    SetPermissionMode {
+        mode: PermissionMode,
+        ctx: egui::Context,
+    },
+    /// Terminate the actor; the caller then kills the child process.
+    Shutdown,
+}
+
+/// Handle kept by the backend to communicate with the actor.
+struct SessionHandle {
+    // Bounded channel (capacity 16 at creation); sends are performed on
+    // spawned tasks so the UI thread never blocks on a full queue.
+    command_tx: tokio_mpsc::Sender<SessionCommand>,
+}
+
+/// Result of processing a single Codex JSON-RPC message.
+enum HandleResult {
+    /// Normal notification processed, keep reading.
+    Continue,
+    /// `turn/completed` received — this turn is done.
+    TurnDone,
+    /// Auto-accept matched — send accept for this rpc_id immediately.
+    /// (The session loop owns the writer, so the actual response is sent
+    /// by the caller, not here.)
+    AutoAccepted(u64),
+    /// Needs UI approval — stash the receiver and wait for the user.
+    NeedsApproval {
+        rpc_id: u64,
+        rx: oneshot::Receiver<PermissionResponse>,
+    },
+}
+
+/// Per-session actor that owns the `codex app-server` child process.
+///
+/// Spawns the child, wires up buffered stdio, delegates the protocol work to
+/// [`session_actor_loop`], and guarantees the child is killed when the loop
+/// returns. `resume_session_id`, when set, resumes an existing Codex thread
+/// instead of starting a fresh one.
+async fn session_actor(
+    session_id: String,
+    cwd: Option<PathBuf>,
+    codex_binary: String,
+    model: Option<String>,
+    resume_session_id: Option<String>,
+    mut command_rx: tokio_mpsc::Receiver<SessionCommand>,
+) {
+    // Spawn the codex app-server child process
+    let mut child = match spawn_codex(&codex_binary, &cwd) {
+        Ok(c) => c,
+        Err(err) => {
+            tracing::error!("Session {} failed to spawn codex: {}", session_id, err);
+            // Answer every queued command with the failure so callers don't hang.
+            drain_commands_with_error(&mut command_rx, &format!("Failed to spawn codex: {}", err))
+                .await;
+            return;
+        }
+    };
+
+    let stdin = child.stdin.take().expect("stdin piped");
+    let stdout = child.stdout.take().expect("stdout piped");
+
+    // Drain stderr in a background task to prevent pipe deadlock
+    // (if the pipe buffer fills, the child would block on writes).
+    if let Some(stderr) = child.stderr.take() {
+        let sid = session_id.clone();
+        tokio::spawn(async move {
+            let mut lines = BufReader::new(stderr).lines();
+            while let Ok(Some(line)) = lines.next_line().await {
+                tracing::trace!("Codex stderr [{}]: {}", sid, line);
+            }
+        });
+    }
+
+    let writer = tokio::io::BufWriter::new(stdin);
+    let reader = BufReader::new(stdout).lines();
+    let cwd_str = cwd.as_ref().map(|p| p.to_string_lossy().into_owned());
+
+    session_actor_loop(
+        &session_id,
+        writer,
+        reader,
+        model.as_deref(),
+        cwd_str.as_deref(),
+        resume_session_id.as_deref(),
+        &mut command_rx,
+    )
+    .await;
+
+    // Loop returned (shutdown or fatal error) — reap the child either way.
+    let _ = child.kill().await;
+    tracing::debug!("Session {} actor exited", session_id);
+}
+
+/// Core session loop, generic over I/O for testability.
+///
+/// Performs the init handshake, thread start/resume, and main command loop.
+/// Returns when the session is shut down or an unrecoverable error occurs.
+/// The caller is responsible for process lifecycle (spawn, kill).
+async fn session_actor_loop<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>(
+    session_id: &str,
+    mut writer: tokio::io::BufWriter<W>,
+    mut reader: tokio::io::Lines<R>,
+    model: Option<&str>,
+    cwd: Option<&str>,
+    resume_session_id: Option<&str>,
+    command_rx: &mut tokio_mpsc::Receiver<SessionCommand>,
+) {
+    // ---- init handshake ----
+    if let Err(err) = do_init_handshake(&mut writer, &mut reader).await {
+        tracing::error!("Session {} init handshake failed: {}", session_id, err);
+        drain_commands_with_error(command_rx, &format!("Codex init handshake failed: {}", err))
+            .await;
+        return;
+    }
+
+    // ---- thread start / resume ----
+    let thread_id = if let Some(tid) = resume_session_id {
+        match send_thread_resume(&mut writer, &mut reader, tid).await {
+            Ok(id) => id,
+            Err(err) => {
+                tracing::error!("Session {} thread/resume failed: {}", session_id, err);
+                drain_commands_with_error(
+                    command_rx,
+                    &format!("Codex thread/resume failed: {}", err),
+                )
+                .await;
+                return;
+            }
+        }
+    } else {
+        match send_thread_start(&mut writer, &mut reader, model, cwd).await {
+            Ok(id) => id,
+            Err(err) => {
+                tracing::error!("Session {} thread/start failed: {}", session_id, err);
+                drain_commands_with_error(
+                    command_rx,
+                    &format!("Codex thread/start failed: {}", err),
+                )
+                .await;
+                return;
+            }
+        }
+    };
+
+    tracing::info!(
+        "Session {} connected to codex, thread_id={}",
+        session_id,
+        thread_id
+    );
+
+    // ---- main command loop ----
+    let mut request_counter: u64 = 10; // start after init IDs
+    let mut current_turn_id: Option<String> = None;
+
+    while let Some(cmd) = command_rx.recv().await {
+        match cmd {
+            SessionCommand::Query {
+                prompt,
+                response_tx,
+                ctx,
+            } => {
+                // Send turn/start
+                request_counter += 1;
+                let turn_req_id = request_counter;
+                if let Err(err) =
+                    send_turn_start(&mut writer, turn_req_id, &thread_id, &prompt, model).await
+                {
+                    tracing::error!("Session {} turn/start failed: {}", session_id, err);
+                    let _ = response_tx.send(DaveApiResponse::Failed(err.to_string()));
+                    continue;
+                }
+
+                // Read the turn/start response (remember the turn id so a later
+                // Interrupt can name it in turn/interrupt).
+                match read_response_for_id(&mut reader, turn_req_id).await {
+                    Ok(msg) => {
+                        if let Some(err) = msg.error {
+                            tracing::error!(
+                                "Session {} turn/start error: {}",
+                                session_id,
+                                err.message
+                            );
+                            let _ = response_tx.send(DaveApiResponse::Failed(err.message));
+                            continue;
+                        }
+                        if let Some(result) = &msg.result {
+                            current_turn_id = result
+                                .get("turn")
+                                .and_then(|t| t.get("id"))
+                                .and_then(|v| v.as_str())
+                                .map(|s| s.to_string());
+                        }
+                    }
+                    Err(err) => {
+                        tracing::error!(
+                            "Session {} failed reading turn/start response: {}",
+                            session_id,
+                            err
+                        );
+                        let _ = response_tx.send(DaveApiResponse::Failed(err.to_string()));
+                        continue;
+                    }
+                }
+
+                // Stream notifications until turn/completed
+                let mut subagent_stack: Vec<String> = Vec::new();
+                let mut turn_done = false;
+                let mut pending_approval: Option<(u64, oneshot::Receiver<PermissionResponse>)> =
+                    None;
+
+                while !turn_done {
+                    if let Some((rpc_id, mut rx)) = pending_approval.take() {
+                        // ---- approval-wait state ----
+                        // Codex is blocked waiting for our response, so no new
+                        // lines will arrive. Select between the UI response and
+                        // commands (interrupt / shutdown).
+                        // `biased` makes command handling win when both are ready.
+                        tokio::select! {
+                            biased;
+
+                            Some(cmd) = command_rx.recv() => {
+                                match cmd {
+                                    SessionCommand::Interrupt { ctx: int_ctx } => {
+                                        tracing::debug!("Session {} interrupted during approval", session_id);
+                                        // Cancel the approval and interrupt the turn
+                                        let _ = send_approval_response(&mut writer, rpc_id, ApprovalDecision::Cancel).await;
+                                        if let Some(ref tid) = current_turn_id {
+                                            request_counter += 1;
+                                            let _ = send_turn_interrupt(&mut writer, request_counter, &thread_id, tid).await;
+                                        }
+                                        int_ctx.request_repaint();
+                                        // Don't restore pending — it's been cancelled
+                                    }
+                                    SessionCommand::Shutdown => {
+                                        tracing::debug!("Session {} shutting down during approval", session_id);
+                                        return;
+                                    }
+                                    SessionCommand::Query { response_tx: new_tx, .. } => {
+                                        let _ = new_tx.send(DaveApiResponse::Failed(
+                                            "Query already in progress".to_string(),
+                                        ));
+                                        // Restore the pending approval — still waiting
+                                        pending_approval = Some((rpc_id, rx));
+                                    }
+                                    SessionCommand::SetPermissionMode { ctx: mode_ctx, .. } => {
+                                        mode_ctx.request_repaint();
+                                        pending_approval = Some((rpc_id, rx));
+                                    }
+                                }
+                            }
+
+                            result = &mut rx => {
+                                // Err means the UI dropped the sender without
+                                // answering — treat that as a cancel, never accept.
+                                let decision = match result {
+                                    Ok(PermissionResponse::Allow { .. }) => ApprovalDecision::Accept,
+                                    Ok(PermissionResponse::Deny { .. }) => ApprovalDecision::Decline,
+                                    Err(_) => ApprovalDecision::Cancel,
+                                };
+                                let _ = send_approval_response(&mut writer, rpc_id, decision).await;
+                            }
+                        }
+                    } else {
+                        // ---- normal streaming state ----
+                        tokio::select! {
+                            biased;
+
+                            Some(cmd) = command_rx.recv() => {
+                                match cmd {
+                                    SessionCommand::Interrupt { ctx: int_ctx } => {
+                                        tracing::debug!("Session {} interrupted", session_id);
+                                        if let Some(ref tid) = current_turn_id {
+                                            request_counter += 1;
+                                            let _ = send_turn_interrupt(&mut writer, request_counter, &thread_id, tid).await;
+                                        }
+                                        int_ctx.request_repaint();
+                                    }
+                                    SessionCommand::Query { response_tx: new_tx, .. } => {
+                                        let _ = new_tx.send(DaveApiResponse::Failed(
+                                            "Query already in progress".to_string(),
+                                        ));
+                                    }
+                                    SessionCommand::SetPermissionMode { mode, ctx: mode_ctx } => {
+                                        tracing::debug!(
+                                            "Session {} ignoring permission mode {:?} (not supported by Codex)",
+                                            session_id, mode
+                                        );
+                                        mode_ctx.request_repaint();
+                                    }
+                                    SessionCommand::Shutdown => {
+                                        tracing::debug!("Session {} shutting down during query", session_id);
+                                        return;
+                                    }
+                                }
+                            }
+
+                            line_result = reader.next_line() => {
+                                match line_result {
+                                    Ok(Some(line)) => {
+                                        let msg: RpcMessage = match serde_json::from_str(&line) {
+                                            Ok(m) => m,
+                                            Err(err) => {
+                                                // NOTE(review): byte-offset slice can panic if index 200
+                                                // falls inside a multi-byte UTF-8 char; prefer a
+                                                // char-based preview (see read_response_for_id).
+                                                tracing::warn!("Codex parse error: {} in: {}", err, &line[..line.len().min(200)]);
+                                                continue;
+                                            }
+                                        };
+
+                                        match handle_codex_message(
+                                            msg,
+                                            &response_tx,
+                                            &ctx,
+                                            &mut subagent_stack,
+                                        ) {
+                                            HandleResult::Continue => {}
+                                            HandleResult::TurnDone => {
+                                                turn_done = true;
+                                            }
+                                            HandleResult::AutoAccepted(rpc_id) => {
+                                                let _ = send_approval_response(
+                                                    &mut writer, rpc_id, ApprovalDecision::Accept,
+                                                ).await;
+                                            }
+                                            HandleResult::NeedsApproval { rpc_id, rx } => {
+                                                pending_approval = Some((rpc_id, rx));
+                                            }
+                                        }
+                                    }
+                                    Ok(None) => {
+                                        tracing::error!("Session {} codex process exited unexpectedly", session_id);
+                                        let _ = response_tx.send(DaveApiResponse::Failed(
+                                            "Codex process exited unexpectedly".to_string(),
+                                        ));
+                                        turn_done = true;
+                                    }
+                                    Err(err) => {
+                                        tracing::error!("Session {} read error: {}", session_id, err);
+                                        let _ = response_tx.send(DaveApiResponse::Failed(err.to_string()));
+                                        turn_done = true;
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
+
+                current_turn_id = None;
+                tracing::debug!("Turn complete for session {}", session_id);
+            }
+            SessionCommand::Interrupt { ctx } => {
+                // No turn in flight — nothing to interrupt, just repaint.
+                ctx.request_repaint();
+            }
+            SessionCommand::SetPermissionMode { mode, ctx } => {
+                tracing::debug!(
+                    "Session {} ignoring permission mode {:?} (not supported by Codex)",
+                    session_id,
+                    mode
+                );
+                ctx.request_repaint();
+            }
+            SessionCommand::Shutdown => {
+                tracing::debug!("Session {} shutting down", session_id);
+                break;
+            }
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Codex message handling (synchronous — no writer needed)
+// ---------------------------------------------------------------------------
+
+/// Process a single incoming Codex JSON-RPC message. Returns a `HandleResult`
+/// indicating what the caller should do next (continue, finish turn, or handle
+/// an approval).
+///
+/// Messages with a `method` but no `id` are notifications; messages that carry
+/// both are server→client requests (approvals) that must be answered by id.
+fn handle_codex_message(
+    msg: RpcMessage,
+    response_tx: &mpsc::Sender<DaveApiResponse>,
+    ctx: &egui::Context,
+    subagent_stack: &mut Vec<String>,
+) -> HandleResult {
+    let method = match &msg.method {
+        Some(m) => m.as_str(),
+        None => {
+            // Response to a request we sent (e.g. approval ack). Nothing to do.
+            return HandleResult::Continue;
+        }
+    };
+
+    match method {
+        // Streaming token from the assistant's reply.
+        "item/agentMessage/delta" => {
+            if let Some(params) = msg.params {
+                if let Ok(delta) = serde_json::from_value::<AgentMessageDeltaParams>(params) {
+                    let _ = response_tx.send(DaveApiResponse::Token(delta.delta));
+                    ctx.request_repaint();
+                }
+            }
+        }
+
+        // Only subagent launches are surfaced from item/started; other item
+        // types are reported on completion instead.
+        "item/started" => {
+            if let Some(params) = msg.params {
+                if let Ok(started) = serde_json::from_value::<ItemStartedParams>(params) {
+                    if started.item_type == "collabAgentToolCall" {
+                        let item_id = started
+                            .item_id
+                            .unwrap_or_else(|| Uuid::new_v4().to_string());
+                        subagent_stack.push(item_id.clone());
+                        let info = SubagentInfo {
+                            task_id: item_id,
+                            description: started.name.unwrap_or_else(|| "agent".to_string()),
+                            subagent_type: "codex-agent".to_string(),
+                            status: SubagentStatus::Running,
+                            output: String::new(),
+                            max_output_size: 4000,
+                            tool_results: Vec::new(),
+                        };
+                        let _ = response_tx.send(DaveApiResponse::SubagentSpawned(info));
+                        ctx.request_repaint();
+                    }
+                }
+            }
+        }
+
+        "item/completed" => {
+            if let Some(params) = msg.params {
+                if let Ok(completed) = serde_json::from_value::<ItemCompletedParams>(params) {
+                    handle_item_completed(&completed, response_tx, ctx, subagent_stack);
+                }
+            }
+        }
+
+        // Shell-command approval: mapped onto the Claude-style "Bash" tool so
+        // the shared UI and auto-accept rules apply.
+        "item/commandExecution/requestApproval" => {
+            if let (Some(rpc_id), Some(params)) = (msg.id, msg.params) {
+                if let Ok(approval) = serde_json::from_value::<CommandApprovalParams>(params) {
+                    return check_approval_or_forward(
+                        rpc_id,
+                        "Bash",
+                        serde_json::json!({ "command": approval.command }),
+                        response_tx,
+                        ctx,
+                    );
+                }
+            }
+        }
+
+        // File-change approval: "create" maps to Write, everything else to Edit.
+        "item/fileChange/requestApproval" => {
+            if let (Some(rpc_id), Some(params)) = (msg.id, msg.params) {
+                if let Ok(approval) = serde_json::from_value::<FileChangeApprovalParams>(params) {
+                    let kind_str = approval
+                        .kind
+                        .as_ref()
+                        .and_then(|k| k.get("type").and_then(|t| t.as_str()))
+                        .unwrap_or("edit");
+
+                    // NOTE(review): the diff text is forwarded as Write
+                    // "content" / Edit "new_string" — confirm Codex sends full
+                    // file content for creates rather than a unified diff.
+                    let (tool_name, tool_input) = match kind_str {
+                        "create" => (
+                            "Write",
+                            serde_json::json!({
+                                "file_path": approval.file_path,
+                                "content": approval.diff.as_deref().unwrap_or(""),
+                            }),
+                        ),
+                        _ => (
+                            "Edit",
+                            serde_json::json!({
+                                "file_path": approval.file_path,
+                                "old_string": "",
+                                "new_string": approval.diff.as_deref().unwrap_or(""),
+                            }),
+                        ),
+                    };
+
+                    return check_approval_or_forward(
+                        rpc_id,
+                        tool_name,
+                        tool_input,
+                        response_tx,
+                        ctx,
+                    );
+                }
+            }
+        }
+
+        "turn/completed" => {
+            if let Some(params) = msg.params {
+                if let Ok(completed) = serde_json::from_value::<TurnCompletedParams>(params) {
+                    if completed.status == "failed" {
+                        let err_msg = completed.error.unwrap_or_else(|| "Turn failed".to_string());
+                        let _ = response_tx.send(DaveApiResponse::Failed(err_msg));
+                    }
+                }
+            }
+            return HandleResult::TurnDone;
+        }
+
+        other => {
+            tracing::debug!("Unhandled codex notification: {}", other);
+        }
+    }
+
+    HandleResult::Continue
+}
+
+/// Check auto-accept rules. If matched, return `AutoAccepted`. Otherwise
+/// create a `PendingPermission`, send it to the UI, and return `NeedsApproval`
+/// with the oneshot receiver.
+///
+/// If the UI channel is already closed, the `PendingPermission` (and with it
+/// the oneshot sender) is dropped by the failed send, so the returned receiver
+/// resolves to `Err` immediately and the session loop maps that to
+/// `ApprovalDecision::Cancel`. We must never silently *accept* a tool
+/// execution when there is no user left to ask.
+fn check_approval_or_forward(
+    rpc_id: u64,
+    tool_name: &str,
+    tool_input: Value,
+    response_tx: &mpsc::Sender<DaveApiResponse>,
+    ctx: &egui::Context,
+) -> HandleResult {
+    let rules = AutoAcceptRules::default();
+    if rules.should_auto_accept(tool_name, &tool_input) {
+        tracing::debug!("Auto-accepting {} (rpc_id={})", tool_name, rpc_id);
+        return HandleResult::AutoAccepted(rpc_id);
+    }
+
+    // Forward to UI
+    let request_id = Uuid::new_v4();
+    let (ui_resp_tx, ui_resp_rx) = oneshot::channel();
+
+    let request = PermissionRequest {
+        id: request_id,
+        tool_name: tool_name.to_string(),
+        tool_input,
+        response: None,
+        answer_summary: None,
+        cached_plan: None,
+    };
+
+    let pending = PendingPermission {
+        request,
+        response_tx: ui_resp_tx,
+    };
+
+    if response_tx
+        .send(DaveApiResponse::PermissionRequest(pending))
+        .is_err()
+    {
+        // UI unreachable: the sender was dropped with the failed send, so the
+        // caller's `rx` errors out and the approval is cancelled, not accepted.
+        tracing::error!("Failed to send permission request to UI; cancelling approval");
+    } else {
+        ctx.request_repaint();
+    }
+
+    HandleResult::NeedsApproval {
+        rpc_id,
+        rx: ui_resp_rx,
+    }
+}
+
+/// Handle a completed item from Codex.
+///
+/// Translates Codex item types into Dave responses: command executions and
+/// file changes become `ToolResult`s (attributed to the innermost running
+/// subagent, if any), subagent completions pop the stack, and context
+/// compactions are reported with their pre-compaction token count.
+fn handle_item_completed(
+    completed: &ItemCompletedParams,
+    response_tx: &mpsc::Sender<DaveApiResponse>,
+    ctx: &egui::Context,
+    subagent_stack: &mut Vec<String>,
+) {
+    match completed.item_type.as_str() {
+        "commandExecution" => {
+            let command = completed.command.clone().unwrap_or_default();
+            // -1 marks "exit code unknown" (e.g. item completed without one).
+            let exit_code = completed.exit_code.unwrap_or(-1);
+            let output = completed.output.clone().unwrap_or_default();
+
+            let tool_input = serde_json::json!({ "command": command });
+            let result_value = serde_json::json!({ "output": output, "exit_code": exit_code });
+            let summary = format_tool_summary("Bash", &tool_input, &result_value);
+            // Attribute the tool to the innermost running subagent, if any.
+            let parent_task_id = subagent_stack.last().cloned();
+
+            let _ = response_tx.send(DaveApiResponse::ToolResult(ExecutedTool {
+                tool_name: "Bash".to_string(),
+                summary,
+                parent_task_id,
+            }));
+            ctx.request_repaint();
+        }
+
+        "fileChange" => {
+            let file_path = completed.file_path.clone().unwrap_or_default();
+            let diff = completed.diff.clone();
+
+            let kind_str = completed
+                .kind
+                .as_ref()
+                .and_then(|k| k.get("type").and_then(|t| t.as_str()))
+                .unwrap_or("edit");
+
+            // Mirror the approval mapping: "create" → Write, otherwise Edit.
+            let tool_name = match kind_str {
+                "create" => "Write",
+                _ => "Edit",
+            };
+
+            let tool_input = serde_json::json!({
+                "file_path": file_path,
+                "diff": diff,
+            });
+            let result_value = serde_json::json!({ "status": "ok" });
+            let summary = format_tool_summary(tool_name, &tool_input, &result_value);
+            let parent_task_id = subagent_stack.last().cloned();
+
+            let _ = response_tx.send(DaveApiResponse::ToolResult(ExecutedTool {
+                tool_name: tool_name.to_string(),
+                summary,
+                parent_task_id,
+            }));
+            ctx.request_repaint();
+        }
+
+        "collabAgentToolCall" => {
+            if let Some(item_id) = &completed.item_id {
+                // Remove by id (not pop) in case completions arrive out of order.
+                subagent_stack.retain(|id| id != item_id);
+                let result_text = completed
+                    .result
+                    .clone()
+                    .unwrap_or_else(|| "completed".to_string());
+                let _ = response_tx.send(DaveApiResponse::SubagentCompleted {
+                    task_id: item_id.clone(),
+                    result: truncate_output(&result_text, 2000),
+                });
+                ctx.request_repaint();
+            }
+        }
+
+        "contextCompaction" => {
+            // NOTE(review): assumes `pre_tokens` is the context size before
+            // compaction — confirm against the Codex app-server schema.
+            let pre_tokens = completed.pre_tokens.unwrap_or(0);
+            let _ = response_tx.send(DaveApiResponse::CompactionComplete(CompactionInfo {
+                pre_tokens,
+            }));
+            ctx.request_repaint();
+        }
+
+        other => {
+            tracing::debug!("Unhandled item/completed type: {}", other);
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Codex process spawning and JSON-RPC helpers
+// ---------------------------------------------------------------------------
+
+/// Launch `codex app-server` with all three stdio streams piped, optionally
+/// in the given working directory. Returns the spawn error unchanged so the
+/// caller can report it.
+fn spawn_codex(binary: &str, cwd: &Option<PathBuf>) -> Result<Child, std::io::Error> {
+    use std::process::Stdio;
+
+    let mut cmd = Command::new(binary);
+    cmd.arg("app-server")
+        .stdin(Stdio::piped())
+        .stdout(Stdio::piped())
+        .stderr(Stdio::piped());
+    if let Some(dir) = cwd.as_ref() {
+        cmd.current_dir(dir);
+    }
+    cmd.spawn()
+}
+
+/// Serialize `req` to one JSONL line and flush it to the child's stdin.
+/// Serialization failures are surfaced as `InvalidData` I/O errors.
+async fn send_request<P: serde::Serialize, W: AsyncWrite + Unpin>(
+    writer: &mut tokio::io::BufWriter<W>,
+    req: &RpcRequest<P>,
+) -> Result<(), std::io::Error> {
+    let encoded = serde_json::to_string(req)
+        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
+    writer.write_all(encoded.as_bytes()).await?;
+    writer.write_all(b"\n").await?;
+    writer.flush().await
+}
+
+/// Send a JSON-RPC response line (used to answer server→client requests
+/// such as approvals). The payload is `{ "id": id, "result": result }`.
+async fn send_rpc_response<W: AsyncWrite + Unpin>(
+    writer: &mut tokio::io::BufWriter<W>,
+    id: u64,
+    result: Value,
+) -> Result<(), std::io::Error> {
+    let body = serde_json::json!({ "id": id, "result": result });
+    let encoded = serde_json::to_string(&body)
+        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
+    writer.write_all(encoded.as_bytes()).await?;
+    writer.write_all(b"\n").await?;
+    writer.flush().await
+}
+
+/// Send an approval decision as a JSON-RPC response to the request `rpc_id`.
+async fn send_approval_response<W: AsyncWrite + Unpin>(
+    writer: &mut tokio::io::BufWriter<W>,
+    rpc_id: u64,
+    decision: ApprovalDecision,
+) -> Result<(), std::io::Error> {
+    // to_value on this plain struct can only fail for non-string map keys or
+    // a failing Serialize impl, neither of which applies — state the invariant
+    // instead of a bare unwrap.
+    let result = serde_json::to_value(ApprovalResponse { decision })
+        .expect("ApprovalResponse serialization is infallible");
+    send_rpc_response(writer, rpc_id, result).await
+}
+
+/// Perform the `initialize` → `initialized` handshake.
+async fn do_init_handshake<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>(
+ writer: &mut tokio::io::BufWriter<W>,
+ reader: &mut tokio::io::Lines<R>,
+) -> Result<(), String> {
+ let req = RpcRequest {
+ id: Some(1),
+ method: "initialize",
+ params: InitializeParams {
+ client_info: ClientInfo {
+ name: "dave".to_string(),
+ version: env!("CARGO_PKG_VERSION").to_string(),
+ },
+ capabilities: serde_json::json!({}),
+ },
+ };
+
+ send_request(writer, &req)
+ .await
+ .map_err(|e| format!("Failed to send initialize: {}", e))?;
+
+ let resp = read_response_for_id(reader, 1)
+ .await
+ .map_err(|e| format!("Failed to read initialize response: {}", e))?;
+
+ if let Some(err) = resp.error {
+ return Err(format!("Initialize error: {}", err.message));
+ }
+
+ // Send `initialized` notification (no id, no response expected)
+ let notif: RpcRequest<Value> = RpcRequest {
+ id: None,
+ method: "initialized",
+ params: serde_json::json!({}),
+ };
+ send_request(writer, ¬if)
+ .await
+ .map_err(|e| format!("Failed to send initialized: {}", e))?;
+
+ Ok(())
+}
+
+/// Send `thread/start` and return the thread ID.
+///
+/// Uses fixed request id 2 (handshake ids 1–3 are reserved; the main loop
+/// counts from 10). The approval policy is pinned to "on-request" so Codex
+/// asks us before running commands or editing files.
+async fn send_thread_start<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>(
+    writer: &mut tokio::io::BufWriter<W>,
+    reader: &mut tokio::io::Lines<R>,
+    model: Option<&str>,
+    cwd: Option<&str>,
+) -> Result<String, String> {
+    let req = RpcRequest {
+        id: Some(2),
+        method: "thread/start",
+        params: ThreadStartParams {
+            model: model.map(|s| s.to_string()),
+            cwd: cwd.map(|s| s.to_string()),
+            approval_policy: Some("on-request".to_string()),
+        },
+    };
+
+    send_request(writer, &req)
+        .await
+        .map_err(|e| format!("Failed to send thread/start: {}", e))?;
+
+    let resp = read_response_for_id(reader, 2)
+        .await
+        .map_err(|e| format!("Failed to read thread/start response: {}", e))?;
+
+    if let Some(err) = resp.error {
+        return Err(format!("thread/start error: {}", err.message));
+    }
+
+    let result = resp.result.ok_or("No result in thread/start response")?;
+    let thread_result: ThreadStartResult = serde_json::from_value(result)
+        .map_err(|e| format!("Failed to parse thread/start result: {}", e))?;
+
+    Ok(thread_result.thread.id)
+}
+
+/// Send `thread/resume` and return the thread ID.
+///
+/// On success this echoes the caller-supplied `thread_id` rather than parsing
+/// one out of the response; only the error field of the response is inspected.
+async fn send_thread_resume<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>(
+    writer: &mut tokio::io::BufWriter<W>,
+    reader: &mut tokio::io::Lines<R>,
+    thread_id: &str,
+) -> Result<String, String> {
+    let req = RpcRequest {
+        id: Some(3),
+        method: "thread/resume",
+        params: ThreadResumeParams {
+            thread_id: thread_id.to_string(),
+        },
+    };
+
+    send_request(writer, &req)
+        .await
+        .map_err(|e| format!("Failed to send thread/resume: {}", e))?;
+
+    let resp = read_response_for_id(reader, 3)
+        .await
+        .map_err(|e| format!("Failed to read thread/resume response: {}", e))?;
+
+    if let Some(err) = resp.error {
+        return Err(format!("thread/resume error: {}", err.message));
+    }
+
+    Ok(thread_id.to_string())
+}
+
+/// Send `turn/start` for `thread_id` with a single text input item.
+/// The response is read separately by the caller.
+async fn send_turn_start<W: AsyncWrite + Unpin>(
+    writer: &mut tokio::io::BufWriter<W>,
+    req_id: u64,
+    thread_id: &str,
+    prompt: &str,
+    model: Option<&str>,
+) -> Result<(), String> {
+    let params = TurnStartParams {
+        thread_id: thread_id.to_string(),
+        input: vec![TurnInput::Text {
+            text: prompt.to_string(),
+        }],
+        model: model.map(str::to_string),
+        effort: None,
+    };
+    let req = RpcRequest {
+        id: Some(req_id),
+        method: "turn/start",
+        params,
+    };
+
+    match send_request(writer, &req).await {
+        Ok(()) => Ok(()),
+        Err(e) => Err(format!("Failed to send turn/start: {}", e)),
+    }
+}
+
+/// Send `turn/interrupt` for the given thread/turn pair.
+async fn send_turn_interrupt<W: AsyncWrite + Unpin>(
+    writer: &mut tokio::io::BufWriter<W>,
+    req_id: u64,
+    thread_id: &str,
+    turn_id: &str,
+) -> Result<(), String> {
+    let params = TurnInterruptParams {
+        thread_id: thread_id.to_string(),
+        turn_id: turn_id.to_string(),
+    };
+    let req = RpcRequest {
+        id: Some(req_id),
+        method: "turn/interrupt",
+        params,
+    };
+
+    match send_request(writer, &req).await {
+        Ok(()) => Ok(()),
+        Err(e) => Err(format!("Failed to send turn/interrupt: {}", e)),
+    }
+}
+
+/// Read lines until we find a response matching the given request id.
+/// Non-matching messages (notifications) are logged and skipped.
+///
+/// Returns a stringified error on I/O failure, EOF, or malformed JSON.
+async fn read_response_for_id<R: AsyncBufRead + Unpin>(
+    reader: &mut tokio::io::Lines<R>,
+    expected_id: u64,
+) -> Result<RpcMessage, String> {
+    loop {
+        let line = reader
+            .next_line()
+            .await
+            .map_err(|e| format!("IO error: {}", e))?
+            .ok_or_else(|| "EOF while waiting for response".to_string())?;
+
+        let msg: RpcMessage = serde_json::from_str(&line).map_err(|e| {
+            // Truncate the error preview by characters, not bytes: slicing a
+            // &str at a raw byte offset panics if it lands inside a multi-byte
+            // UTF-8 character (possible — Codex output is arbitrary text).
+            let preview: String = line.chars().take(200).collect();
+            format!("JSON parse error: {} in: {}", e, preview)
+        })?;
+
+        if msg.id == Some(expected_id) {
+            return Ok(msg);
+        }
+
+        tracing::trace!(
+            "Skipping message during handshake (waiting for id={}): method={:?}",
+            expected_id,
+            msg.method
+        );
+    }
+}
+
+/// Drain pending commands, sending error to any Query commands.
+///
+/// Runs until a `Shutdown` arrives or the sender side is dropped, so a
+/// session that failed to start still answers every queued query instead of
+/// leaving callers hanging.
+async fn drain_commands_with_error(
+    command_rx: &mut tokio_mpsc::Receiver<SessionCommand>,
+    error: &str,
+) {
+    loop {
+        match command_rx.recv().await {
+            None | Some(SessionCommand::Shutdown) => break,
+            Some(SessionCommand::Query { response_tx, .. }) => {
+                let _ = response_tx.send(DaveApiResponse::Failed(error.to_string()));
+            }
+            Some(_) => {}
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Public backend
+// ---------------------------------------------------------------------------
+
+/// Backend that drives OpenAI's Codex CLI, one actor per session.
+pub struct CodexBackend {
+    // Path or name of the `codex` binary to spawn (resolved via PATH).
+    codex_binary: String,
+    // Live session actors keyed by session id; DashMap lets the UI thread
+    // look up handles without an explicit lock.
+    sessions: DashMap<String, SessionHandle>,
+}
+
+impl CodexBackend {
+    /// Create a backend that spawns `codex_binary` for each new session.
+    pub fn new(codex_binary: String) -> Self {
+        Self {
+            codex_binary,
+            sessions: DashMap::new(),
+        }
+    }
+
+    /// Convert messages to a prompt string, same logic as the Claude backend:
+    /// the first system message (if any) leads, followed by the
+    /// Human/Assistant transcript; other message kinds are skipped.
+    fn messages_to_prompt(messages: &[Message]) -> String {
+        let mut prompt = String::new();
+
+        // Only the first system message is prepended.
+        let system = messages.iter().find_map(|m| match m {
+            Message::System(content) => Some(content.as_str()),
+            _ => None,
+        });
+        if let Some(content) = system {
+            prompt.push_str(content);
+            prompt.push_str("\n\n");
+        }
+
+        for msg in messages {
+            let (label, text) = match msg {
+                Message::User(content) => ("Human: ", content.as_str()),
+                Message::Assistant(content) => ("Assistant: ", content.text()),
+                _ => continue,
+            };
+            prompt.push_str(label);
+            prompt.push_str(text);
+            prompt.push_str("\n\n");
+        }
+        prompt
+    }
+
+    /// Most recent user message, or an empty string when there is none.
+    fn get_latest_user_message(messages: &[Message]) -> String {
+        for msg in messages.iter().rev() {
+            if let Message::User(content) = msg {
+                return content.clone();
+            }
+        }
+        String::new()
+    }
+}
+
+impl AiBackend for CodexBackend {
+    /// Start (or reuse) the session actor and queue a query on it.
+    ///
+    /// Returns the response receiver plus the join handle of the task that
+    /// delivers the command. `_tools` and `_user_id` are unused: Codex manages
+    /// its own tools and auth.
+    fn stream_request(
+        &self,
+        messages: Vec<Message>,
+        _tools: Arc<HashMap<String, Tool>>,
+        model: String,
+        _user_id: String,
+        session_id: String,
+        cwd: Option<PathBuf>,
+        resume_session_id: Option<String>,
+        ctx: egui::Context,
+    ) -> (
+        mpsc::Receiver<DaveApiResponse>,
+        Option<tokio::task::JoinHandle<()>>,
+    ) {
+        let (response_tx, response_rx) = mpsc::channel();
+
+        // Only the very first message of a fresh session gets the full
+        // transcript; resumed sessions and follow-ups send just the latest
+        // user message, since Codex keeps the thread history itself.
+        let prompt = if resume_session_id.is_some() {
+            Self::get_latest_user_message(&messages)
+        } else {
+            let is_first_message = messages
+                .iter()
+                .filter(|m| matches!(m, Message::User(_)))
+                .count()
+                == 1;
+            if is_first_message {
+                Self::messages_to_prompt(&messages)
+            } else {
+                Self::get_latest_user_message(&messages)
+            }
+        };
+
+        tracing::debug!(
+            "Codex request: session={}, resumed={}, prompt_len={}",
+            session_id,
+            resume_session_id.is_some(),
+            prompt.len(),
+        );
+
+        let command_tx = {
+            // The entry guard holds the shard lock only for this scope.
+            // Note: model/cwd/resume are captured only when the actor is first
+            // created; later calls for the same session reuse the running actor.
+            let entry = self.sessions.entry(session_id.clone());
+            let codex_binary = self.codex_binary.clone();
+            let model_clone = model.clone();
+            let cwd_clone = cwd.clone();
+            let resume_clone = resume_session_id.clone();
+            let handle = entry.or_insert_with(|| {
+                let (command_tx, command_rx) = tokio_mpsc::channel(16);
+                let sid = session_id.clone();
+                tokio::spawn(async move {
+                    session_actor(
+                        sid,
+                        cwd_clone,
+                        codex_binary,
+                        Some(model_clone),
+                        resume_clone,
+                        command_rx,
+                    )
+                    .await;
+                });
+                SessionHandle { command_tx }
+            });
+            handle.command_tx.clone()
+        };
+
+        // Deliver the query off-thread so a full command queue can't block the UI.
+        let handle = tokio::spawn(async move {
+            if let Err(err) = command_tx
+                .send(SessionCommand::Query {
+                    prompt,
+                    response_tx,
+                    ctx,
+                })
+                .await
+            {
+                tracing::error!("Failed to send query to codex session actor: {}", err);
+            }
+        });
+
+        (response_rx, Some(handle))
+    }
+
+    /// Remove the session handle and ask its actor to shut down.
+    fn cleanup_session(&self, session_id: String) {
+        if let Some((_, handle)) = self.sessions.remove(&session_id) {
+            tokio::spawn(async move {
+                if let Err(err) = handle.command_tx.send(SessionCommand::Shutdown).await {
+                    tracing::warn!("Failed to send shutdown to codex session: {}", err);
+                }
+            });
+        }
+    }
+
+    /// Interrupt the session's in-flight turn, if any.
+    fn interrupt_session(&self, session_id: String, ctx: egui::Context) {
+        if let Some(handle) = self.sessions.get(&session_id) {
+            let command_tx = handle.command_tx.clone();
+            tokio::spawn(async move {
+                if let Err(err) = command_tx.send(SessionCommand::Interrupt { ctx }).await {
+                    tracing::warn!("Failed to send interrupt to codex session: {}", err);
+                }
+            });
+        }
+    }
+
+    /// Forward a permission-mode change; the actor logs and ignores it
+    /// (Codex has no runtime permission-mode switch).
+    fn set_permission_mode(&self, session_id: String, mode: PermissionMode, ctx: egui::Context) {
+        if let Some(handle) = self.sessions.get(&session_id) {
+            let command_tx = handle.command_tx.clone();
+            tokio::spawn(async move {
+                if let Err(err) = command_tx
+                    .send(SessionCommand::SetPermissionMode { mode, ctx })
+                    .await
+                {
+                    tracing::warn!(
+                        "Failed to send set_permission_mode to codex session: {}",
+                        err
+                    );
+                }
+            });
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::messages::DaveApiResponse;
+ use serde_json::json;
+ use std::time::Duration;
+
+ /// Helper: build an RpcMessage from a method and params JSON
+ fn notification(method: &str, params: Value) -> RpcMessage {
+ RpcMessage {
+ id: None,
+ method: Some(method.to_string()),
+ result: None,
+ error: None,
+ params: Some(params),
+ }
+ }
+
+ /// Helper: build an RpcMessage that is a server→client request (has id)
+ fn server_request(id: u64, method: &str, params: Value) -> RpcMessage {
+ RpcMessage {
+ id: Some(id),
+ method: Some(method.to_string()),
+ result: None,
+ error: None,
+ params: Some(params),
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // Protocol serde tests
+ // -----------------------------------------------------------------------
+
+ #[test]
+ fn test_rpc_request_serialization() {
+ let req = RpcRequest {
+ id: Some(1),
+ method: "initialize",
+ params: InitializeParams {
+ client_info: ClientInfo {
+ name: "dave".to_string(),
+ version: "0.1.0".to_string(),
+ },
+ capabilities: json!({}),
+ },
+ };
+ let json = serde_json::to_string(&req).unwrap();
+ assert!(json.contains("\"id\":1"));
+ assert!(json.contains("\"method\":\"initialize\""));
+ assert!(json.contains("\"clientInfo\""));
+ }
+
+ #[test]
+ fn test_rpc_request_notification_omits_id() {
+ let req: RpcRequest<Value> = RpcRequest {
+ id: None,
+ method: "initialized",
+ params: json!({}),
+ };
+ let json = serde_json::to_string(&req).unwrap();
+ assert!(!json.contains("\"id\""));
+ }
+
+ #[test]
+ fn test_rpc_message_deserialization_response() {
+ let json = r#"{"id":1,"result":{"serverInfo":{"name":"codex"}}}"#;
+ let msg: RpcMessage = serde_json::from_str(json).unwrap();
+ assert_eq!(msg.id, Some(1));
+ assert!(msg.result.is_some());
+ assert!(msg.method.is_none());
+ }
+
+ #[test]
+ fn test_rpc_message_deserialization_notification() {
+ let json = r#"{"method":"item/agentMessage/delta","params":{"delta":"hello"}}"#;
+ let msg: RpcMessage = serde_json::from_str(json).unwrap();
+ assert!(msg.id.is_none());
+ assert_eq!(msg.method.as_deref(), Some("item/agentMessage/delta"));
+ }
+
+ #[test]
+ fn test_thread_start_result_deserialization() {
+ let json = r#"{"thread":{"id":"thread_abc123"},"model":"gpt-5.2-codex"}"#;
+ let result: ThreadStartResult = serde_json::from_str(json).unwrap();
+ assert_eq!(result.thread.id, "thread_abc123");
+ assert_eq!(result.model.as_deref(), Some("gpt-5.2-codex"));
+ }
+
+ #[test]
+ fn test_approval_response_serialization() {
+ let resp = ApprovalResponse {
+ decision: ApprovalDecision::Accept,
+ };
+ let json = serde_json::to_string(&resp).unwrap();
+ assert!(json.contains("\"decision\":\"accept\""));
+
+ let resp = ApprovalResponse {
+ decision: ApprovalDecision::Decline,
+ };
+ let json = serde_json::to_string(&resp).unwrap();
+ assert!(json.contains("\"decision\":\"decline\""));
+ }
+
+ #[test]
+ fn test_turn_input_serialization() {
+ let input = TurnInput::Text {
+ text: "hello".to_string(),
+ };
+ let json = serde_json::to_string(&input).unwrap();
+ assert!(json.contains("\"type\":\"text\""));
+ assert!(json.contains("\"text\":\"hello\""));
+ }
+
+ // -----------------------------------------------------------------------
+ // handle_codex_message tests
+ // -----------------------------------------------------------------------
+
+ #[test]
+ fn test_handle_delta_sends_token() {
+ let (tx, rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+ let mut subagents = Vec::new();
+
+ let msg = notification("item/agentMessage/delta", json!({ "delta": "Hello world" }));
+
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ assert!(matches!(result, HandleResult::Continue));
+
+ let response = rx.try_recv().unwrap();
+ match response {
+ DaveApiResponse::Token(t) => assert_eq!(t, "Hello world"),
+ other => panic!("Expected Token, got {:?}", std::mem::discriminant(&other)),
+ }
+ }
+
+ #[test]
+ fn test_handle_turn_completed_success() {
+ let (tx, _rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+ let mut subagents = Vec::new();
+
+ let msg = notification("turn/completed", json!({ "status": "completed" }));
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ assert!(matches!(result, HandleResult::TurnDone));
+ }
+
+ #[test]
+ fn test_handle_turn_completed_failure_sends_error() {
+ let (tx, rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+ let mut subagents = Vec::new();
+
+ let msg = notification(
+ "turn/completed",
+ json!({ "status": "failed", "error": "rate limit exceeded" }),
+ );
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ assert!(matches!(result, HandleResult::TurnDone));
+
+ let response = rx.try_recv().unwrap();
+ match response {
+ DaveApiResponse::Failed(err) => assert_eq!(err, "rate limit exceeded"),
+ other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)),
+ }
+ }
+
+ #[test]
+ fn test_handle_response_message_ignored() {
+ let (tx, rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+ let mut subagents = Vec::new();
+
+ // A response (has id, no method) — should be ignored
+ let msg = RpcMessage {
+ id: Some(42),
+ method: None,
+ result: Some(json!({})),
+ error: None,
+ params: None,
+ };
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ assert!(matches!(result, HandleResult::Continue));
+ assert!(rx.try_recv().is_err()); // nothing sent
+ }
+
+ #[test]
+ fn test_handle_unknown_method_ignored() {
+ let (tx, rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+ let mut subagents = Vec::new();
+
+ let msg = notification("some/future/event", json!({}));
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ assert!(matches!(result, HandleResult::Continue));
+ assert!(rx.try_recv().is_err());
+ }
+
+ #[test]
+ fn test_handle_subagent_started() {
+ let (tx, rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+ let mut subagents = Vec::new();
+
+ let msg = notification(
+ "item/started",
+ json!({
+ "type": "collabAgentToolCall",
+ "itemId": "agent-1",
+ "name": "research agent"
+ }),
+ );
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ assert!(matches!(result, HandleResult::Continue));
+ assert_eq!(subagents.len(), 1);
+ assert_eq!(subagents[0], "agent-1");
+
+ let response = rx.try_recv().unwrap();
+ match response {
+ DaveApiResponse::SubagentSpawned(info) => {
+ assert_eq!(info.task_id, "agent-1");
+ assert_eq!(info.description, "research agent");
+ }
+ other => panic!(
+ "Expected SubagentSpawned, got {:?}",
+ std::mem::discriminant(&other)
+ ),
+ }
+ }
+
+ #[test]
+ fn test_handle_command_approval_auto_accept() {
+ let (tx, rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+ let mut subagents = Vec::new();
+
+ // "git status" should be auto-accepted by default rules
+ let msg = server_request(
+ 99,
+ "item/commandExecution/requestApproval",
+ json!({ "command": "git status" }),
+ );
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ match result {
+ HandleResult::AutoAccepted(id) => assert_eq!(id, 99),
+ other => panic!(
+ "Expected AutoAccepted, got {:?}",
+ std::mem::discriminant(&other)
+ ),
+ }
+ // No permission request sent to UI
+ assert!(rx.try_recv().is_err());
+ }
+
+ #[test]
+ fn test_handle_command_approval_needs_ui() {
+ let (tx, rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+ let mut subagents = Vec::new();
+
+ // "rm -rf /" should NOT be auto-accepted
+ let msg = server_request(
+ 100,
+ "item/commandExecution/requestApproval",
+ json!({ "command": "rm -rf /" }),
+ );
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ match result {
+ HandleResult::NeedsApproval { rpc_id, .. } => assert_eq!(rpc_id, 100),
+ other => panic!(
+ "Expected NeedsApproval, got {:?}",
+ std::mem::discriminant(&other)
+ ),
+ }
+
+ // Permission request should have been sent to UI
+ let response = rx.try_recv().unwrap();
+ assert!(matches!(response, DaveApiResponse::PermissionRequest(_)));
+ }
+
+ // -----------------------------------------------------------------------
+ // handle_item_completed tests
+ // -----------------------------------------------------------------------
+
+ #[test]
+ fn test_item_completed_command_execution() {
+ let (tx, rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+ let mut subagents = Vec::new();
+
+ let completed = ItemCompletedParams {
+ item_type: "commandExecution".to_string(),
+ item_id: None,
+ command: Some("ls -la".to_string()),
+ exit_code: Some(0),
+ output: Some("total 42\n".to_string()),
+ file_path: None,
+ diff: None,
+ kind: None,
+ result: None,
+ pre_tokens: None,
+ content: None,
+ };
+
+ handle_item_completed(&completed, &tx, &ctx, &mut subagents);
+
+ let response = rx.try_recv().unwrap();
+ match response {
+ DaveApiResponse::ToolResult(tool) => {
+ assert_eq!(tool.tool_name, "Bash");
+ assert!(tool.parent_task_id.is_none());
+ }
+ other => panic!(
+ "Expected ToolResult, got {:?}",
+ std::mem::discriminant(&other)
+ ),
+ }
+ }
+
+ #[test]
+ fn test_item_completed_file_change_edit() {
+ let (tx, rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+ let mut subagents = Vec::new();
+
+ let completed = ItemCompletedParams {
+ item_type: "fileChange".to_string(),
+ item_id: None,
+ command: None,
+ exit_code: None,
+ output: None,
+ file_path: Some("src/main.rs".to_string()),
+ diff: Some("@@ -1,3 +1,3 @@\n-old\n+new\n context\n".to_string()),
+ kind: Some(json!({"type": "edit"})),
+ result: None,
+ pre_tokens: None,
+ content: None,
+ };
+
+ handle_item_completed(&completed, &tx, &ctx, &mut subagents);
+
+ let response = rx.try_recv().unwrap();
+ match response {
+ DaveApiResponse::ToolResult(tool) => {
+ assert_eq!(tool.tool_name, "Edit");
+ }
+ other => panic!(
+ "Expected ToolResult, got {:?}",
+ std::mem::discriminant(&other)
+ ),
+ }
+ }
+
+ #[test]
+ fn test_item_completed_file_change_create() {
+ let (tx, rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+ let mut subagents = Vec::new();
+
+ let completed = ItemCompletedParams {
+ item_type: "fileChange".to_string(),
+ item_id: None,
+ command: None,
+ exit_code: None,
+ output: None,
+ file_path: Some("new_file.rs".to_string()),
+ diff: None,
+ kind: Some(json!({"type": "create"})),
+ result: None,
+ pre_tokens: None,
+ content: None,
+ };
+
+ handle_item_completed(&completed, &tx, &ctx, &mut subagents);
+
+ let response = rx.try_recv().unwrap();
+ match response {
+ DaveApiResponse::ToolResult(tool) => {
+ assert_eq!(tool.tool_name, "Write");
+ }
+ other => panic!(
+ "Expected ToolResult, got {:?}",
+ std::mem::discriminant(&other)
+ ),
+ }
+ }
+
+ #[test]
+ fn test_item_completed_subagent() {
+ let (tx, rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+ let mut subagents = vec!["agent-1".to_string()];
+
+ let completed = ItemCompletedParams {
+ item_type: "collabAgentToolCall".to_string(),
+ item_id: Some("agent-1".to_string()),
+ command: None,
+ exit_code: None,
+ output: None,
+ file_path: None,
+ diff: None,
+ kind: None,
+ result: Some("Found 3 relevant files".to_string()),
+ pre_tokens: None,
+ content: None,
+ };
+
+ handle_item_completed(&completed, &tx, &ctx, &mut subagents);
+
+ // Subagent removed from stack
+ assert!(subagents.is_empty());
+
+ let response = rx.try_recv().unwrap();
+ match response {
+ DaveApiResponse::SubagentCompleted { task_id, result } => {
+ assert_eq!(task_id, "agent-1");
+ assert_eq!(result, "Found 3 relevant files");
+ }
+ other => panic!(
+ "Expected SubagentCompleted, got {:?}",
+ std::mem::discriminant(&other)
+ ),
+ }
+ }
+
+ #[test]
+ fn test_item_completed_compaction() {
+ let (tx, rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+ let mut subagents = Vec::new();
+
+ let completed = ItemCompletedParams {
+ item_type: "contextCompaction".to_string(),
+ item_id: None,
+ command: None,
+ exit_code: None,
+ output: None,
+ file_path: None,
+ diff: None,
+ kind: None,
+ result: None,
+ pre_tokens: Some(50000),
+ content: None,
+ };
+
+ handle_item_completed(&completed, &tx, &ctx, &mut subagents);
+
+ let response = rx.try_recv().unwrap();
+ match response {
+ DaveApiResponse::CompactionComplete(info) => {
+ assert_eq!(info.pre_tokens, 50000);
+ }
+ other => panic!(
+ "Expected CompactionComplete, got {:?}",
+ std::mem::discriminant(&other)
+ ),
+ }
+ }
+
+ #[test]
+ fn test_item_completed_with_parent_subagent() {
+ let (tx, rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+ let mut subagents = vec!["parent-agent".to_string()];
+
+ let completed = ItemCompletedParams {
+ item_type: "commandExecution".to_string(),
+ item_id: None,
+ command: Some("cargo test".to_string()),
+ exit_code: Some(0),
+ output: Some("ok".to_string()),
+ file_path: None,
+ diff: None,
+ kind: None,
+ result: None,
+ pre_tokens: None,
+ content: None,
+ };
+
+ handle_item_completed(&completed, &tx, &ctx, &mut subagents);
+
+ let response = rx.try_recv().unwrap();
+ match response {
+ DaveApiResponse::ToolResult(tool) => {
+ assert_eq!(tool.tool_name, "Bash");
+ assert_eq!(tool.parent_task_id.as_deref(), Some("parent-agent"));
+ }
+ other => panic!(
+ "Expected ToolResult, got {:?}",
+ std::mem::discriminant(&other)
+ ),
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // check_approval_or_forward tests
+ // -----------------------------------------------------------------------
+
+ #[test]
+ fn test_approval_auto_accept_read_tool() {
+ let (tx, rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+
+ // Glob/Grep/Read are always auto-accepted
+ let result = check_approval_or_forward(1, "Glob", json!({"pattern": "*.rs"}), &tx, &ctx);
+ assert!(matches!(result, HandleResult::AutoAccepted(1)));
+ assert!(rx.try_recv().is_err()); // no UI request
+ }
+
+ #[test]
+ fn test_approval_forwards_dangerous_command() {
+ let (tx, rx) = mpsc::channel();
+ let ctx = egui::Context::default();
+
+ let result =
+ check_approval_or_forward(42, "Bash", json!({"command": "sudo rm -rf /"}), &tx, &ctx);
+ match result {
+ HandleResult::NeedsApproval { rpc_id, .. } => assert_eq!(rpc_id, 42),
+ other => panic!(
+ "Expected NeedsApproval, got {:?}",
+ std::mem::discriminant(&other)
+ ),
+ }
+
+ // Permission request sent to UI
+ let response = rx.try_recv().unwrap();
+ match response {
+ DaveApiResponse::PermissionRequest(pending) => {
+ assert_eq!(pending.request.tool_name, "Bash");
+ }
+ other => panic!(
+ "Expected PermissionRequest, got {:?}",
+ std::mem::discriminant(&other)
+ ),
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ // Integration tests — mock Codex server over duplex streams
+ // -----------------------------------------------------------------------
+
    /// Mock Codex server that speaks JSONL over duplex streams.
    ///
    /// Stands in for the real codex app-server child process in the
    /// integration tests below: the session actor's stdin/stdout are wired
    /// to these in-memory duplex halves instead of real pipes.
    struct MockCodex {
        /// Read what the actor writes (actor's "stdin" from mock's perspective).
        reader: tokio::io::Lines<BufReader<tokio::io::DuplexStream>>,
        /// Write what the actor reads (actor's "stdout" from mock's perspective).
        writer: tokio::io::BufWriter<tokio::io::DuplexStream>,
    }
+
    impl MockCodex {
        /// Read one JSONL message sent by the actor.
        ///
        /// Panics (failing the test) on EOF or malformed JSON — both indicate
        /// a protocol violation by the actor under test.
        async fn read_message(&mut self) -> RpcMessage {
            let line = self.reader.next_line().await.unwrap().unwrap();
            serde_json::from_str(&line).unwrap()
        }

        /// Send a JSONL message to the actor.
        ///
        /// Flushes after every message so the actor sees it immediately
        /// (the BufWriter would otherwise hold small messages back).
        async fn send_line(&mut self, value: &Value) {
            let mut line = serde_json::to_string(value).unwrap();
            line.push('\n');
            self.writer.write_all(line.as_bytes()).await.unwrap();
            self.writer.flush().await.unwrap();
        }

        /// Handle the `initialize` → `initialized` handshake.
        ///
        /// Asserts the actor sends `initialize` first, answers it, then
        /// expects the `initialized` notification — strict ordering.
        async fn handle_init(&mut self) {
            let req = self.read_message().await;
            assert_eq!(req.method.as_deref(), Some("initialize"));
            let id = req.id.unwrap();
            self.send_line(&json!({
                "id": id,
                "result": { "serverInfo": { "name": "mock-codex", "version": "0.0.0" } }
            }))
            .await;
            let notif = self.read_message().await;
            assert_eq!(notif.method.as_deref(), Some("initialized"));
        }

        /// Handle `thread/start` and return the thread ID.
        async fn handle_thread_start(&mut self) -> String {
            let req = self.read_message().await;
            assert_eq!(req.method.as_deref(), Some("thread/start"));
            let id = req.id.unwrap();
            let thread_id = "mock-thread-1";
            self.send_line(&json!({
                "id": id,
                "result": { "thread": { "id": thread_id }, "model": "mock-model" }
            }))
            .await;
            thread_id.to_string()
        }

        /// Handle `turn/start` and return the turn ID.
        async fn handle_turn_start(&mut self) -> String {
            let req = self.read_message().await;
            assert_eq!(req.method.as_deref(), Some("turn/start"));
            let id = req.id.unwrap();
            let turn_id = "mock-turn-1";
            self.send_line(&json!({
                "id": id,
                "result": { "turn": { "id": turn_id } }
            }))
            .await;
            turn_id.to_string()
        }

        /// Send an `item/agentMessage/delta` notification.
        async fn send_delta(&mut self, text: &str) {
            self.send_line(&json!({
                "method": "item/agentMessage/delta",
                "params": { "delta": text }
            }))
            .await;
        }

        /// Send a `turn/completed` notification.
        async fn send_turn_completed(&mut self, status: &str) {
            self.send_line(&json!({
                "method": "turn/completed",
                "params": { "status": status }
            }))
            .await;
        }

        /// Send an `item/completed` notification.
        async fn send_item_completed(&mut self, params: Value) {
            self.send_line(&json!({
                "method": "item/completed",
                "params": params
            }))
            .await;
        }

        /// Send an `item/started` notification.
        async fn send_item_started(&mut self, params: Value) {
            self.send_line(&json!({
                "method": "item/started",
                "params": params
            }))
            .await;
        }

        /// Send an approval request (server→client request with id).
        /// The actor must eventually answer with a message carrying `rpc_id`.
        async fn send_approval_request(&mut self, rpc_id: u64, method: &str, params: Value) {
            self.send_line(&json!({
                "id": rpc_id,
                "method": method,
                "params": params
            }))
            .await;
        }
    }
+
+ /// Create a mock codex server and spawn the session actor loop.
+ /// Returns the mock, a command sender, and the actor task handle.
+ fn setup_integration_test() -> (
+ MockCodex,
+ tokio_mpsc::Sender<SessionCommand>,
+ tokio::task::JoinHandle<()>,
+ ) {
+ // "stdout" channel: mock writes → actor reads
+ let (mock_stdout_write, actor_stdout_read) = tokio::io::duplex(8192);
+ // "stdin" channel: actor writes → mock reads
+ let (actor_stdin_write, mock_stdin_read) = tokio::io::duplex(8192);
+
+ let mock = MockCodex {
+ reader: BufReader::new(mock_stdin_read).lines(),
+ writer: tokio::io::BufWriter::new(mock_stdout_write),
+ };
+
+ let actor_writer = tokio::io::BufWriter::new(actor_stdin_write);
+ let actor_reader = BufReader::new(actor_stdout_read).lines();
+
+ let (command_tx, mut command_rx) = tokio_mpsc::channel(16);
+
+ let handle = tokio::spawn(async move {
+ session_actor_loop(
+ "test-session",
+ actor_writer,
+ actor_reader,
+ Some("mock-model"),
+ None,
+ None,
+ &mut command_rx,
+ )
+ .await;
+ });
+
+ (mock, command_tx, handle)
+ }
+
+ /// Send a Query command and return the response receiver.
+ async fn send_query(
+ command_tx: &tokio_mpsc::Sender<SessionCommand>,
+ prompt: &str,
+ ) -> mpsc::Receiver<DaveApiResponse> {
+ let (response_tx, response_rx) = mpsc::channel();
+ command_tx
+ .send(SessionCommand::Query {
+ prompt: prompt.to_string(),
+ response_tx,
+ ctx: egui::Context::default(),
+ })
+ .await
+ .unwrap();
+ response_rx
+ }
+
+ /// Collect all responses from the channel.
+ fn collect_responses(rx: &mpsc::Receiver<DaveApiResponse>) -> Vec<DaveApiResponse> {
+ rx.try_iter().collect()
+ }
+
+ // -- Integration tests --
+
    /// End-to-end: deltas streamed by the server arrive as ordered Token
    /// responses on the query's channel.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_integration_streaming_tokens() {
        let (mut mock, command_tx, handle) = setup_integration_test();

        mock.handle_init().await;
        mock.handle_thread_start().await;

        let response_rx = send_query(&command_tx, "Hello").await;
        mock.handle_turn_start().await;

        mock.send_delta("Hello").await;
        mock.send_delta(" world").await;
        mock.send_delta("!").await;
        mock.send_turn_completed("completed").await;

        // Drop sender — actor finishes processing remaining lines,
        // then command_rx.recv() returns None and the loop exits.
        drop(command_tx);
        handle.await.unwrap();

        let tokens: Vec<String> = collect_responses(&response_rx)
            .into_iter()
            .filter_map(|r| match r {
                DaveApiResponse::Token(t) => Some(t),
                _ => None,
            })
            .collect();
        assert_eq!(tokens, vec!["Hello", " world", "!"]);
    }
+
    /// End-to-end: a completed commandExecution item surfaces as a single
    /// "Bash" ToolResult.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_integration_command_execution() {
        let (mut mock, command_tx, handle) = setup_integration_test();

        mock.handle_init().await;
        mock.handle_thread_start().await;

        let response_rx = send_query(&command_tx, "list files").await;
        mock.handle_turn_start().await;

        mock.send_item_completed(json!({
            "type": "commandExecution",
            "command": "ls -la",
            "exitCode": 0,
            "output": "total 42\nfoo.rs\n"
        }))
        .await;
        mock.send_turn_completed("completed").await;

        // Dropping the sender lets the actor drain and exit cleanly.
        drop(command_tx);
        handle.await.unwrap();

        let tool_results: Vec<_> = collect_responses(&response_rx)
            .into_iter()
            .filter_map(|r| match r {
                DaveApiResponse::ToolResult(t) => Some(t),
                _ => None,
            })
            .collect();
        assert_eq!(tool_results.len(), 1);
        assert_eq!(tool_results[0].tool_name, "Bash");
    }
+
    /// End-to-end: a completed fileChange item of kind "edit" surfaces as a
    /// single "Edit" ToolResult.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_integration_file_change() {
        let (mut mock, command_tx, handle) = setup_integration_test();

        mock.handle_init().await;
        mock.handle_thread_start().await;

        let response_rx = send_query(&command_tx, "edit file").await;
        mock.handle_turn_start().await;

        mock.send_item_completed(json!({
            "type": "fileChange",
            "filePath": "src/main.rs",
            "diff": "@@ -1,3 +1,3 @@\n-old\n+new\n context\n",
            "kind": { "type": "edit" }
        }))
        .await;
        mock.send_turn_completed("completed").await;

        // Dropping the sender lets the actor drain and exit cleanly.
        drop(command_tx);
        handle.await.unwrap();

        let tool_results: Vec<_> = collect_responses(&response_rx)
            .into_iter()
            .filter_map(|r| match r {
                DaveApiResponse::ToolResult(t) => Some(t),
                _ => None,
            })
            .collect();
        assert_eq!(tool_results.len(), 1);
        assert_eq!(tool_results[0].tool_name, "Edit");
    }
+
    /// End-to-end approval flow: a non-auto-accepted command is forwarded to
    /// the UI, and an Allow answer is relayed back as `"decision":"accept"`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_integration_approval_accept() {
        let (mut mock, command_tx, handle) = setup_integration_test();

        mock.handle_init().await;
        mock.handle_thread_start().await;

        let response_rx = send_query(&command_tx, "delete stuff").await;
        mock.handle_turn_start().await;

        // Send a command that won't be auto-accepted
        mock.send_approval_request(
            42,
            "item/commandExecution/requestApproval",
            json!({ "command": "rm -rf /tmp/important" }),
        )
        .await;

        // Actor should forward a PermissionRequest
        let resp = response_rx
            .recv_timeout(Duration::from_secs(5))
            .expect("timed out waiting for PermissionRequest");
        let pending = match resp {
            DaveApiResponse::PermissionRequest(p) => p,
            other => panic!(
                "Expected PermissionRequest, got {:?}",
                std::mem::discriminant(&other)
            ),
        };
        assert_eq!(pending.request.tool_name, "Bash");

        // Approve it
        pending
            .response_tx
            .send(PermissionResponse::Allow { message: None })
            .unwrap();

        // Mock should receive the acceptance
        let approval_msg = mock.read_message().await;
        assert_eq!(approval_msg.id, Some(42));
        let result = approval_msg.result.unwrap();
        assert_eq!(result["decision"], "accept");

        mock.send_turn_completed("completed").await;
        drop(command_tx);
        handle.await.unwrap();
    }
+
    /// End-to-end approval flow: a Deny answer from the UI is relayed back to
    /// the server as `"decision":"decline"` on the same rpc id.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_integration_approval_deny() {
        let (mut mock, command_tx, handle) = setup_integration_test();

        mock.handle_init().await;
        mock.handle_thread_start().await;

        let response_rx = send_query(&command_tx, "dangerous").await;
        mock.handle_turn_start().await;

        mock.send_approval_request(
            99,
            "item/commandExecution/requestApproval",
            json!({ "command": "sudo rm -rf /" }),
        )
        .await;

        let resp = response_rx
            .recv_timeout(Duration::from_secs(5))
            .expect("timed out waiting for PermissionRequest");
        let pending = match resp {
            DaveApiResponse::PermissionRequest(p) => p,
            _ => panic!("Expected PermissionRequest"),
        };

        // Deny it
        pending
            .response_tx
            .send(PermissionResponse::Deny {
                reason: "too dangerous".to_string(),
            })
            .unwrap();

        // Mock should receive the decline
        let approval_msg = mock.read_message().await;
        assert_eq!(approval_msg.id, Some(99));
        let result = approval_msg.result.unwrap();
        assert_eq!(result["decision"], "decline");

        mock.send_turn_completed("completed").await;
        drop(command_tx);
        handle.await.unwrap();
    }
+
    /// End-to-end: whitelisted commands are accepted by the actor itself —
    /// the server gets `"accept"` without any PermissionRequest reaching the UI.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_integration_auto_accept() {
        let (mut mock, command_tx, handle) = setup_integration_test();

        mock.handle_init().await;
        mock.handle_thread_start().await;

        let response_rx = send_query(&command_tx, "check status").await;
        mock.handle_turn_start().await;

        // "git status" should be auto-accepted
        mock.send_approval_request(
            50,
            "item/commandExecution/requestApproval",
            json!({ "command": "git status" }),
        )
        .await;

        // Mock should receive the auto-acceptance immediately (no UI involved)
        let approval_msg = mock.read_message().await;
        assert_eq!(approval_msg.id, Some(50));
        let result = approval_msg.result.unwrap();
        assert_eq!(result["decision"], "accept");

        // No PermissionRequest should have been sent
        // (the response_rx should be empty or only have non-permission items)
        mock.send_turn_completed("completed").await;

        drop(command_tx);
        handle.await.unwrap();

        let permission_requests: Vec<_> = collect_responses(&response_rx)
            .into_iter()
            .filter(|r| matches!(r, DaveApiResponse::PermissionRequest(_)))
            .collect();
        assert!(
            permission_requests.is_empty(),
            "Auto-accepted commands should not generate PermissionRequests"
        );
    }
+
    /// End-to-end: a session handles consecutive turns, routing each turn's
    /// responses to that turn's own channel.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_integration_multiple_turns() {
        let (mut mock, command_tx, handle) = setup_integration_test();

        mock.handle_init().await;
        mock.handle_thread_start().await;

        // First turn
        let rx1 = send_query(&command_tx, "first").await;
        mock.handle_turn_start().await;
        mock.send_delta("reply 1").await;
        mock.send_turn_completed("completed").await;

        // Wait for the first turn's token to confirm the actor is processing
        let resp = rx1
            .recv_timeout(Duration::from_secs(5))
            .expect("timed out waiting for first turn token");
        assert!(matches!(resp, DaveApiResponse::Token(_)));

        // Brief yield for turn_completed to be processed
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Second turn
        let rx2 = send_query(&command_tx, "second").await;
        mock.handle_turn_start().await;
        mock.send_delta("reply 2").await;
        mock.send_turn_completed("completed").await;

        drop(command_tx);
        handle.await.unwrap();

        // Only the second turn's channel should carry the second reply.
        let tokens2: Vec<String> = collect_responses(&rx2)
            .into_iter()
            .filter_map(|r| match r {
                DaveApiResponse::Token(t) => Some(t),
                _ => None,
            })
            .collect();
        assert_eq!(tokens2, vec!["reply 2"]);
    }
+
    /// End-to-end subagent lifecycle: spawn notification, attribution of tool
    /// results to the active subagent, then completion.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_integration_subagent_lifecycle() {
        let (mut mock, command_tx, handle) = setup_integration_test();

        mock.handle_init().await;
        mock.handle_thread_start().await;

        let response_rx = send_query(&command_tx, "research").await;
        mock.handle_turn_start().await;

        // Subagent starts
        mock.send_item_started(json!({
            "type": "collabAgentToolCall",
            "itemId": "agent-42",
            "name": "research agent"
        }))
        .await;

        // Command inside subagent
        mock.send_item_completed(json!({
            "type": "commandExecution",
            "command": "grep -r pattern .",
            "exitCode": 0,
            "output": "found 3 matches"
        }))
        .await;

        // Subagent completes
        mock.send_item_completed(json!({
            "type": "collabAgentToolCall",
            "itemId": "agent-42",
            "result": "Found relevant information"
        }))
        .await;

        mock.send_turn_completed("completed").await;

        drop(command_tx);
        handle.await.unwrap();

        let responses = collect_responses(&response_rx);

        // Should have: SubagentSpawned, ToolResult (with parent), SubagentCompleted
        let spawned: Vec<_> = responses
            .iter()
            .filter(|r| matches!(r, DaveApiResponse::SubagentSpawned(_)))
            .collect();
        assert_eq!(spawned.len(), 1);

        let tool_results: Vec<_> = responses
            .iter()
            .filter_map(|r| match r {
                DaveApiResponse::ToolResult(t) => Some(t),
                _ => None,
            })
            .collect();
        assert_eq!(tool_results.len(), 1);
        assert_eq!(tool_results[0].parent_task_id.as_deref(), Some("agent-42"));

        let completed: Vec<_> = responses
            .iter()
            .filter(|r| matches!(r, DaveApiResponse::SubagentCompleted { .. }))
            .collect();
        assert_eq!(completed.len(), 1);
    }
+
    /// End-to-end: a Shutdown command mid-turn (no turn/completed received)
    /// still terminates the actor cleanly.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_integration_shutdown_during_stream() {
        let (mut mock, command_tx, handle) = setup_integration_test();

        mock.handle_init().await;
        mock.handle_thread_start().await;

        let response_rx = send_query(&command_tx, "long task").await;
        mock.handle_turn_start().await;

        mock.send_delta("partial").await;

        // Wait for token to arrive before sending Shutdown
        let resp = response_rx
            .recv_timeout(Duration::from_secs(5))
            .expect("timed out waiting for token");
        assert!(
            matches!(&resp, DaveApiResponse::Token(t) if t == "partial"),
            "Expected Token(\"partial\")"
        );

        // Now shutdown while still inside the turn (no turn_completed sent)
        command_tx.send(SessionCommand::Shutdown).await.unwrap();
        handle.await.unwrap();
    }
+
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ async fn test_integration_process_eof() {
+ let (mut mock, command_tx, handle) = setup_integration_test();
+
+ mock.handle_init().await;
+ mock.handle_thread_start().await;
+
+ let response_rx = send_query(&command_tx, "hello").await;
+ mock.handle_turn_start().await;
+
+ mock.send_delta("partial").await;
+
+ // Drop the mock's writer — simulates process exit
+ drop(mock.writer);
+
+ // Actor should detect EOF and send a Failed response
+ let failed = response_rx
+ .recv_timeout(Duration::from_secs(5))
+ .expect("timed out waiting for response after EOF");
+
+ // First response might be the token, keep reading
+ let mut got_failed = false;
+
+ match failed {
+ DaveApiResponse::Token(t) => {
+ assert_eq!(t, "partial");
+ }
+ DaveApiResponse::Failed(_) => got_failed = true,
+ _ => {}
+ }
+
+ if !got_failed {
+ let resp = response_rx
+ .recv_timeout(Duration::from_secs(5))
+ .expect("timed out waiting for Failed after EOF");
+ match resp {
+ DaveApiResponse::Failed(msg) => {
+ assert!(
+ msg.contains("exited unexpectedly") || msg.contains("EOF"),
+ "Unexpected error message: {}",
+ msg
+ );
+ }
+ other => panic!(
+ "Expected Failed after EOF, got {:?}",
+ std::mem::discriminant(&other)
+ ),
+ }
+ }
+
+ // Actor should exit after EOF
+ command_tx.send(SessionCommand::Shutdown).await.ok(); // might fail if actor already exited
+ handle.await.unwrap();
+ }
+
    /// End-to-end: if the `initialize` handshake is answered with a JSON-RPC
    /// error, the actor drains subsequent commands, answering each query with
    /// a Failed response mentioning the handshake.
    ///
    /// Builds the plumbing by hand (instead of `setup_integration_test`)
    /// because the init exchange itself is the subject under test.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn test_integration_init_failure() {
        // "stdout" channel: mock writes → actor reads
        let (mock_stdout_write, actor_stdout_read) = tokio::io::duplex(8192);
        // "stdin" channel: actor writes → mock reads
        let (actor_stdin_write, mock_stdin_read) = tokio::io::duplex(8192);

        let mut mock_reader = BufReader::new(mock_stdin_read).lines();
        let mut mock_writer = tokio::io::BufWriter::new(mock_stdout_write);

        let actor_writer = tokio::io::BufWriter::new(actor_stdin_write);
        let actor_reader = BufReader::new(actor_stdout_read).lines();

        let (command_tx, mut command_rx) = tokio_mpsc::channel(16);

        let handle = tokio::spawn(async move {
            session_actor_loop(
                "test-session",
                actor_writer,
                actor_reader,
                Some("mock-model"),
                None,
                None,
                &mut command_rx,
            )
            .await;
        });

        // Read the initialize request
        let line = mock_reader.next_line().await.unwrap().unwrap();
        let req: RpcMessage = serde_json::from_str(&line).unwrap();
        let id = req.id.unwrap();

        // Send an error response
        let error_resp = json!({
            "id": id,
            "error": { "code": -1, "message": "mock init failure" }
        });
        let mut error_line = serde_json::to_string(&error_resp).unwrap();
        error_line.push('\n');
        mock_writer.write_all(error_line.as_bytes()).await.unwrap();
        mock_writer.flush().await.unwrap();

        // The actor should drain commands with error. Send a query and a shutdown.
        let (response_tx, response_rx) = mpsc::channel();
        command_tx
            .send(SessionCommand::Query {
                prompt: "hello".to_string(),
                response_tx,
                ctx: egui::Context::default(),
            })
            .await
            .unwrap();
        command_tx.send(SessionCommand::Shutdown).await.unwrap();

        handle.await.unwrap();

        // The query should have received an error
        let resp = response_rx
            .recv_timeout(Duration::from_secs(5))
            .expect("expected error response after init failure");
        match resp {
            DaveApiResponse::Failed(msg) => {
                assert!(
                    msg.contains("init handshake"),
                    "Expected init handshake error, got: {}",
                    msg
                );
            }
            other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)),
        }
    }
+
+ /// A JSON-RPC error response to `turn/start` must surface as exactly one
+ /// Failed response carrying the server's error message verbatim.
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ async fn test_integration_turn_error() {
+ let (mut mock, command_tx, handle) = setup_integration_test();
+
+ mock.handle_init().await;
+ mock.handle_thread_start().await;
+
+ let response_rx = send_query(&command_tx, "hello").await;
+
+ // Read turn/start request and send an error response
+ let req = mock.read_message().await;
+ assert_eq!(req.method.as_deref(), Some("turn/start"));
+ let id = req.id.unwrap();
+ mock.send_line(&json!({
+ "id": id,
+ "error": { "code": -32000, "message": "rate limit exceeded" }
+ }))
+ .await;
+
+ // Give actor time to process
+ tokio::time::sleep(Duration::from_millis(100)).await;
+
+ command_tx.send(SessionCommand::Shutdown).await.unwrap();
+ handle.await.unwrap();
+
+ let responses = collect_responses(&response_rx);
+ let failures: Vec<_> = responses
+ .iter()
+ .filter_map(|r| match r {
+ DaveApiResponse::Failed(msg) => Some(msg.clone()),
+ _ => None,
+ })
+ .collect();
+ assert_eq!(failures.len(), 1);
+ assert_eq!(failures[0], "rate limit exceeded");
+ }
+
+ /// A fileChange approval request with kind "create" must surface as a
+ /// PermissionRequest mapped to the "Write" tool; accepting it must send
+ /// an "accept" decision back under the original request id.
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ async fn test_integration_file_change_approval() {
+ let (mut mock, command_tx, handle) = setup_integration_test();
+
+ mock.handle_init().await;
+ mock.handle_thread_start().await;
+
+ let response_rx = send_query(&command_tx, "create file").await;
+ mock.handle_turn_start().await;
+
+ // File change approval request (create)
+ mock.send_approval_request(
+ 77,
+ "item/fileChange/requestApproval",
+ json!({
+ "filePath": "new_file.rs",
+ "diff": "+fn main() {}",
+ "kind": { "type": "create" }
+ }),
+ )
+ .await;
+
+ let resp = response_rx
+ .recv_timeout(Duration::from_secs(5))
+ .expect("timed out waiting for PermissionRequest");
+ let pending = match resp {
+ DaveApiResponse::PermissionRequest(p) => p,
+ other => panic!(
+ "Expected PermissionRequest, got {:?}",
+ std::mem::discriminant(&other)
+ ),
+ };
+ // File create should map to "Write" tool
+ assert_eq!(pending.request.tool_name, "Write");
+
+ pending
+ .response_tx
+ .send(PermissionResponse::Allow { message: None })
+ .unwrap();
+
+ let approval_msg = mock.read_message().await;
+ assert_eq!(approval_msg.id, Some(77));
+ assert_eq!(approval_msg.result.unwrap()["decision"], "accept");
+
+ mock.send_turn_completed("completed").await;
+ drop(command_tx);
+ handle.await.unwrap();
+ }
+
+ /// Create a mock codex server with `resume_session_id` set, so the actor
+ /// sends `thread/resume` instead of `thread/start`.
+ ///
+ /// Mirrors `setup_integration_test`, but passes `Some(resume_id)` as the
+ /// resume argument of `session_actor_loop`. Returns the mock, the
+ /// command sender, and the actor's join handle.
+ fn setup_integration_test_with_resume(
+ resume_id: &str,
+ ) -> (
+ MockCodex,
+ tokio_mpsc::Sender<SessionCommand>,
+ tokio::task::JoinHandle<()>,
+ ) {
+ let (mock_stdout_write, actor_stdout_read) = tokio::io::duplex(8192);
+ let (actor_stdin_write, mock_stdin_read) = tokio::io::duplex(8192);
+
+ let mock = MockCodex {
+ reader: BufReader::new(mock_stdin_read).lines(),
+ writer: tokio::io::BufWriter::new(mock_stdout_write),
+ };
+
+ let actor_writer = tokio::io::BufWriter::new(actor_stdin_write);
+ let actor_reader = BufReader::new(actor_stdout_read).lines();
+
+ let (command_tx, mut command_rx) = tokio_mpsc::channel(16);
+ // Owned copy so the spawned task doesn't borrow the &str argument.
+ let resume_id = resume_id.to_string();
+
+ let handle = tokio::spawn(async move {
+ session_actor_loop(
+ "test-session-resume",
+ actor_writer,
+ actor_reader,
+ Some("mock-model"),
+ None,
+ Some(&resume_id),
+ &mut command_rx,
+ )
+ .await;
+ });
+
+ (mock, command_tx, handle)
+ }
+
+ /// Interrupting mid-stream must make the actor forward `turn/interrupt`
+ /// to codex; tokens streamed before the interrupt must still arrive.
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ async fn test_integration_interrupt_during_stream() {
+ let (mut mock, command_tx, handle) = setup_integration_test();
+
+ mock.handle_init().await;
+ mock.handle_thread_start().await;
+
+ let response_rx = send_query(&command_tx, "count to 100").await;
+ mock.handle_turn_start().await;
+
+ // Send a few tokens
+ mock.send_delta("one ").await;
+ mock.send_delta("two ").await;
+
+ // Give actor time to process the tokens
+ tokio::time::sleep(Duration::from_millis(50)).await;
+
+ // Verify we got them
+ let tok1 = response_rx
+ .recv_timeout(Duration::from_secs(2))
+ .expect("expected token 1");
+ assert!(matches!(tok1, DaveApiResponse::Token(ref t) if t == "one "));
+
+ // Send interrupt
+ command_tx
+ .send(SessionCommand::Interrupt {
+ ctx: egui::Context::default(),
+ })
+ .await
+ .unwrap();
+
+ // The actor should send turn/interrupt to codex
+ let interrupt_msg = mock.read_message().await;
+ assert_eq!(interrupt_msg.method.as_deref(), Some("turn/interrupt"));
+
+ // Codex responds with turn/completed after interrupt
+ mock.send_turn_completed("interrupted").await;
+
+ // Actor should be ready for next command now
+ drop(command_tx);
+ handle.await.unwrap();
+
+ // Verify we got the tokens before interrupt
+ let responses = collect_responses(&response_rx);
+ let tokens: Vec<_> = responses
+ .iter()
+ .filter_map(|r| match r {
+ DaveApiResponse::Token(t) => Some(t.clone()),
+ _ => None,
+ })
+ .collect();
+ assert!(tokens.contains(&"two ".to_string()));
+ }
+
+ /// Interrupting while an approval is pending must first answer the
+ /// approval with a "cancel" decision (under the original request id)
+ /// and only then send `turn/interrupt`.
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ async fn test_integration_interrupt_during_approval() {
+ let (mut mock, command_tx, handle) = setup_integration_test();
+
+ mock.handle_init().await;
+ mock.handle_thread_start().await;
+
+ let response_rx = send_query(&command_tx, "run something").await;
+ mock.handle_turn_start().await;
+
+ // Send an approval request
+ mock.send_approval_request(
+ 50,
+ "item/commandExecution/requestApproval",
+ json!({ "command": "rm -rf /" }),
+ )
+ .await;
+
+ // Wait for the PermissionRequest to arrive at the test
+ let resp = response_rx
+ .recv_timeout(Duration::from_secs(5))
+ .expect("timed out waiting for PermissionRequest");
+ match resp {
+ DaveApiResponse::PermissionRequest(_pending) => {
+ // Don't respond to the pending permission — send interrupt instead
+ }
+ other => panic!(
+ "Expected PermissionRequest, got {:?}",
+ std::mem::discriminant(&other)
+ ),
+ }
+
+ // Send interrupt while approval is pending
+ command_tx
+ .send(SessionCommand::Interrupt {
+ ctx: egui::Context::default(),
+ })
+ .await
+ .unwrap();
+
+ // Actor should send cancel for the approval
+ let cancel_msg = mock.read_message().await;
+ assert_eq!(cancel_msg.id, Some(50));
+ assert_eq!(cancel_msg.result.unwrap()["decision"], "cancel");
+
+ // Then send turn/interrupt
+ let interrupt_msg = mock.read_message().await;
+ assert_eq!(interrupt_msg.method.as_deref(), Some("turn/interrupt"));
+
+ // Codex completes the turn
+ mock.send_turn_completed("interrupted").await;
+
+ drop(command_tx);
+ handle.await.unwrap();
+ }
+
+ /// A second Query while a turn is active must be rejected immediately
+ /// with "Query already in progress"; the first turn proceeds untouched.
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ async fn test_integration_query_during_active_turn() {
+ let (mut mock, command_tx, handle) = setup_integration_test();
+
+ mock.handle_init().await;
+ mock.handle_thread_start().await;
+
+ let response_rx1 = send_query(&command_tx, "first query").await;
+ mock.handle_turn_start().await;
+
+ // Send some tokens so the turn is clearly active
+ mock.send_delta("working...").await;
+
+ // Give actor time to enter the streaming loop
+ tokio::time::sleep(Duration::from_millis(50)).await;
+
+ // Send a second query while the first is still active
+ let response_rx2 = send_query(&command_tx, "second query").await;
+
+ // The second query should be immediately rejected
+ let rejection = response_rx2
+ .recv_timeout(Duration::from_secs(5))
+ .expect("timed out waiting for rejection");
+ match rejection {
+ DaveApiResponse::Failed(msg) => {
+ assert_eq!(msg, "Query already in progress");
+ }
+ other => panic!("Expected Failed, got {:?}", std::mem::discriminant(&other)),
+ }
+
+ // First query continues normally
+ mock.send_turn_completed("completed").await;
+
+ drop(command_tx);
+ handle.await.unwrap();
+
+ // Verify first query got its token
+ let responses = collect_responses(&response_rx1);
+ let tokens: Vec<_> = responses
+ .iter()
+ .filter_map(|r| match r {
+ DaveApiResponse::Token(t) => Some(t.clone()),
+ _ => None,
+ })
+ .collect();
+ assert!(tokens.contains(&"working...".to_string()));
+ }
+
+ /// With `resume_session_id` set, the actor must send `thread/resume`
+ /// (carrying the thread id in `threadId`) instead of `thread/start`,
+ /// and stream normally afterwards.
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ async fn test_integration_thread_resume() {
+ let (mut mock, command_tx, handle) =
+ setup_integration_test_with_resume("existing-thread-42");
+
+ // Init handshake is the same
+ mock.handle_init().await;
+
+ // Actor should send thread/resume instead of thread/start
+ let req = mock.read_message().await;
+ assert_eq!(req.method.as_deref(), Some("thread/resume"));
+ let params = req.params.unwrap();
+ assert_eq!(params["threadId"], "existing-thread-42");
+
+ // Respond with success
+ let id = req.id.unwrap();
+ mock.send_line(&json!({
+ "id": id,
+ "result": { "thread": { "id": "existing-thread-42" } }
+ }))
+ .await;
+
+ // Now send a query — should work the same as normal
+ let response_rx = send_query(&command_tx, "resume prompt").await;
+ mock.handle_turn_start().await;
+ mock.send_delta("resumed!").await;
+ mock.send_turn_completed("completed").await;
+
+ drop(command_tx);
+ handle.await.unwrap();
+
+ let responses = collect_responses(&response_rx);
+ let tokens: Vec<_> = responses
+ .iter()
+ .filter_map(|r| match r {
+ DaveApiResponse::Token(t) => Some(t.clone()),
+ _ => None,
+ })
+ .collect();
+ assert_eq!(tokens, vec!["resumed!"]);
+ }
+
+ /// A line that is not valid JSON must be skipped (logged, not fatal):
+ /// tokens sent before and after the garbage line must still stream
+ /// through and the turn must complete normally.
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ async fn test_integration_malformed_jsonl() {
+ let (mut mock, command_tx, handle) = setup_integration_test();
+
+ mock.handle_init().await;
+ mock.handle_thread_start().await;
+
+ let response_rx = send_query(&command_tx, "test").await;
+ mock.handle_turn_start().await;
+
+ // Send valid token
+ mock.send_delta("before").await;
+
+ // Send garbage that isn't valid JSON.
+ // (Plain &str is enough — write_all only needs bytes.)
+ let garbage = "this is not json at all\n";
+ mock.writer.write_all(garbage.as_bytes()).await.unwrap();
+ mock.writer.flush().await.unwrap();
+
+ // Send another valid token after the garbage
+ mock.send_delta("after").await;
+
+ // Complete the turn
+ mock.send_turn_completed("completed").await;
+
+ drop(command_tx);
+ handle.await.unwrap();
+
+ // Both valid tokens should have been received — the garbage line
+ // should have been skipped with a warning, not crash the actor
+ let responses = collect_responses(&response_rx);
+ let tokens: Vec<_> = responses
+ .iter()
+ .filter_map(|r| match r {
+ DaveApiResponse::Token(t) => Some(t.clone()),
+ _ => None,
+ })
+ .collect();
+ assert!(
+ tokens.contains(&"before".to_string()),
+ "Missing 'before' token, got: {:?}",
+ tokens
+ );
+ assert!(
+ tokens.contains(&"after".to_string()),
+ "Missing 'after' token after malformed line, got: {:?}",
+ tokens
+ );
+ }
+
+ // -----------------------------------------------------------------------
+ // Real-binary integration tests — require `codex` on PATH
+ // Run with: cargo test -p notedeck_dave -- --ignored
+ // -----------------------------------------------------------------------
+
+ /// Helper: spawn a real codex app-server process and wire it into
+ /// `session_actor_loop`. Returns the command sender, response receiver,
+ /// and join handle.
+ ///
+ /// The binary is taken from `CODEX_BINARY` (falling back to `codex`).
+ /// The child is killed when the actor loop returns, and stderr is
+ /// drained in a background task so the child can't block on a full pipe.
+ /// An initial query is sent immediately to drive handshake + thread
+ /// start + first turn.
+ fn setup_real_codex_test() -> (
+ tokio_mpsc::Sender<SessionCommand>,
+ mpsc::Receiver<DaveApiResponse>,
+ tokio::task::JoinHandle<()>,
+ ) {
+ let codex_binary = std::env::var("CODEX_BINARY").unwrap_or_else(|_| "codex".to_string());
+
+ let mut child = spawn_codex(&codex_binary, &None)
+ .expect("Failed to spawn codex app-server — is codex installed?");
+
+ let stdin = child.stdin.take().expect("stdin piped");
+ let stdout = child.stdout.take().expect("stdout piped");
+
+ // Drain stderr to prevent pipe deadlock
+ if let Some(stderr) = child.stderr.take() {
+ tokio::spawn(async move {
+ let mut lines = BufReader::new(stderr).lines();
+ while let Ok(Some(line)) = lines.next_line().await {
+ eprintln!("[codex stderr] {}", line);
+ }
+ });
+ }
+
+ let writer = tokio::io::BufWriter::new(stdin);
+ let reader = BufReader::new(stdout).lines();
+
+ let (command_tx, mut command_rx) = tokio_mpsc::channel(16);
+
+ let handle = tokio::spawn(async move {
+ session_actor_loop(
+ "real-codex-test",
+ writer,
+ reader,
+ None, // use codex default model
+ None, // use current directory
+ None, // no resume
+ &mut command_rx,
+ )
+ .await;
+ let _ = child.kill().await;
+ });
+
+ let (response_tx, response_rx) = mpsc::channel();
+ // Send an initial query to trigger handshake + thread start + turn
+ let command_tx_clone = command_tx.clone();
+ let rt_handle = tokio::runtime::Handle::current();
+ rt_handle.spawn(async move {
+ command_tx_clone
+ .send(SessionCommand::Query {
+ prompt: "Say exactly: hello world".to_string(),
+ response_tx,
+ ctx: egui::Context::default(),
+ })
+ .await
+ .unwrap();
+ });
+
+ (command_tx, response_rx, handle)
+ }
+
+ /// Smoke test against a real codex binary: at least one Token must
+ /// arrive within 60s. Permission requests are auto-accepted so the
+ /// turn is never blocked on approval.
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ #[ignore] // Requires `codex` binary on PATH
+ async fn test_real_codex_streaming() {
+ let (command_tx, response_rx, handle) = setup_real_codex_test();
+
+ // Wait for at least one token (with a generous timeout for API calls)
+ let mut got_token = false;
+ let deadline = std::time::Instant::now() + Duration::from_secs(60);
+
+ while std::time::Instant::now() < deadline {
+ match response_rx.recv_timeout(Duration::from_secs(1)) {
+ Ok(DaveApiResponse::Token(t)) => {
+ eprintln!("[test] got token: {:?}", t);
+ got_token = true;
+ }
+ Ok(DaveApiResponse::PermissionRequest(pending)) => {
+ // Auto-accept any permission requests during test
+ eprintln!(
+ "[test] auto-accepting permission: {}",
+ pending.request.tool_name
+ );
+ let _ = pending
+ .response_tx
+ .send(PermissionResponse::Allow { message: None });
+ }
+ Ok(DaveApiResponse::Failed(msg)) => {
+ panic!("[test] codex turn failed: {}", msg);
+ }
+ Ok(other) => {
+ eprintln!("[test] got response: {:?}", std::mem::discriminant(&other));
+ }
+ Err(mpsc::RecvTimeoutError::Timeout) => {
+ if got_token {
+ break; // Got at least one token; stop waiting
+ }
+ }
+ Err(mpsc::RecvTimeoutError::Disconnected) => break,
+ }
+ }
+
+ assert!(
+ got_token,
+ "Expected at least one Token response from real codex"
+ );
+
+ drop(command_tx);
+ let _ = tokio::time::timeout(Duration::from_secs(10), handle).await;
+ }
+
+ /// Smoke test against a real codex binary: the first turn must finish
+ /// (or fail) within 120s. After any response, a 2s window of silence
+ /// is treated as turn completion, since the actor stops sending once
+ /// turn/completed arrives.
+ #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
+ #[ignore] // Requires `codex` binary on PATH
+ async fn test_real_codex_turn_completes() {
+ let (command_tx, response_rx, handle) = setup_real_codex_test();
+
+ // Wait for turn to complete
+ let mut got_turn_done = false;
+ let mut got_any_response = false;
+ let deadline = std::time::Instant::now() + Duration::from_secs(120);
+
+ while std::time::Instant::now() < deadline {
+ match response_rx.recv_timeout(Duration::from_secs(2)) {
+ Ok(DaveApiResponse::Token(_)) => {
+ got_any_response = true;
+ }
+ Ok(DaveApiResponse::PermissionRequest(pending)) => {
+ got_any_response = true;
+ let _ = pending
+ .response_tx
+ .send(PermissionResponse::Allow { message: None });
+ }
+ Ok(DaveApiResponse::Failed(msg)) => {
+ eprintln!("[test] turn failed: {}", msg);
+ // A failure is still a "completion" — codex responded
+ got_turn_done = true;
+ break;
+ }
+ Ok(_) => {
+ got_any_response = true;
+ }
+ Err(mpsc::RecvTimeoutError::Timeout) => {
+ if got_any_response {
+ // Responses have stopped coming — turn likely completed
+ // (turn/completed causes the actor to stop sending
+ // and wait for the next command)
+ got_turn_done = true;
+ break;
+ }
+ }
+ Err(mpsc::RecvTimeoutError::Disconnected) => {
+ got_turn_done = true;
+ break;
+ }
+ }
+ }
+
+ assert!(
+ got_turn_done,
+ "Expected real codex turn to complete within timeout"
+ );
+
+ drop(command_tx);
+ let _ = tokio::time::timeout(Duration::from_secs(10), handle).await;
+ }
+}
diff --git a/crates/notedeck_dave/src/backend/codex_protocol.rs b/crates/notedeck_dave/src/backend/codex_protocol.rs
@@ -0,0 +1,220 @@
+//! JSON-RPC serde types for the Codex app-server protocol.
+//!
+//! The Codex app-server (`codex app-server --listen stdio://`) communicates via
+//! JSONL (one JSON object per line) over stdin/stdout. It uses a JSON-RPC-like
+//! schema but does NOT include the `"jsonrpc":"2.0"` header.
+
+#![allow(dead_code)] // Protocol fields are defined for completeness; not all are read yet.
+
+use serde::{Deserialize, Serialize};
+use serde_json::Value;
+
+// ---------------------------------------------------------------------------
+// Generic JSON-RPC envelope
+// ---------------------------------------------------------------------------
+
+/// Outgoing request or notification (client → server).
+///
+/// Generic over the params payload so each method can use its own
+/// strongly-typed params struct.
+#[derive(Debug, Serialize)]
+pub struct RpcRequest<P: Serialize> {
+ /// Present for requests that expect a response; absent for notifications.
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub id: Option<u64>,
+ pub method: &'static str,
+ pub params: P,
+}
+
+/// Incoming message from the server. Could be a response, notification, or
+/// request (for bidirectional approval).
+///
+/// Disambiguation: `id` without `method` is a response to one of our
+/// requests; `method` without `id` is a notification; both present is a
+/// server→client request that expects a reply (e.g. an approval).
+#[derive(Debug, Deserialize)]
+pub struct RpcMessage {
+ /// Present on responses and server→client requests.
+ pub id: Option<u64>,
+ /// Present on notifications and server→client requests.
+ pub method: Option<String>,
+ /// Present on successful responses.
+ pub result: Option<Value>,
+ /// Present on error responses.
+ pub error: Option<RpcError>,
+ /// Present on notifications and server→client requests.
+ pub params: Option<Value>,
+}
+
+/// JSON-RPC error object attached to failed responses.
+#[derive(Debug, Deserialize)]
+pub struct RpcError {
+ pub code: i64,
+ pub message: String,
+ pub data: Option<Value>,
+}
+
+// ---------------------------------------------------------------------------
+// Outgoing (client → server)
+// ---------------------------------------------------------------------------
+
+/// `initialize` params
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct InitializeParams {
+ pub client_info: ClientInfo,
+ pub capabilities: Value, // empty object for now
+}
+
+/// Identifies this client in the `initialize` handshake.
+#[derive(Debug, Serialize)]
+pub struct ClientInfo {
+ pub name: String,
+ pub version: String,
+}
+
+/// `thread/start` params
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ThreadStartParams {
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub model: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub cwd: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub approval_policy: Option<String>,
+}
+
+/// `thread/resume` params
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ThreadResumeParams {
+ pub thread_id: String,
+}
+
+/// `turn/start` params
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TurnStartParams {
+ pub thread_id: String,
+ pub input: Vec<TurnInput>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub model: Option<String>,
+ #[serde(skip_serializing_if = "Option::is_none")]
+ pub effort: Option<String>,
+}
+
+/// One element of a turn's input; tagged as `{"type": "text", ...}`.
+#[derive(Debug, Serialize)]
+#[serde(tag = "type")]
+pub enum TurnInput {
+ #[serde(rename = "text")]
+ Text { text: String },
+}
+
+/// `turn/interrupt` params
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TurnInterruptParams {
+ pub thread_id: String,
+ pub turn_id: String,
+}
+
+/// Response to an approval request (client → server).
+#[derive(Debug, Serialize)]
+pub struct ApprovalResponse {
+ pub decision: ApprovalDecision,
+}
+
+/// Approval verdict; serialized lowercase on the wire:
+/// "accept" / "decline" / "cancel".
+#[derive(Debug, Serialize, Clone, Copy)]
+#[serde(rename_all = "lowercase")]
+pub enum ApprovalDecision {
+ Accept,
+ Decline,
+ Cancel,
+}
+
+// ---------------------------------------------------------------------------
+// Incoming (server → client)
+// ---------------------------------------------------------------------------
+
+/// Result of `thread/start`
+#[derive(Debug, Deserialize)]
+pub struct ThreadStartResult {
+ pub thread: ThreadInfo,
+ pub model: Option<String>,
+}
+
+/// Minimal thread descriptor returned inside `thread/start` results.
+#[derive(Debug, Deserialize)]
+pub struct ThreadInfo {
+ pub id: String,
+}
+
+/// `item/started` params
+///
+/// One struct covers every item kind; fields not relevant to a given
+/// kind simply deserialize to `None`.
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ItemStartedParams {
+ /// The kind of item: "agentMessage", "commandExecution", "fileChange",
+ /// "collabAgentToolCall", "contextCompaction", etc.
+ #[serde(rename = "type")]
+ pub item_type: String,
+ /// Unique item ID
+ pub item_id: Option<String>,
+ /// For collabAgentToolCall: agent name/description
+ pub name: Option<String>,
+ /// For commandExecution: the command being run
+ pub command: Option<String>,
+ /// For fileChange: the file path
+ pub file_path: Option<String>,
+}
+
+/// `item/agentMessage/delta` params — a streaming text token
+#[derive(Debug, Deserialize)]
+pub struct AgentMessageDeltaParams {
+ pub delta: String,
+}
+
+/// `item/completed` params — an item has finished
+///
+/// Union of the per-kind payload fields; callers dispatch on `item_type`
+/// and read only the fields that kind defines.
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ItemCompletedParams {
+ #[serde(rename = "type")]
+ pub item_type: String,
+ pub item_id: Option<String>,
+ /// For commandExecution: the command that was run
+ pub command: Option<String>,
+ /// For commandExecution: exit code
+ pub exit_code: Option<i32>,
+ /// For commandExecution: stdout/stderr output
+ pub output: Option<String>,
+ /// For fileChange: the file path
+ pub file_path: Option<String>,
+ /// For fileChange: the diff
+ pub diff: Option<String>,
+ /// For fileChange: kind of change (create, edit, delete)
+ pub kind: Option<Value>,
+ /// For collabAgentToolCall: result text
+ pub result: Option<String>,
+ /// For contextCompaction: token info
+ pub pre_tokens: Option<u64>,
+ /// For agentMessage: full content
+ pub content: Option<String>,
+}
+
+/// `item/commandExecution/requestApproval` params — server asks client to approve a command
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct CommandApprovalParams {
+ pub command: String,
+ #[serde(default)]
+ pub cwd: Option<String>,
+}
+
+/// `item/fileChange/requestApproval` params — server asks client to approve a file change
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct FileChangeApprovalParams {
+ pub file_path: String,
+ pub diff: Option<String>,
+ pub kind: Option<Value>,
+}
+
+/// `turn/completed` params
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TurnCompletedParams {
+ /// "completed" or "failed"
+ pub status: String,
+ pub turn_id: Option<String>,
+ pub error: Option<String>,
+}
diff --git a/crates/notedeck_dave/src/backend/mod.rs b/crates/notedeck_dave/src/backend/mod.rs
@@ -1,4 +1,6 @@
mod claude;
+mod codex;
+mod codex_protocol;
mod openai;
mod remote;
mod session_info;
@@ -6,6 +8,7 @@ mod tool_summary;
mod traits;
pub use claude::ClaudeBackend;
+pub use codex::CodexBackend;
pub use openai::OpenAiBackend;
pub use remote::RemoteOnlyBackend;
pub use traits::{AiBackend, BackendType};
diff --git a/crates/notedeck_dave/src/backend/traits.rs b/crates/notedeck_dave/src/backend/traits.rs
@@ -11,6 +11,7 @@ use std::sync::Arc;
pub enum BackendType {
OpenAI,
Claude,
+ Codex,
/// No local AI — only view/control remote agentic sessions from ndb
Remote,
}
diff --git a/crates/notedeck_dave/src/config.rs b/crates/notedeck_dave/src/config.rs
@@ -1,6 +1,14 @@
use crate::backend::BackendType;
use async_openai::config::OpenAIConfig;
use serde::{Deserialize, Serialize};
+use std::env;
+
+/// Check if a binary exists on the system PATH.
+///
+/// Probes each `PATH` entry for a file named `binary`. On Windows,
+/// executables carry an extension (`codex.exe`, `codex.cmd`, …), so the
+/// bare name is additionally tried with common executable extensions —
+/// otherwise auto-detection would silently never match there.
+/// Returns `false` when `PATH` is unset.
+fn has_binary_on_path(binary: &str) -> bool {
+    let Some(paths) = env::var_os("PATH") else {
+        return false;
+    };
+    env::split_paths(&paths).any(|dir| {
+        if dir.join(binary).is_file() {
+            return true;
+        }
+        // Windows-only fallback: try the usual executable extensions.
+        cfg!(windows)
+            && ["exe", "cmd", "bat"]
+                .iter()
+                .any(|ext| dir.join(format!("{binary}.{ext}")).is_file())
+    })
+}
/// AI interaction mode - determines UI complexity and feature set
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -18,13 +26,15 @@ pub enum AiProvider {
OpenAI,
Anthropic,
Ollama,
+ Codex,
}
impl AiProvider {
- pub const ALL: [AiProvider; 3] = [
+ pub const ALL: [AiProvider; 4] = [
AiProvider::OpenAI,
AiProvider::Anthropic,
AiProvider::Ollama,
+ AiProvider::Codex,
];
pub fn name(&self) -> &'static str {
@@ -32,6 +42,7 @@ impl AiProvider {
AiProvider::OpenAI => "OpenAI",
AiProvider::Anthropic => "Anthropic",
AiProvider::Ollama => "Ollama",
+ AiProvider::Codex => "Codex",
}
}
@@ -40,12 +51,13 @@ impl AiProvider {
AiProvider::OpenAI => "gpt-5.2",
AiProvider::Anthropic => "claude-sonnet-4-20250514",
AiProvider::Ollama => "hhao/qwen2.5-coder-tools:latest",
+ AiProvider::Codex => "gpt-5.2-codex",
}
}
pub fn default_endpoint(&self) -> Option<&'static str> {
match self {
- AiProvider::OpenAI => None,
+ AiProvider::OpenAI | AiProvider::Codex => None,
AiProvider::Anthropic => Some("https://api.anthropic.com/v1"),
AiProvider::Ollama => Some("http://localhost:11434/v1"),
}
@@ -54,7 +66,7 @@ impl AiProvider {
pub fn requires_api_key(&self) -> bool {
match self {
AiProvider::OpenAI | AiProvider::Anthropic => true,
- AiProvider::Ollama => false,
+ AiProvider::Ollama | AiProvider::Codex => false,
}
}
@@ -73,6 +85,13 @@ impl AiProvider {
"mistral:latest",
"codellama:latest",
],
+ AiProvider::Codex => &[
+ "gpt-5.3-codex",
+ "gpt-5.2-codex",
+ "gpt-5-codex",
+ "gpt-5-codex-mini",
+ "codex-mini-latest",
+ ],
}
}
}
@@ -116,6 +135,7 @@ impl DaveSettings {
let provider = match config.backend {
BackendType::OpenAI | BackendType::Remote => AiProvider::OpenAI,
BackendType::Claude => AiProvider::Anthropic,
+ BackendType::Codex => AiProvider::Codex,
};
let api_key = match provider {
@@ -179,6 +199,7 @@ impl Default for ModelConfig {
match backend_str.to_lowercase().as_str() {
"claude" | "anthropic" => BackendType::Claude,
"openai" => BackendType::OpenAI,
+ "codex" => BackendType::Codex,
_ => {
tracing::warn!(
"Unknown DAVE_BACKEND value: {}, defaulting to OpenAI",
@@ -188,10 +209,13 @@ impl Default for ModelConfig {
}
}
} else {
- // Auto-detect: prefer Claude if key is available, otherwise OpenAI
- // (with trial key fallback). Remote is only for controlling
- // agentic sessions discovered from relays, not the default mode.
- if anthropic_api_key.is_some() {
+ // Auto-detect: prefer agentic backends if their CLI binary is on PATH,
+ // then fall back to API-key detection, then OpenAI (with trial key).
+ if has_binary_on_path("claude") {
+ BackendType::Claude
+ } else if has_binary_on_path("codex") {
+ BackendType::Codex
+ } else if anthropic_api_key.is_some() {
BackendType::Claude
} else {
BackendType::OpenAI
@@ -211,6 +235,7 @@ impl Default for ModelConfig {
.unwrap_or_else(|| match backend {
BackendType::OpenAI => "gpt-4.1-mini".to_string(),
BackendType::Claude => "claude-sonnet-4.5".to_string(),
+ BackendType::Codex => "gpt-5.2-codex".to_string(),
BackendType::Remote => String::new(),
});
@@ -229,7 +254,7 @@ impl Default for ModelConfig {
impl ModelConfig {
pub fn ai_mode(&self) -> AiMode {
match self.backend {
- BackendType::Claude => AiMode::Agentic,
+ BackendType::Claude | BackendType::Codex => AiMode::Agentic,
BackendType::OpenAI | BackendType::Remote => AiMode::Chat,
}
}
@@ -267,6 +292,7 @@ impl ModelConfig {
let backend = match settings.provider {
AiProvider::OpenAI | AiProvider::Ollama => BackendType::OpenAI,
AiProvider::Anthropic => BackendType::Claude,
+ AiProvider::Codex => BackendType::Codex,
};
let anthropic_api_key = if settings.provider == AiProvider::Anthropic {
diff --git a/crates/notedeck_dave/src/file_update.rs b/crates/notedeck_dave/src/file_update.rs
@@ -20,6 +20,8 @@ pub enum FileUpdateType {
},
/// Write: create/overwrite entire file
Write { content: String },
+ /// Unified diff from an external tool (e.g. Codex)
+ UnifiedDiff { diff: String },
}
/// A single line in a diff
@@ -132,7 +134,7 @@ impl FileUpdate {
}
deleted_lines <= max_lines && inserted_lines <= max_lines
}
- FileUpdateType::Write { .. } => false,
+ FileUpdateType::Write { .. } | FileUpdateType::UnifiedDiff { .. } => false,
}
}
@@ -220,6 +222,37 @@ impl FileUpdate {
})
.collect()
}
+ FileUpdateType::UnifiedDiff { diff } => {
+ // Parse unified diff format: lines starting with '+'/'-'/' '
+ // Skip header lines (---/+++/@@ lines)
+ diff.lines()
+ .filter(|line| {
+ !line.starts_with("---")
+ && !line.starts_with("+++")
+ && !line.starts_with("@@")
+ })
+ .map(|line| {
+ if let Some(rest) = line.strip_prefix('+') {
+ DiffLine {
+ tag: DiffTag::Insert,
+ content: format!("{}\n", rest),
+ }
+ } else if let Some(rest) = line.strip_prefix('-') {
+ DiffLine {
+ tag: DiffTag::Delete,
+ content: format!("{}\n", rest),
+ }
+ } else {
+ // Context line (starts with ' ' or is bare)
+ let content = line.strip_prefix(' ').unwrap_or(line);
+ DiffLine {
+ tag: DiffTag::Equal,
+ content: format!("{}\n", content),
+ }
+ }
+ })
+ .collect()
+ }
}
}
}
@@ -410,6 +443,94 @@ mod tests {
);
}
+ // -----------------------------------------------------------------------
+ // UnifiedDiff tests
+ // These exercise compute_diff_for's parsing of raw unified-diff text:
+ // header lines (---/+++/@@) are dropped, +/-/context lines are tagged.
+ // -----------------------------------------------------------------------
+
+ /// Mixed hunk: context, delete, insert, context — headers stripped.
+ #[test]
+ fn test_unified_diff_basic() {
+ let update = FileUpdate::new(
+ "test.rs".to_string(),
+ FileUpdateType::UnifiedDiff {
+ diff: "--- a/test.rs\n+++ b/test.rs\n@@ -1,3 +1,3 @@\n context\n-old line\n+new line\n more context\n"
+ .to_string(),
+ },
+ );
+ let lines = FileUpdate::compute_diff_for(&update.update_type);
+ assert_eq!(lines.len(), 4);
+ assert_eq!(lines[0].tag, DiffTag::Equal);
+ assert_eq!(lines[0].content, "context\n");
+ assert_eq!(lines[1].tag, DiffTag::Delete);
+ assert_eq!(lines[1].content, "old line\n");
+ assert_eq!(lines[2].tag, DiffTag::Insert);
+ assert_eq!(lines[2].content, "new line\n");
+ assert_eq!(lines[3].tag, DiffTag::Equal);
+ assert_eq!(lines[3].content, "more context\n");
+ }
+
+ /// ---/+++/@@ header lines must not appear in the output.
+ #[test]
+ fn test_unified_diff_skips_headers() {
+ let update = FileUpdate::new(
+ "test.rs".to_string(),
+ FileUpdateType::UnifiedDiff {
+ diff: "--- a/old.rs\n+++ b/new.rs\n@@ -10,4 +10,4 @@\n+added\n".to_string(),
+ },
+ );
+ let lines = FileUpdate::compute_diff_for(&update.update_type);
+ assert_eq!(lines.len(), 1);
+ assert_eq!(lines[0].tag, DiffTag::Insert);
+ assert_eq!(lines[0].content, "added\n");
+ }
+
+ /// A diff of only '-' lines yields only Delete tags.
+ #[test]
+ fn test_unified_diff_delete_only() {
+ let update = FileUpdate::new(
+ "test.rs".to_string(),
+ FileUpdateType::UnifiedDiff {
+ diff: "-removed line 1\n-removed line 2\n".to_string(),
+ },
+ );
+ let lines = FileUpdate::compute_diff_for(&update.update_type);
+ assert_eq!(lines.len(), 2);
+ assert!(lines.iter().all(|l| l.tag == DiffTag::Delete));
+ }
+
+ /// A diff of only '+' lines yields only Insert tags.
+ #[test]
+ fn test_unified_diff_insert_only() {
+ let update = FileUpdate::new(
+ "test.rs".to_string(),
+ FileUpdateType::UnifiedDiff {
+ diff: "+new line 1\n+new line 2\n+new line 3\n".to_string(),
+ },
+ );
+ let lines = FileUpdate::compute_diff_for(&update.update_type);
+ assert_eq!(lines.len(), 3);
+ assert!(lines.iter().all(|l| l.tag == DiffTag::Insert));
+ }
+
+ /// Empty diff text produces no diff lines.
+ #[test]
+ fn test_unified_diff_empty() {
+ let update = FileUpdate::new(
+ "test.rs".to_string(),
+ FileUpdateType::UnifiedDiff {
+ diff: String::new(),
+ },
+ );
+ let lines = FileUpdate::compute_diff_for(&update.update_type);
+ assert!(lines.is_empty());
+ }
+
+ /// UnifiedDiff updates never qualify as "small edits", regardless of size.
+ #[test]
+ fn test_unified_diff_is_never_small_edit() {
+ let update = FileUpdate::new(
+ "test.rs".to_string(),
+ FileUpdateType::UnifiedDiff {
+ diff: "+x\n".to_string(),
+ },
+ );
+ assert!(!update.is_small_edit(100));
+ }
+
#[test]
fn test_line_count_behavior() {
// Document how lines().count() behaves
diff --git a/crates/notedeck_dave/src/lib.rs b/crates/notedeck_dave/src/lib.rs
@@ -25,7 +25,9 @@ mod update;
mod vec3;
use agent_status::AgentStatus;
-use backend::{AiBackend, BackendType, ClaudeBackend, OpenAiBackend, RemoteOnlyBackend};
+use backend::{
+ AiBackend, BackendType, ClaudeBackend, CodexBackend, OpenAiBackend, RemoteOnlyBackend,
+};
use chrono::{Duration, Local};
use egui_wgpu::RenderState;
use enostr::KeypairUnowned;
@@ -361,6 +363,9 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr
.expect("Claude backend requires ANTHROPIC_API_KEY or CLAUDE_API_KEY");
Box::new(ClaudeBackend::new(api_key.clone()))
}
+ BackendType::Codex => Box::new(CodexBackend::new(
+ std::env::var("CODEX_BINARY").unwrap_or_else(|_| "codex".to_string()),
+ )),
BackendType::Remote => Box::new(RemoteOnlyBackend),
};
@@ -856,6 +861,17 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr
}
session.chat.push(Message::CompactionComplete(info));
}
+
+ DaveApiResponse::QueryComplete(info) => {
+ if let Some(agentic) = &mut session.agentic {
+ agentic.usage.input_tokens = info.input_tokens;
+ agentic.usage.output_tokens = info.output_tokens;
+ agentic.usage.num_turns = info.num_turns;
+ if let Some(cost) = info.cost_usd {
+ agentic.usage.cost_usd = Some(cost);
+ }
+ }
+ }
}
}
@@ -1494,16 +1510,18 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr
);
session.chat = loaded.messages;
- // Determine if this is a remote session (cwd doesn't exist locally)
- let cwd = std::path::PathBuf::from(&state.cwd);
- if !cwd.exists() {
+ // Determine if this is a remote session: hostname mismatch
+ // is the primary signal, with cwd non-existence as fallback
+ // for old events that may lack a hostname.
+ if (!state.hostname.is_empty() && state.hostname != self.hostname)
+ || (state.hostname.is_empty() && !std::path::PathBuf::from(&state.cwd).exists())
+ {
session.source = session::SessionSource::Remote;
}
- let is_remote = session.is_remote();
// Local sessions use the current machine's hostname;
// remote sessions use what was stored in the event.
- session.details.hostname = if is_remote {
+ session.details.hostname = if session.is_remote() {
state.hostname.clone()
} else {
self.hostname.clone()
@@ -1707,9 +1725,12 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr
session.chat = loaded.messages;
}
- // Determine if this is a remote session
- let cwd_path = std::path::PathBuf::from(&state.cwd);
- if !cwd_path.exists() {
+ // Determine if this is a remote session: hostname mismatch
+ // is the primary signal, with cwd non-existence as fallback
+ // for old events that may lack a hostname.
+ if (!state.hostname.is_empty() && state.hostname != self.hostname)
+ || (state.hostname.is_empty() && !std::path::PathBuf::from(&state.cwd).exists())
+ {
session.source = session::SessionSource::Remote;
}
diff --git a/crates/notedeck_dave/src/messages.rs b/crates/notedeck_dave/src/messages.rs
@@ -328,6 +328,27 @@ pub struct CompactionInfo {
pub pre_tokens: u64,
}
/// Usage metrics from a completed query's Result message.
///
/// Counts accumulate per session; `cost_usd` is only present when the
/// backend reports pricing.
#[derive(Debug, Clone, Default)]
pub struct UsageInfo {
    /// Tokens consumed by the prompt/context side of the query.
    pub input_tokens: u64,
    /// Tokens produced by the model.
    pub output_tokens: u64,
    /// Total cost in USD, if the backend reported one.
    pub cost_usd: Option<f64>,
    /// Number of agentic turns taken to complete the query.
    pub num_turns: u32,
}

impl UsageInfo {
    /// Combined input + output token count.
    ///
    /// Saturating add: a corrupt or hostile count can't overflow-panic
    /// a debug build; identical to plain `+` for any realistic values.
    pub fn total_tokens(&self) -> u64 {
        self.input_tokens.saturating_add(self.output_tokens)
    }
}
+
/// Context window size (in tokens) for the given model name.
///
/// Every supported Claude model currently ships a 200K-token context,
/// so the model name is accepted for future differentiation but ignored.
pub fn context_window_for_model(_model: Option<&str>) -> u64 {
    const CLAUDE_CONTEXT_TOKENS: u64 = 200_000;
    CLAUDE_CONTEXT_TOKENS
}
+
/// The ai backends response. Since we are using streaming APIs these are
/// represented as individual tokens or tool calls
pub enum DaveApiResponse {
@@ -356,6 +377,8 @@ pub enum DaveApiResponse {
CompactionStarted,
/// Conversation compaction completed with token info
CompactionComplete(CompactionInfo),
+ /// Query completed with usage metrics
+ QueryComplete(UsageInfo),
}
impl Message {
diff --git a/crates/notedeck_dave/src/session.rs b/crates/notedeck_dave/src/session.rs
@@ -210,6 +210,8 @@ pub struct AgenticSessionData {
pub seen_note_ids: HashSet<[u8; 32]>,
/// Tracks the "Compact & Approve" lifecycle.
pub compact_and_proceed: CompactAndProceedState,
+ /// Accumulated usage metrics across queries in this session.
+ pub usage: crate::messages::UsageInfo,
}
impl AgenticSessionData {
@@ -243,6 +245,7 @@ impl AgenticSessionData {
live_conversation_sub: None,
seen_note_ids: HashSet::new(),
compact_and_proceed: CompactAndProceedState::Idle,
+ usage: Default::default(),
}
}
diff --git a/crates/notedeck_dave/src/session_loader.rs b/crates/notedeck_dave/src/session_loader.rs
@@ -115,8 +115,14 @@ pub fn load_session_messages(ndb: &Ndb, txn: &Transaction, session_id: &str) ->
.filter_map(|qr| ndb.get_note_by_key(txn, qr.note_key).ok())
.collect();
- // Sort by created_at (chronological order)
- notes.sort_by_key(|note| note.created_at());
+ // Sort by created_at first, then by seq tag as tiebreaker for events
+ // within the same second (seq is per-session, not globally ordered)
+ notes.sort_by_key(|note| {
+ let seq = get_tag_value(note, "seq")
+ .and_then(|s| s.parse::<u32>().ok())
+ .unwrap_or(0);
+ (note.created_at(), seq)
+ });
let event_count = notes.len() as u32;
let note_ids: HashSet<[u8; 32]> = notes.iter().map(|n| *n.id()).collect();
diff --git a/crates/notedeck_dave/src/ui/dave.rs b/crates/notedeck_dave/src/ui/dave.rs
@@ -63,6 +63,10 @@ pub struct DaveUi<'a> {
/// Color for the notification dot on the mobile hamburger icon,
/// derived from FocusPriority of the next focus queue entry.
status_dot_color: Option<egui::Color32>,
+ /// Usage metrics for the current session (tokens, cost)
+ usage: Option<&'a crate::messages::UsageInfo>,
+ /// Context window size for the current model
+ context_window: u64,
}
/// The response the app generates. The response contains an optional
@@ -173,6 +177,8 @@ impl<'a> DaveUi<'a> {
git_status: None,
details: None,
status_dot_color: None,
+ usage: None,
+ context_window: crate::messages::context_window_for_model(None),
}
}
@@ -248,6 +254,12 @@ impl<'a> DaveUi<'a> {
self
}
+ pub fn usage(mut self, usage: &'a crate::messages::UsageInfo, model: Option<&str>) -> Self {
+ self.usage = Some(usage);
+ self.context_window = crate::messages::context_window_for_model(model);
+ self
+ }
+
fn chat_margin(&self, ctx: &egui::Context) -> i8 {
if self.flags.contains(DaveUiFlags::Compact) || notedeck::ui::is_narrow(ctx) {
8
@@ -348,6 +360,8 @@ impl<'a> DaveUi<'a> {
is_agentic,
plan_mode_active,
auto_steal_focus,
+ self.usage,
+ self.context_window,
ui,
)
})
@@ -1305,6 +1319,8 @@ fn status_bar_ui(
is_agentic: bool,
plan_mode_active: bool,
auto_steal_focus: bool,
+ usage: Option<&crate::messages::UsageInfo>,
+ context_window: u64,
ui: &mut egui::Ui,
) -> Option<DaveAction> {
let snapshot = git_status
@@ -1319,19 +1335,25 @@ fn status_bar_ui(
if let Some(git_status) = git_status.as_deref_mut() {
git_status_ui::git_status_content_ui(git_status, &snapshot, ui);
- // Right-aligned section: badges then refresh
+ // Right-aligned section: usage bar, badges, then refresh
ui.with_layout(egui::Layout::right_to_left(egui::Align::Center), |ui| {
- if is_agentic {
+ let badge_action = if is_agentic {
toggle_badges_ui(ui, plan_mode_active, auto_steal_focus)
} else {
None
+ };
+ if is_agentic {
+ usage_bar_ui(usage, context_window, ui);
}
+ badge_action
})
.inner
} else if is_agentic {
- // No git status (remote session) - just show badges
+ // No git status (remote session) - just show badges and usage
ui.with_layout(egui::Layout::right_to_left(egui::Align::Center), |ui| {
- toggle_badges_ui(ui, plan_mode_active, auto_steal_focus)
+ let badge_action = toggle_badges_ui(ui, plan_mode_active, auto_steal_focus);
+ usage_bar_ui(usage, context_window, ui);
+ badge_action
})
.inner
} else {
@@ -1349,6 +1371,78 @@ fn status_bar_ui(
.inner
}
/// Format a token count compactly for the status bar (e.g. "45K", "1.2M").
///
/// Millions keep one decimal place; thousands are truncated to whole K;
/// anything below 1000 is printed verbatim.
fn format_tokens(tokens: u64) -> String {
    const MILLION: u64 = 1_000_000;
    const THOUSAND: u64 = 1_000;
    match tokens {
        t if t >= MILLION => format!("{:.1}M", t as f64 / MILLION as f64),
        t if t >= THOUSAND => format!("{}K", t / THOUSAND),
        t => t.to_string(),
    }
}
+
+/// Renders the usage fill bar showing context window consumption.
+fn usage_bar_ui(
+ usage: Option<&crate::messages::UsageInfo>,
+ context_window: u64,
+ ui: &mut egui::Ui,
+) {
+ let total = usage.map(|u| u.total_tokens()).unwrap_or(0);
+ if total == 0 {
+ return;
+ }
+ let usage = usage.unwrap();
+ let fraction = (total as f64 / context_window as f64).min(1.0) as f32;
+
+ // Color based on fill level: green → yellow → red
+ let bar_color = if fraction < 0.5 {
+ egui::Color32::from_rgb(100, 180, 100)
+ } else if fraction < 0.8 {
+ egui::Color32::from_rgb(200, 180, 60)
+ } else {
+ egui::Color32::from_rgb(200, 80, 80)
+ };
+
+ let weak = ui.visuals().weak_text_color();
+
+ // Cost label
+ if let Some(cost) = usage.cost_usd {
+ if cost > 0.0 {
+ ui.add(egui::Label::new(
+ egui::RichText::new(format!("${:.2}", cost))
+ .size(10.0)
+ .color(weak),
+ ));
+ }
+ }
+
+ // Token count label
+ ui.add(egui::Label::new(
+ egui::RichText::new(format!(
+ "{} / {}",
+ format_tokens(total),
+ format_tokens(context_window)
+ ))
+ .size(10.0)
+ .color(weak),
+ ));
+
+ // Fill bar
+ let bar_width = 60.0;
+ let bar_height = 8.0;
+ let (rect, _) = ui.allocate_exact_size(egui::vec2(bar_width, bar_height), egui::Sense::hover());
+ let painter = ui.painter_at(rect);
+
+ // Background
+ painter.rect_filled(rect, 3.0, ui.visuals().faint_bg_color);
+
+ // Fill
+ let fill_rect =
+ egui::Rect::from_min_size(rect.min, egui::vec2(bar_width * fraction, bar_height));
+ painter.rect_filled(fill_rect, 3.0, bar_color);
+}
+
/// Render clickable PLAN and AUTO toggle badges. Returns an action if clicked.
fn toggle_badges_ui(
ui: &mut egui::Ui,
diff --git a/crates/notedeck_dave/src/ui/diff.rs b/crates/notedeck_dave/src/ui/diff.rs
@@ -218,6 +218,7 @@ pub fn file_path_header(update: &FileUpdate, ui: &mut Ui) {
let type_label = match &update.update_type {
FileUpdateType::Edit { .. } => "Edit",
FileUpdateType::Write { .. } => "Write",
+ FileUpdateType::UnifiedDiff { .. } => "Diff",
};
ui.label(RichText::new(type_label).strong());
diff --git a/crates/notedeck_dave/src/ui/mod.rs b/crates/notedeck_dave/src/ui/mod.rs
@@ -67,11 +67,16 @@ fn build_dave_ui<'a>(
.details(&session.details);
if let Some(agentic) = &mut session.agentic {
+ let model = agentic
+ .session_info
+ .as_ref()
+ .and_then(|si| si.model.as_deref());
ui_builder = ui_builder
.permission_message_state(agentic.permission_message_state)
.question_answers(&mut agentic.question_answers)
.question_index(&mut agentic.question_index)
- .is_compacting(agentic.is_compacting);
+ .is_compacting(agentic.is_compacting)
+ .usage(&agentic.usage, model);
// Only show git status for local sessions
if !is_remote {
diff --git a/crates/notedeck_nostrverse/src/convert.rs b/crates/notedeck_nostrverse/src/convert.rs
@@ -1,7 +1,7 @@
//! Convert protoverse Space AST to renderer room state.
use crate::room_state::{ObjectLocation, Room, RoomObject, RoomObjectType, RoomShape};
-use glam::Vec3;
+use glam::{Quat, Vec3};
use protoverse::{Attribute, Cell, CellId, CellType, Location, ObjectType, Shape, Space};
/// Convert a parsed protoverse Space into a Room and its objects.
@@ -92,6 +92,17 @@ fn collect_objects(space: &Space, id: CellId, objects: &mut Vec<RoomObject>) {
let model_url = space.model_url(id).map(|s| s.to_string());
let location = space.location(id).map(location_from_protoverse);
+ let rotation = space
+ .rotation(id)
+ .map(|(x, y, z)| {
+ Quat::from_euler(
+ glam::EulerRot::YXZ,
+ (y as f32).to_radians(),
+ (x as f32).to_radians(),
+ (z as f32).to_radians(),
+ )
+ })
+ .unwrap_or(Quat::IDENTITY);
let mut obj = RoomObject::new(obj_id, name, position)
.with_object_type(object_type_from_cell(obj_type));
@@ -101,6 +112,7 @@ fn collect_objects(space: &Space, id: CellId, objects: &mut Vec<RoomObject>) {
if let Some(loc) = location {
obj = obj.with_location(loc);
}
+ obj.rotation = rotation;
objects.push(obj);
}
@@ -180,6 +192,15 @@ pub fn build_space(room: &Room, objects: &[RoomObject]) -> Space {
pos.y as f64,
pos.z as f64,
));
+ // Only emit rotation when non-identity to keep output clean
+ if obj.rotation.angle_between(Quat::IDENTITY) > 1e-4 {
+ let (y, x, z) = obj.rotation.to_euler(glam::EulerRot::YXZ);
+ attributes.push(Attribute::Rotation(
+ x.to_degrees() as f64,
+ y.to_degrees() as f64,
+ z.to_degrees() as f64,
+ ));
+ }
let obj_attr_count = (attributes.len() as u32 - obj_attr_start) as u16;
let obj_type = CellType::Object(match &obj.object_type {
diff --git a/crates/notedeck_nostrverse/src/lib.rs b/crates/notedeck_nostrverse/src/lib.rs
@@ -172,10 +172,8 @@ impl NostrverseApp {
let builder = nostr_events::build_room_event(&space, &self.state.room_ref.id);
nostr_events::ingest_event(builder, ctx.ndb, kp);
}
-
- // Re-load now that we've ingested the demo
- let txn = nostrdb::Transaction::new(ctx.ndb).expect("txn");
- self.load_room_from_ndb(ctx.ndb, &txn);
+ // room_sub (set up above) will pick up the ingested event
+ // on the next poll_room_updates() frame.
}
// Add self user
@@ -657,6 +655,17 @@ impl NostrverseApp {
}
}
NostrverseAction::SelectObject(selected) => {
+ // Update renderer outline highlight
+ if let Some(renderer) = &self.renderer {
+ let scene_id = selected.as_ref().and_then(|sel_id| {
+ self.state
+ .objects
+ .iter()
+ .find(|o| &o.id == sel_id)
+ .and_then(|o| o.scene_object_id)
+ });
+ renderer.renderer.lock().unwrap().set_selected(scene_id);
+ }
self.state.selected_object = selected;
}
NostrverseAction::SaveRoom => {
@@ -671,8 +680,33 @@ impl NostrverseApp {
self.state.objects.retain(|o| o.id != id);
if self.state.selected_object.as_ref() == Some(&id) {
self.state.selected_object = None;
+ if let Some(renderer) = &self.renderer {
+ renderer.renderer.lock().unwrap().set_selected(None);
+ }
+ }
+ self.state.dirty = true;
+ }
+ NostrverseAction::RotateObject { id, rotation } => {
+ if let Some(obj) = self.state.get_object_mut(&id) {
+ obj.rotation = rotation;
+ self.state.dirty = true;
}
+ }
+ NostrverseAction::DuplicateObject(id) => {
+ let Some(src) = self.state.objects.iter().find(|o| o.id == id).cloned() else {
+ return;
+ };
+ let new_id = format!("{}-copy-{}", src.id, self.state.objects.len());
+ let mut dup = src;
+ dup.id = new_id.clone();
+ dup.name = format!("{} (copy)", dup.name);
+ dup.position.x += 0.5;
+ // Clear scene node — sync_scene will create a new one.
+ // Keep model_handle: it's a shared ref to loaded GPU data.
+ dup.scene_object_id = None;
+ self.state.objects.push(dup);
self.state.dirty = true;
+ self.state.selected_object = Some(new_id);
}
}
}
diff --git a/crates/notedeck_nostrverse/src/room_state.rs b/crates/notedeck_nostrverse/src/room_state.rs
@@ -2,7 +2,7 @@
use enostr::Pubkey;
use glam::{Quat, Vec3};
-use renderbud::{Model, ObjectId};
+use renderbud::{Aabb, Model, ObjectId};
/// Actions that can be triggered from the nostrverse view
#[derive(Clone, Debug)]
@@ -17,6 +17,10 @@ pub enum NostrverseAction {
AddObject(RoomObject),
/// An object was removed
RemoveObject(String),
+ /// Duplicate the selected object
+ DuplicateObject(String),
+ /// Object was rotated (id, new rotation)
+ RotateObject { id: String, rotation: Quat },
}
/// Reference to a nostrverse room
@@ -204,6 +208,33 @@ impl RoomUser {
}
}
+/// How a drag interaction is constrained
+#[derive(Clone, Debug)]
+pub enum DragMode {
+    /// Free object: drag on world-space Y plane
+    Free,
+    /// Parented object: slide on parent surface, may break away
+    Parented {
+        /// Room-object id of the parent being slid on
+        parent_id: String,
+        /// Renderer scene node of the parent, used for world-matrix lookups
+        parent_scene_id: ObjectId,
+        /// Parent model bounds, expressed in the parent's local space
+        parent_aabb: Aabb,
+        /// Local Y where child sits (e.g. parent top + child half height)
+        local_y: f32,
+    },
+}
+
+/// State for an active object drag in the 3D viewport
+pub struct DragState {
+    /// ID of the object being dragged
+    pub object_id: String,
+    /// Offset from object position to the initial grab point.
+    /// World-space for `Free` drags; parent-local XZ for `Parented` drags.
+    pub grab_offset: Vec3,
+    /// Y height of the drag constraint plane
+    pub plane_y: f32,
+    /// Drag constraint mode
+    pub mode: DragMode,
+}
+
/// State for a nostrverse view
pub struct NostrverseState {
/// Reference to the room being viewed
@@ -222,6 +253,20 @@ pub struct NostrverseState {
pub smooth_avatar_yaw: f32,
/// Room has unsaved edits
pub dirty: bool,
+ /// Active drag state for viewport object manipulation
+ pub drag_state: Option<DragState>,
+ /// Grid snap size in meters
+ pub grid_snap: f32,
+ /// Whether grid snapping is enabled
+ pub grid_snap_enabled: bool,
+ /// Whether rotate mode is active (R key toggle)
+ pub rotate_mode: bool,
+ /// Whether the current drag is a rotation drag (started on an object in rotate mode)
+ pub rotate_drag: bool,
+ /// Rotation snap increment in degrees (used when grid snap is enabled)
+ pub rotation_snap: f32,
+ /// Cached serialized scene text (avoids re-serializing every frame)
+ pub cached_scene_text: String,
}
impl NostrverseState {
@@ -235,6 +280,13 @@ impl NostrverseState {
edit_mode: true,
smooth_avatar_yaw: 0.0,
dirty: false,
+ drag_state: None,
+ grid_snap: 0.5,
+ grid_snap_enabled: false,
+ rotate_mode: false,
+ rotate_drag: false,
+ rotation_snap: 15.0,
+ cached_scene_text: String::new(),
}
}
diff --git a/crates/notedeck_nostrverse/src/room_view.rs b/crates/notedeck_nostrverse/src/room_view.rs
@@ -1,10 +1,15 @@
//! Room 3D rendering and editing UI for nostrverse via renderbud
use egui::{Color32, Pos2, Rect, Response, Sense, Ui};
-use glam::Vec3;
+use glam::{Quat, Vec3};
use super::convert;
-use super::room_state::{NostrverseAction, NostrverseState, RoomObject, RoomShape};
+use super::room_state::{
+ DragMode, DragState, NostrverseAction, NostrverseState, ObjectLocation, RoomObject, RoomShape,
+};
+
+/// Radians of Y rotation per pixel of horizontal drag
+const ROTATE_SENSITIVITY: f32 = 0.01;
/// Response from rendering the nostrverse view
pub struct NostrverseResponse {
@@ -12,6 +17,158 @@ pub struct NostrverseResponse {
pub action: Option<NostrverseAction>,
}
+fn snap_to_grid(pos: Vec3, grid: f32) -> Vec3 {
+ Vec3::new(
+ (pos.x / grid).round() * grid,
+ pos.y,
+ (pos.z / grid).round() * grid,
+ )
+}
+
+/// Result of computing a drag update — fully owned, no borrows into state.
+enum DragUpdate {
+    /// Move the dragged object to `position`. The coordinate space matches
+    /// the current drag mode: world for `Free`, parent-local for `Parented`.
+    Move {
+        id: String,
+        position: Vec3,
+    },
+    /// A parented drag slid far past the parent's footprint: detach the
+    /// child and continue as a free, world-space drag.
+    Breakaway {
+        id: String,
+        /// New world-space position after detaching
+        world_pos: Vec3,
+        /// Grab offset recomputed for the continued free drag (world space)
+        new_grab_offset: Vec3,
+        /// Drag plane height for the continued free drag
+        new_plane_y: f32,
+    },
+    /// A free drag landed on another object's top surface: reparent the
+    /// child and continue as a parented drag.
+    SnapToParent {
+        id: String,
+        /// Room-object id of the new parent
+        parent_id: String,
+        parent_scene_id: renderbud::ObjectId,
+        parent_aabb: renderbud::Aabb,
+        /// Child position in parent-local space (clamped to the top face)
+        local_pos: Vec3,
+        /// Parent-local Y where the child rests (top + child half height)
+        local_y: f32,
+        /// World-space drag plane height for the continued drag
+        plane_y: f32,
+        /// Grab offset recomputed in parent-local space
+        new_grab_offset: Vec3,
+    },
+}
+
+/// During a free drag, check if the world position lands on another object's
+/// top surface. Returns snap info if a suitable parent is found.
+/// Takes viewport coords to re-unproject onto the new drag plane for a
+/// smooth grab-offset transition.
+///
+/// Candidates are tried in `objects` order; the first object whose
+/// footprint contains the hit wins.
+fn find_snap_parent(
+    world_pos: Vec3,
+    drag_id: &str,
+    child_half_h: f32,
+    vp_x: f32,
+    vp_y: f32,
+    objects: &[RoomObject],
+    r: &renderbud::Renderer,
+) -> Option<DragUpdate> {
+    for obj in objects {
+        // Never snap an object onto itself
+        if obj.id == drag_id {
+            continue;
+        }
+        // A snap target needs a scene node, model bounds, and a world
+        // transform; skip anything not fully set up in the renderer.
+        let Some(scene_id) = obj.scene_object_id else {
+            continue;
+        };
+        let Some(model) = obj.model_handle else {
+            continue;
+        };
+        let Some(aabb) = r.model_bounds(model) else {
+            continue;
+        };
+        let Some(parent_world) = r.world_matrix(scene_id) else {
+            continue;
+        };
+        // Work in the candidate parent's local space
+        let inv_parent = parent_world.inverse();
+        let local_hit = inv_parent.transform_point3(world_pos);
+
+        // Check if XZ is within the parent's AABB
+        if aabb.xz_overshoot(local_hit) < 0.01 {
+            // Child's center rests half its height above the parent's top face
+            let local_y = aabb.max.y + child_half_h;
+            let local_pos = aabb.clamp_xz(Vec3::new(local_hit.x, local_y, local_hit.z));
+            let snapped_world = parent_world.transform_point3(local_pos);
+            let plane_y = snapped_world.y;
+
+            // Compute grab offset so the object doesn't jump:
+            // re-unproject cursor onto the new (higher) drag plane,
+            // then compute offset in parent-local space.
+            let grab_offset = if let Some(new_hit) = r.unproject_to_plane(vp_x, vp_y, plane_y) {
+                let new_local = inv_parent.transform_point3(new_hit);
+                Vec3::new(local_pos.x - new_local.x, 0.0, local_pos.z - new_local.z)
+            } else {
+                Vec3::ZERO
+            };
+
+            return Some(DragUpdate::SnapToParent {
+                id: drag_id.to_string(),
+                parent_id: obj.id.clone(),
+                parent_scene_id: scene_id,
+                parent_aabb: aabb,
+                local_pos,
+                local_y,
+                plane_y,
+                new_grab_offset: grab_offset,
+            });
+        }
+    }
+    None
+}
+
+/// Pure computation: given current drag state and pointer, decide what to do.
+fn compute_drag_update(
+ drag: &DragState,
+ vp_x: f32,
+ vp_y: f32,
+ grid_snap: Option<f32>,
+ r: &renderbud::Renderer,
+) -> Option<DragUpdate> {
+ match &drag.mode {
+ DragMode::Free => {
+ let hit = r.unproject_to_plane(vp_x, vp_y, drag.plane_y)?;
+ let mut new_pos = hit + drag.grab_offset;
+ if let Some(grid) = grid_snap {
+ new_pos = snap_to_grid(new_pos, grid);
+ }
+ Some(DragUpdate::Move {
+ id: drag.object_id.clone(),
+ position: new_pos,
+ })
+ }
+ DragMode::Parented {
+ parent_scene_id,
+ parent_aabb,
+ local_y,
+ ..
+ } => {
+ let hit = r.unproject_to_plane(vp_x, vp_y, drag.plane_y)?;
+ let parent_world = r.world_matrix(*parent_scene_id)?;
+ let local_hit = parent_world.inverse().transform_point3(hit);
+ let mut local_pos = Vec3::new(
+ local_hit.x + drag.grab_offset.x,
+ *local_y,
+ local_hit.z + drag.grab_offset.z,
+ );
+ if let Some(grid) = grid_snap {
+ local_pos = snap_to_grid(local_pos, grid);
+ }
+
+ if parent_aabb.xz_overshoot(local_pos) > 1.0 {
+ let world_pos = parent_world.transform_point3(local_pos);
+ Some(DragUpdate::Breakaway {
+ id: drag.object_id.clone(),
+ world_pos,
+ new_grab_offset: world_pos - hit,
+ new_plane_y: world_pos.y,
+ })
+ } else {
+ Some(DragUpdate::Move {
+ id: drag.object_id.clone(),
+ position: parent_aabb.clamp_xz(local_pos),
+ })
+ }
+ }
+ }
+}
+
/// Render the nostrverse room view with 3D scene
pub fn show_room_view(
ui: &mut Ui,
@@ -21,18 +178,269 @@ pub fn show_room_view(
let available_size = ui.available_size();
let (rect, response) = ui.allocate_exact_size(available_size, Sense::click_and_drag());
+ let mut action: Option<NostrverseAction> = None;
+
// Update renderer target size and handle input
{
let mut r = renderer.renderer.lock().unwrap();
r.set_target_size((rect.width() as u32, rect.height() as u32));
- // Handle mouse drag for camera look
- if response.dragged() {
- let delta = response.drag_delta();
- r.on_mouse_drag(delta.x, delta.y);
+ if state.edit_mode {
+ // --- Edit mode: click-to-select, drag-to-move objects ---
+
+ // Drag start: pick to decide object-drag vs camera
+ if response.drag_started()
+ && let Some(pos) = response.interact_pointer_pos()
+ {
+ let vp = pos - rect.min.to_vec2();
+ if let Some(scene_id) = r.pick(vp.x, vp.y)
+ && let Some(obj) = state
+ .objects
+ .iter()
+ .find(|o| o.scene_object_id == Some(scene_id))
+ {
+ // Always select on drag start
+ r.set_selected(Some(scene_id));
+ state.selected_object = Some(obj.id.clone());
+
+ // In rotate mode, mark this as a rotation drag
+ // (don't start a position drag)
+ let drag_info = if state.rotate_mode {
+ state.rotate_drag = true;
+ None
+ } else {
+ match &obj.location {
+ Some(ObjectLocation::TopOf(parent_id))
+ | Some(ObjectLocation::Near(parent_id)) => {
+ let parent_obj = state.objects.iter().find(|o| o.id == *parent_id);
+ if let Some(parent) = parent_obj
+ && let Some(parent_scene_id) = parent.scene_object_id
+ && let Some(parent_model) = parent.model_handle
+ && let Some(parent_aabb) = r.model_bounds(parent_model)
+ && let Some(parent_world) = r.world_matrix(parent_scene_id)
+ {
+ let child_half_h = obj
+ .model_handle
+ .and_then(|m| r.model_bounds(m))
+ .map(|b| (b.max.y - b.min.y) * 0.5)
+ .unwrap_or(0.0);
+ let local_y = if matches!(
+ &obj.location,
+ Some(ObjectLocation::TopOf(_))
+ ) {
+ parent_aabb.max.y + child_half_h
+ } else {
+ 0.0
+ };
+ let obj_world = parent_world.transform_point3(obj.position);
+ let plane_y = obj_world.y;
+ let hit = r
+ .unproject_to_plane(vp.x, vp.y, plane_y)
+ .unwrap_or(obj_world);
+ let local_hit = parent_world.inverse().transform_point3(hit);
+ let grab_offset = obj.position - local_hit;
+ Some((
+ DragMode::Parented {
+ parent_id: parent_id.clone(),
+ parent_scene_id,
+ parent_aabb,
+ local_y,
+ },
+ grab_offset,
+ plane_y,
+ ))
+ } else {
+ None
+ }
+ }
+ None | Some(ObjectLocation::Floor) => {
+ let plane_y = obj.position.y;
+ let hit = r
+ .unproject_to_plane(vp.x, vp.y, plane_y)
+ .unwrap_or(obj.position);
+ let grab_offset = obj.position - hit;
+ Some((DragMode::Free, grab_offset, plane_y))
+ }
+ _ => None, // Center/Ceiling/Custom: not draggable
+ }
+ };
+
+ if let Some((mode, grab_offset, plane_y)) = drag_info {
+ state.drag_state = Some(DragState {
+ object_id: obj.id.clone(),
+ grab_offset,
+ plane_y,
+ mode,
+ });
+ }
+ }
+ }
+
+ // Dragging: rotate or move object, or control camera
+ if response.dragged() {
+ // Rotation drag: only when drag started on an object in rotate mode
+ if state.rotate_drag
+ && let Some(sel_id) = state.selected_object.clone()
+ && let Some(obj) = state.objects.iter().find(|o| o.id == sel_id)
+ {
+ let delta_x = response.drag_delta().x;
+ let angle = delta_x * ROTATE_SENSITIVITY;
+ let new_rotation = Quat::from_rotation_y(angle) * obj.rotation;
+ // Snap to angle increments when grid snap is enabled
+ let new_rotation = if state.grid_snap_enabled {
+ let (_, y, _) = new_rotation.to_euler(glam::EulerRot::YXZ);
+ let snap_rad = state.rotation_snap.to_radians();
+ let snapped_y = (y / snap_rad).round() * snap_rad;
+ Quat::from_rotation_y(snapped_y)
+ } else {
+ new_rotation
+ };
+ action = Some(NostrverseAction::RotateObject {
+ id: sel_id,
+ rotation: new_rotation,
+ });
+ ui.ctx().request_repaint();
+ } else if let Some(drag) = state.drag_state.as_ref() {
+ if let Some(pos) = response.interact_pointer_pos() {
+ let vp = pos - rect.min.to_vec2();
+ let grid = state.grid_snap_enabled.then_some(state.grid_snap);
+ // Borrow of state.drag_state is scoped to this call
+ let update = compute_drag_update(drag, vp.x, vp.y, grid, &r);
+ // Borrow released — free to mutate state
+ // For free drags, check if we should snap to a parent
+ let update = if let Some(DragUpdate::Move {
+ ref id,
+ ref position,
+ }) = update
+ {
+ if matches!(
+ state.drag_state.as_ref().map(|d| &d.mode),
+ Some(DragMode::Free)
+ ) {
+ let child_half_h = state
+ .objects
+ .iter()
+ .find(|o| o.id == *id)
+ .and_then(|o| o.model_handle)
+ .and_then(|m| r.model_bounds(m))
+ .map(|b| (b.max.y - b.min.y) * 0.5)
+ .unwrap_or(0.0);
+ find_snap_parent(
+ *position,
+ id,
+ child_half_h,
+ vp.x,
+ vp.y,
+ &state.objects,
+ &r,
+ )
+ .or(update)
+ } else {
+ update
+ }
+ } else {
+ update
+ };
+
+ match update {
+ Some(DragUpdate::Move { id, position }) => {
+ action = Some(NostrverseAction::MoveObject { id, position });
+ }
+ Some(DragUpdate::Breakaway {
+ id,
+ world_pos,
+ new_grab_offset,
+ new_plane_y,
+ }) => {
+ if let Some(obj) = state.objects.iter_mut().find(|o| o.id == id) {
+ if let Some(sid) = obj.scene_object_id {
+ r.set_parent(sid, None);
+ }
+ obj.position = world_pos;
+ obj.location = None;
+ obj.location_base = None;
+ state.dirty = true;
+ }
+ state.drag_state = Some(DragState {
+ object_id: id,
+ grab_offset: new_grab_offset,
+ plane_y: new_plane_y,
+ mode: DragMode::Free,
+ });
+ }
+ Some(DragUpdate::SnapToParent {
+ id,
+ parent_id,
+ parent_scene_id,
+ parent_aabb,
+ local_pos,
+ local_y,
+ plane_y,
+ new_grab_offset,
+ }) => {
+ if let Some(obj) = state.objects.iter_mut().find(|o| o.id == id) {
+ if let Some(sid) = obj.scene_object_id {
+ r.set_parent(sid, Some(parent_scene_id));
+ }
+ obj.position = local_pos;
+ obj.location = Some(ObjectLocation::TopOf(parent_id.clone()));
+ obj.location_base = Some(Vec3::new(0.0, local_y, 0.0));
+ state.dirty = true;
+ }
+ state.drag_state = Some(DragState {
+ object_id: id,
+ grab_offset: new_grab_offset,
+ plane_y,
+ mode: DragMode::Parented {
+ parent_id,
+ parent_scene_id,
+ parent_aabb,
+ local_y,
+ },
+ });
+ }
+ None => {}
+ }
+ }
+ ui.ctx().request_repaint();
+ } else {
+ let delta = response.drag_delta();
+ r.on_mouse_drag(delta.x, delta.y);
+ }
+ }
+
+ // Drag end: clear state
+ if response.drag_stopped() {
+ state.drag_state = None;
+ state.rotate_drag = false;
+ }
+
+ // Click (no drag): select/deselect
+ if response.clicked()
+ && let Some(pos) = response.interact_pointer_pos()
+ {
+ let vp = pos - rect.min.to_vec2();
+ if let Some(scene_id) = r.pick(vp.x, vp.y) {
+ if let Some(obj) = state
+ .objects
+ .iter()
+ .find(|o| o.scene_object_id == Some(scene_id))
+ {
+ action = Some(NostrverseAction::SelectObject(Some(obj.id.clone())));
+ }
+ } else {
+ action = Some(NostrverseAction::SelectObject(None));
+ }
+ }
+ } else {
+ // --- View mode: camera only ---
+ if response.dragged() {
+ let delta = response.drag_delta();
+ r.on_mouse_drag(delta.x, delta.y);
+ }
}
- // Handle scroll for speed adjustment
+ // Scroll: always routes to camera (zoom/speed)
if response.hover_pos().is_some() {
let scroll = ui.input(|i| i.raw_scroll_delta.y);
if scroll.abs() > 0.0 {
@@ -40,7 +448,24 @@ pub fn show_room_view(
}
}
- // WASD + QE movement
+ // G key: toggle grid snap
+ if ui.input(|i| i.key_pressed(egui::Key::G)) {
+ state.grid_snap_enabled = !state.grid_snap_enabled;
+ }
+
+ // R key: toggle rotate mode
+ if ui.input(|i| i.key_pressed(egui::Key::R)) {
+ state.rotate_mode = !state.rotate_mode;
+ }
+
+ // Ctrl+D: duplicate selected object
+ if ui.input(|i| i.modifiers.command && i.key_pressed(egui::Key::D))
+ && let Some(id) = state.selected_object.clone()
+ {
+ action = Some(NostrverseAction::DuplicateObject(id));
+ }
+
+ // WASD + QE movement: always available
let dt = ui.input(|i| i.stable_dt);
let mut forward = 0.0_f32;
let mut right = 0.0_f32;
@@ -83,10 +508,7 @@ pub fn show_room_view(
let painter = ui.painter_at(rect);
draw_info_overlay(&painter, state, rect);
- NostrverseResponse {
- response,
- action: None,
- }
+ NostrverseResponse { response, action }
}
fn draw_info_overlay(painter: &egui::Painter, state: &NostrverseState, rect: Rect) {
@@ -96,26 +518,29 @@ fn draw_info_overlay(painter: &egui::Painter, state: &NostrverseState, rect: Rec
.map(|r| r.name.as_str())
.unwrap_or("Loading...");
- let info_text = format!("{} | Objects: {}", room_name, state.objects.len());
+ let mut info_text = format!("{} | Objects: {}", room_name, state.objects.len());
+ if state.rotate_mode {
+ info_text.push_str(" | Rotate (R)");
+ }
- // Background for readability
+ // Measure text to size the background
+ let font_id = egui::FontId::proportional(14.0);
let text_pos = Pos2::new(rect.left() + 10.0, rect.top() + 10.0);
+ let galley = painter.layout_no_wrap(
+ info_text,
+ font_id,
+ Color32::from_rgba_unmultiplied(200, 200, 210, 220),
+ );
+ let padding = egui::vec2(12.0, 6.0);
painter.rect_filled(
Rect::from_min_size(
Pos2::new(rect.left() + 4.0, rect.top() + 4.0),
- egui::vec2(200.0, 24.0),
+ galley.size() + padding,
),
4.0,
Color32::from_rgba_unmultiplied(0, 0, 0, 160),
);
-
- painter.text(
- text_pos,
- egui::Align2::LEFT_TOP,
- info_text,
- egui::FontId::proportional(14.0),
- Color32::from_rgba_unmultiplied(200, 200, 210, 220),
- );
+ painter.galley(text_pos, galley, Color32::PLACEHOLDER);
}
/// Render the side panel with room editing, object list, and object inspector.
@@ -215,8 +640,12 @@ pub fn render_editing_panel(ui: &mut Ui, state: &mut NostrverseState) -> Option<
let label = format!("{} ({})", state.objects[i].name, state.objects[i].id);
if ui.selectable_label(is_selected, label).clicked() {
- let id = state.objects[i].id.clone();
- state.selected_object = if is_selected { None } else { Some(id) };
+ let selected = if is_selected {
+ None
+ } else {
+ Some(state.objects[i].id.clone())
+ };
+ action = Some(NostrverseAction::SelectObject(selected));
}
}
@@ -309,20 +738,70 @@ pub fn render_editing_panel(ui: &mut Ui, state: &mut NostrverseState) -> Option<
.inner;
obj.scale = Vec3::new(sx, sy, sz);
+ // Editable Y rotation (degrees)
+ let (_, angle_y, _) = obj.rotation.to_euler(glam::EulerRot::YXZ);
+ let mut deg = angle_y.to_degrees();
+ let snap = state.grid_snap_enabled;
+ let snap_deg = state.rotation_snap;
+ let rot_changed = ui
+ .horizontal(|ui| {
+ ui.label("Rot Y:");
+ let speed = if snap { snap_deg } else { 1.0 };
+ ui.add(egui::DragValue::new(&mut deg).speed(speed).suffix("°"))
+ .changed()
+ })
+ .inner;
+ if rot_changed {
+ if snap {
+ deg = (deg / snap_deg).round() * snap_deg;
+ }
+ obj.rotation = Quat::from_rotation_y(deg.to_radians());
+ }
+
// Model URL (read-only for now)
if let Some(url) = &obj.model_url {
ui.add_space(4.0);
ui.small(format!("Model: {}", url));
}
- if name_changed || pos_changed || scale_changed {
+ if name_changed || pos_changed || scale_changed || rot_changed {
state.dirty = true;
}
ui.add_space(8.0);
- if ui.button("Delete Object").clicked() {
- action = Some(NostrverseAction::RemoveObject(selected_id.to_owned()));
+ ui.horizontal(|ui| {
+ if ui.button("Duplicate").clicked() {
+ action = Some(NostrverseAction::DuplicateObject(selected_id.to_owned()));
+ }
+ if ui.button("Delete").clicked() {
+ action = Some(NostrverseAction::RemoveObject(selected_id.to_owned()));
+ }
+ });
+ }
+
+ // --- Grid Snap ---
+ ui.add_space(8.0);
+ ui.horizontal(|ui| {
+ ui.checkbox(&mut state.grid_snap_enabled, "Grid Snap (G)");
+ if state.grid_snap_enabled {
+ ui.add(
+ egui::DragValue::new(&mut state.grid_snap)
+ .speed(0.05)
+ .range(0.05..=10.0)
+ .suffix("m"),
+ );
}
+ });
+ if state.grid_snap_enabled {
+ ui.horizontal(|ui| {
+ ui.label(" Rot snap:");
+ ui.add(
+ egui::DragValue::new(&mut state.rotation_snap)
+ .speed(1.0)
+ .range(1.0..=90.0)
+ .suffix("°"),
+ );
+ });
}
// --- Save button ---
@@ -337,13 +816,19 @@ pub fn render_editing_panel(ui: &mut Ui, state: &mut NostrverseState) -> Option<
}
// --- Scene body (syntax-highlighted, read-only) ---
+ // Only re-serialize when not actively dragging an object
+ if state.drag_state.is_none()
+ && let Some(room) = &state.room
+ {
+ let space = convert::build_space(room, &state.objects);
+ state.cached_scene_text = protoverse::serialize(&space);
+ }
+
ui.add_space(12.0);
ui.strong("Scene");
ui.separator();
- if let Some(room) = &state.room {
- let space = convert::build_space(room, &state.objects);
- let text = protoverse::serialize(&space);
- let layout_job = highlight_sexp(&text, ui);
+ if !state.cached_scene_text.is_empty() {
+ let layout_job = highlight_sexp(&state.cached_scene_text, ui);
let code_bg = if ui.visuals().dark_mode {
Color32::from_rgb(0x1E, 0x1C, 0x19)
} else {
@@ -455,6 +940,7 @@ fn is_sexp_keyword(word: &str) -> bool {
| "height"
| "depth"
| "position"
+ | "rotation"
| "location"
| "model-url"
| "material"
diff --git a/crates/protoverse/src/ast.rs b/crates/protoverse/src/ast.rs
@@ -65,6 +65,8 @@ pub enum Attribute {
Location(Location),
State(CellState),
Position(f64, f64, f64),
+ /// Euler rotation in degrees (X, Y, Z), applied in YXZ order.
+ Rotation(f64, f64, f64),
ModelUrl(String),
}
@@ -206,6 +208,14 @@ impl Space {
})
}
+ /// Euler rotation in degrees (X, Y, Z).
+ pub fn rotation(&self, id: CellId) -> Option<(f64, f64, f64)> {
+ self.attrs(id).iter().find_map(|a| match a {
+ Attribute::Rotation(x, y, z) => Some((*x, *y, *z)),
+ _ => None,
+ })
+ }
+
pub fn model_url(&self, id: CellId) -> Option<&str> {
self.attrs(id).iter().find_map(|a| match a {
Attribute::ModelUrl(s) => Some(s.as_str()),
diff --git a/crates/protoverse/src/parser.rs b/crates/protoverse/src/parser.rs
@@ -216,6 +216,15 @@ impl<'a> Parser<'a> {
_ => None,
}
}
+ "rotation" => {
+ let x = self.eat_number();
+ let y = self.eat_number();
+ let z = self.eat_number();
+ match (x, y, z) {
+ (Some(x), Some(y), Some(z)) => Some(Attribute::Rotation(x, y, z)),
+ _ => None,
+ }
+ }
"model-url" => self
.eat_string()
.map(|s| Attribute::ModelUrl(s.to_string())),
diff --git a/crates/protoverse/src/serializer.rs b/crates/protoverse/src/serializer.rs
/// Format an `f64` for s-expression output.
///
/// Whole numbers inside i64 range print without a decimal point; every
/// other value is rounded to 4 decimal places (hiding f32→f64 conversion
/// noise such as `-0.20000000298023224`) with trailing zeros stripped.
fn format_number(n: f64) -> String {
    if n == n.floor() && n.abs() < i64::MAX as f64 {
        (n as i64).to_string()
    } else {
        // Round to 4 decimals, print fixed-width, then peel off the
        // trailing zeros (and a bare trailing dot) that "{:.4}" leaves.
        let rounded = (n * 10000.0).round() / 10000.0;
        let mut text = format!("{rounded:.4}");
        while text.ends_with('0') {
            text.pop();
        }
        if text.ends_with('.') {
            text.pop();
        }
        text
    }
}
@@ -98,6 +104,15 @@ fn write_attr(attr: &Attribute, out: &mut String) {
format_number(*z)
);
}
+ Attribute::Rotation(x, y, z) => {
+ let _ = write!(
+ out,
+ "(rotation {} {} {})",
+ format_number(*x),
+ format_number(*y),
+ format_number(*z)
+ );
+ }
Attribute::ModelUrl(s) => {
let _ = write!(out, "(model-url \"{}\")", s);
}
@@ -117,4 +132,21 @@ mod tests {
assert!(output.contains("(name \"Test\")"));
assert!(output.contains("(width 10)"));
}
+
+ #[test]
+ fn test_format_number_strips_float_noise() {
+ // Integers
+ assert_eq!(format_number(10.0), "10");
+ assert_eq!(format_number(-3.0), "-3");
+ assert_eq!(format_number(0.0), "0");
+
+ // Clean decimals
+ assert_eq!(format_number(1.5), "1.5");
+ assert_eq!(format_number(-0.2), "-0.2");
+
+ // f32→f64 noise: -0.2f32 as f64 == -0.20000000298023224
+ assert_eq!(format_number(-0.2_f32 as f64), "-0.2");
+ assert_eq!(format_number(0.5_f32 as f64), "0.5");
+ assert_eq!(format_number(1.125_f32 as f64), "1.125");
+ }
}
diff --git a/crates/renderbud/src/lib.rs b/crates/renderbud/src/lib.rs
@@ -1,4 +1,4 @@
-use glam::{Mat4, Vec2, Vec3};
+use glam::{Mat4, Vec2, Vec3, Vec4};
use crate::material::{MaterialUniform, make_material_gpudata};
use crate::model::ModelData;
@@ -119,6 +119,7 @@ pub struct Renderer {
skybox_pipeline: wgpu::RenderPipeline,
grid_pipeline: wgpu::RenderPipeline,
shadow_pipeline: wgpu::RenderPipeline,
+ outline_pipeline: wgpu::RenderPipeline,
shadow_view: wgpu::TextureView,
shadow_globals_bg: wgpu::BindGroup,
@@ -308,6 +309,26 @@ fn make_dynamic_object_buffer(
)
}
+/// Ray-AABB intersection using the slab method.
+/// Transforms the ray into the object's local space via the inverse world matrix.
+/// Returns the distance along the ray if there's a hit.
+fn ray_aabb(origin: Vec3, dir: Vec3, aabb: &Aabb, world: &Mat4) -> Option<f32> {
+ let inv = world.inverse();
+ let lo = (inv * origin.extend(1.0)).truncate();
+ let ld = (inv * dir.extend(0.0)).truncate();
+ let t1 = (aabb.min - lo) / ld;
+ let t2 = (aabb.max - lo) / ld;
+ let tmin = t1.min(t2);
+ let tmax = t1.max(t2);
+ let enter = tmin.x.max(tmin.y).max(tmin.z);
+ let exit = tmax.x.min(tmax.y).min(tmax.z);
+ if exit >= enter.max(0.0) {
+ Some(enter.max(0.0))
+ } else {
+ None
+ }
+}
+
impl Renderer {
pub fn new(
device: &wgpu::Device,
@@ -564,6 +585,55 @@ impl Renderer {
multiview: None,
});
+ // Outline pipeline (inverted hull, front-face culling)
+ let outline_shader = device.create_shader_module(wgpu::ShaderModuleDescriptor {
+ label: Some("outline_shader"),
+ source: wgpu::ShaderSource::Wgsl(include_str!("outline.wgsl").into()),
+ });
+
+ let outline_pipeline_layout =
+ device.create_pipeline_layout(&wgpu::PipelineLayoutDescriptor {
+ label: Some("outline_pipeline_layout"),
+ bind_group_layouts: &[&shadow_globals_bgl, &object_bgl],
+ push_constant_ranges: &[],
+ });
+
+ let outline_pipeline = device.create_render_pipeline(&wgpu::RenderPipelineDescriptor {
+ label: Some("outline_pipeline"),
+ cache: None,
+ layout: Some(&outline_pipeline_layout),
+ vertex: wgpu::VertexState {
+ module: &outline_shader,
+ compilation_options: wgpu::PipelineCompilationOptions::default(),
+ entry_point: Some("vs_main"),
+ buffers: &[Vertex::desc()],
+ },
+ fragment: Some(wgpu::FragmentState {
+ module: &outline_shader,
+ compilation_options: wgpu::PipelineCompilationOptions::default(),
+ entry_point: Some("fs_main"),
+ targets: &[Some(wgpu::ColorTargetState {
+ format,
+ blend: Some(wgpu::BlendState::REPLACE),
+ write_mask: wgpu::ColorWrites::ALL,
+ })],
+ }),
+ primitive: wgpu::PrimitiveState {
+ topology: wgpu::PrimitiveTopology::TriangleList,
+ cull_mode: Some(wgpu::Face::Front),
+ ..Default::default()
+ },
+ depth_stencil: Some(wgpu::DepthStencilState {
+ format: wgpu::TextureFormat::Depth24Plus,
+ depth_write_enabled: true,
+ depth_compare: wgpu::CompareFunction::Less,
+ stencil: wgpu::StencilState::default(),
+ bias: wgpu::DepthBiasState::default(),
+ }),
+ multisample: wgpu::MultisampleState::default(),
+ multiview: None,
+ });
+
let (depth_tex, depth_view) = create_depth(device, width, height);
/* TODO: move to example
@@ -592,6 +662,7 @@ impl Renderer {
skybox_pipeline,
grid_pipeline,
shadow_pipeline,
+ outline_pipeline,
shadow_view,
shadow_globals_bg,
globals,
@@ -713,11 +784,82 @@ impl Renderer {
self.globals.data.set_camera(w, h, &self.world.camera);
}
    /// Set or clear which object shows a selection outline.
    ///
    /// Pass `Some(id)` to mark that object as selected, or `None` to clear
    /// the selection. The id is only stored on the world here; the outline
    /// itself is drawn by the render pass that reads `selected_object`.
    pub fn set_selected(&mut self, id: Option<ObjectId>) {
        self.world.selected_object = id;
    }
+
/// Get the axis-aligned bounding box for a loaded model.
pub fn model_bounds(&self, model: Model) -> Option<Aabb> {
self.models.get(&model).map(|md| md.bounds)
}
    /// Get the cached world matrix for a scene object.
    ///
    /// Thin delegation to the world's matrix cache; returns `None` when
    /// no matrix is cached for `id`.
    pub fn world_matrix(&self, id: ObjectId) -> Option<glam::Mat4> {
        self.world.world_matrix(id)
    }
+
    /// Get the parent of a scene object, if it has one.
    ///
    /// Thin delegation to `World::node_parent`; returns `None` for root
    /// objects and for invalid ids.
    pub fn node_parent(&self, id: ObjectId) -> Option<ObjectId> {
        self.world.node_parent(id)
    }
+
+ /// Convert screen coordinates (relative to viewport) to a world-space ray.
+ /// Returns (origin, direction).
+ fn screen_to_ray(&self, screen_x: f32, screen_y: f32) -> (Vec3, Vec3) {
+ let (w, h) = self.target_size;
+ let ndc_x = (screen_x / w as f32) * 2.0 - 1.0;
+ let ndc_y = 1.0 - (screen_y / h as f32) * 2.0;
+ let vp = self.world.camera.view_proj(w as f32, h as f32);
+ let inv_vp = vp.inverse();
+ let near4 = inv_vp * Vec4::new(ndc_x, ndc_y, 0.0, 1.0);
+ let far4 = inv_vp * Vec4::new(ndc_x, ndc_y, 1.0, 1.0);
+ let near = near4.truncate() / near4.w;
+ let far = far4.truncate() / far4.w;
+ (near, (far - near).normalize())
+ }
+
+ /// Pick the closest scene object at the given screen coordinates.
+ /// Coordinates are relative to the viewport (0,0 = top-left).
+ pub fn pick(&self, screen_x: f32, screen_y: f32) -> Option<ObjectId> {
+ let (origin, dir) = self.screen_to_ray(screen_x, screen_y);
+ let mut closest: Option<(ObjectId, f32)> = None;
+ for &id in self.world.renderables() {
+ let model = match self.world.node_model(id) {
+ Some(m) => m,
+ None => continue,
+ };
+ let aabb = match self.model_bounds(model) {
+ Some(a) => a,
+ None => continue,
+ };
+ let world = match self.world.world_matrix(id) {
+ Some(w) => w,
+ None => continue,
+ };
+ if let Some(t) = ray_aabb(origin, dir, &aabb, &world)
+ && closest.is_none_or(|(_, d)| t < d)
+ {
+ closest = Some((id, t));
+ }
+ }
+ closest.map(|(id, _)| id)
+ }
+
+ /// Unproject screen coordinates to a point on a horizontal plane at the given Y height.
+ /// Useful for constraining object drag to the ground plane.
+ pub fn unproject_to_plane(&self, screen_x: f32, screen_y: f32, plane_y: f32) -> Option<Vec3> {
+ let (origin, dir) = self.screen_to_ray(screen_x, screen_y);
+ if dir.y.abs() < 1e-6 {
+ return None;
+ }
+ let t = (plane_y - origin.y) / dir.y;
+ if t < 0.0 {
+ return None;
+ }
+ Some(origin + dir * t)
+ }
+
/// Handle mouse drag for camera look/orbit.
pub fn on_mouse_drag(&mut self, delta_x: f32, delta_y: f32) {
match &mut self.camera_mode {
@@ -925,6 +1067,30 @@ impl Renderer {
rpass.draw_indexed(0..d.mesh.num_indices, 0, 0..1);
}
}
+
+ // 4. Draw selection outline for selected object
+ if let Some(selected_id) = self.world.selected_object
+ && let Some(sel_idx) = self
+ .world
+ .renderables()
+ .iter()
+ .position(|&id| id == selected_id)
+ {
+ let node = self.world.get_node(selected_id).unwrap();
+ let model_handle = node.model.unwrap();
+ if let Some(model_data) = self.models.get(&model_handle) {
+ rpass.set_pipeline(&self.outline_pipeline);
+ rpass.set_bind_group(0, &self.shadow_globals_bg, &[]);
+ let dynamic_offset = (sel_idx as u64 * self.object_buf.stride) as u32;
+ rpass.set_bind_group(1, &self.object_buf.bindgroup, &[dynamic_offset]);
+
+ for d in &model_data.draws {
+ rpass.set_vertex_buffer(0, d.mesh.vert_buf.slice(..));
+ rpass.set_index_buffer(d.mesh.ind_buf.slice(..), wgpu::IndexFormat::Uint16);
+ rpass.draw_indexed(0..d.mesh.num_indices, 0, 0..1);
+ }
+ }
+ }
}
}
diff --git a/crates/renderbud/src/model.rs b/crates/renderbud/src/model.rs
@@ -614,4 +614,20 @@ impl Aabb {
pub fn radius(&self) -> f32 {
self.half_extents().length()
}
+
+ /// Clamp a point's XZ to the AABB's XZ extent. Y unchanged.
+ pub fn clamp_xz(&self, p: Vec3) -> Vec3 {
+ Vec3::new(
+ p.x.clamp(self.min.x, self.max.x),
+ p.y,
+ p.z.clamp(self.min.z, self.max.z),
+ )
+ }
+
+ /// Distance the point's XZ overshoots the AABB boundary. 0 if inside.
+ pub fn xz_overshoot(&self, p: Vec3) -> f32 {
+ let dx = (p.x - self.max.x).max(self.min.x - p.x).max(0.0);
+ let dz = (p.z - self.max.z).max(self.min.z - p.z).max(0.0);
+ (dx * dx + dz * dz).sqrt()
+ }
}
diff --git a/crates/renderbud/src/outline.wgsl b/crates/renderbud/src/outline.wgsl
@@ -0,0 +1,56 @@
// Outline shader: inverted hull method.
// The selected mesh is redrawn slightly inflated along its normals with
// front faces culled, so only the inflated geometry peeking past the real
// silhouette is visible, producing a solid-color outline.

// NOTE(review): field order and padding must stay in sync with the
// Rust-side globals uniform — do not reorder.
struct Globals {
    time: f32,
    _pad0: f32,
    resolution: vec2<f32>,

    cam_pos: vec3<f32>,
    _pad3: f32,

    light_dir: vec3<f32>,
    _pad1: f32,

    light_color: vec3<f32>,
    _pad2: f32,

    fill_light_dir: vec3<f32>,
    _pad4: f32,

    fill_light_color: vec3<f32>,
    _pad5: f32,

    view_proj: mat4x4<f32>,
    inv_view_proj: mat4x4<f32>,
    light_view_proj: mat4x4<f32>,
};

struct Object {
    model: mat4x4<f32>,
    normal: mat4x4<f32>,
};

@group(0) @binding(0) var<uniform> globals: Globals;
@group(1) @binding(0) var<uniform> object: Object;

struct VSIn {
    @location(0) pos: vec3<f32>,
    @location(1) normal: vec3<f32>,
    @location(2) uv: vec2<f32>,
    @location(3) tangent: vec4<f32>,
};

// How far (in object-space units) vertices are pushed out along their normals.
const OUTLINE_WIDTH: f32 = 0.012;

// Solid outline color (orange).
const OUTLINE_COLOR: vec4<f32> = vec4<f32>(1.0, 0.6, 0.15, 1.0);

@vertex
fn vs_main(vin: VSIn) -> @builtin(position) vec4<f32> {
    // Inflate in object space, then transform to world and clip space.
    let inflated = vin.pos + vin.normal * OUTLINE_WIDTH;
    let world4 = object.model * vec4<f32>(inflated, 1.0);
    return globals.view_proj * world4;
}

@fragment
fn fs_main() -> @location(0) vec4<f32> {
    return OUTLINE_COLOR;
}
diff --git a/crates/renderbud/src/world.rs b/crates/renderbud/src/world.rs
@@ -349,6 +349,22 @@ impl World {
Some(&self.nodes[id.index as usize])
}
+ /// Get the parent of a node, if it has one.
+ pub fn node_parent(&self, id: NodeId) -> Option<NodeId> {
+ if !self.is_valid(id) {
+ return None;
+ }
+ self.nodes[id.index as usize].parent
+ }
+
+ /// Get the Model handle for a node, if it has one.
+ pub fn node_model(&self, id: NodeId) -> Option<Model> {
+ if !self.is_valid(id) {
+ return None;
+ }
+ self.nodes[id.index as usize].model
+ }
+
/// Iterate renderable node ids (nodes with a Model).
pub fn renderables(&self) -> &[NodeId] {
&self.renderables