commit ca627b8aec43d5223d49aaa6bdc85183a0b45686
parent a11d1ac0ef5b254a80e2b51547f38cd12307a0e3
Author: William Casarin <jb55@jb55.com>
Date: Wed, 25 Feb 2026 14:22:58 -0800
dave: add codex tool events, token usage, and manual compaction
- Handle item/started for commandExecution, fileChange, contextCompaction
in the Codex backend so tool activity is visible in the UI
- Handle thread/tokenUsage/updated to show token usage stats
- Track turn count per Codex session for usage reporting
- Add thread/compact/start support for manual context compaction
- Add compact_session() to AiBackend trait with implementations:
Codex sends thread/compact/start RPC, Claude sends /compact query
- Add COMPACT badge button next to PLAN and AUTO in the UI
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat:
8 files changed, 363 insertions(+), 34 deletions(-)
diff --git a/crates/notedeck_dave/src/backend/claude.rs b/crates/notedeck_dave/src/backend/claude.rs
@@ -233,6 +233,11 @@ async fn session_actor(
}
mode_ctx.request_repaint();
}
+ SessionCommand::Compact { response_tx: compact_tx, .. } => {
+ let _ = compact_tx.send(DaveApiResponse::Failed(
+ "Cannot compact during active turn".to_string(),
+ ));
+ }
SessionCommand::Shutdown => {
tracing::debug!("Session actor {} shutting down during query", session_id);
// Drop stream and disconnect - break to exit loop first
@@ -499,6 +504,16 @@ async fn session_actor(
}
ctx.request_repaint();
}
+ SessionCommand::Compact { response_tx, .. } => {
+ // Claude compact is normally routed via compact_session() which
+ // sends /compact as a Query. If a Compact command arrives directly,
+ // just drop the tx — the caller will see it disconnected.
+ tracing::debug!(
+ "Session {} received Compact command (not expected for Claude)",
+ session_id
+ );
+ drop(response_tx);
+ }
SessionCommand::Shutdown => {
tracing::debug!("Session actor {} shutting down", session_id);
break;
@@ -621,6 +636,29 @@ impl AiBackend for ClaudeBackend {
);
}
}
+
+ fn compact_session(
+ &self,
+ session_id: String,
+ ctx: egui::Context,
+ ) -> Option<mpsc::Receiver<DaveApiResponse>> {
+ let handle = self.sessions.get(&session_id)?;
+ let command_tx = handle.command_tx.clone();
+ let (response_tx, response_rx) = mpsc::channel();
+ tokio::spawn(async move {
+ if let Err(err) = command_tx
+ .send(SessionCommand::Query {
+ prompt: "/compact".to_string(),
+ response_tx,
+ ctx,
+ })
+ .await
+ {
+ tracing::warn!("Failed to send compact query to claude session: {}", err);
+ }
+ });
+ Some(response_rx)
+ }
}
#[cfg(test)]
diff --git a/crates/notedeck_dave/src/backend/codex.rs b/crates/notedeck_dave/src/backend/codex.rs
@@ -7,6 +7,7 @@ use crate::backend::traits::AiBackend;
use crate::file_update::{FileUpdate, FileUpdateType};
use crate::messages::{
CompactionInfo, DaveApiResponse, PermissionResponse, SessionInfo, SubagentInfo, SubagentStatus,
+ UsageInfo,
};
use crate::tools::Tool;
use crate::Message;
@@ -156,6 +157,7 @@ async fn session_actor_loop<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>(
let mut request_counter: u64 = 10; // start after init IDs
let mut current_turn_id: Option<String> = None;
let mut sent_session_info = false;
+ let mut turn_count: u32 = 0;
while let Some(cmd) = command_rx.recv().await {
match cmd {
@@ -177,6 +179,7 @@ async fn session_actor_loop<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>(
}
// Send turn/start
+ turn_count += 1;
request_counter += 1;
let turn_req_id = request_counter;
if let Err(err) =
@@ -261,6 +264,12 @@ async fn session_actor_loop<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>(
mode_ctx.request_repaint();
pending_approval = Some((rpc_id, rx));
}
+ SessionCommand::Compact { response_tx: compact_tx, .. } => {
+ let _ = compact_tx.send(DaveApiResponse::Failed(
+ "Cannot compact during active turn".to_string(),
+ ));
+ pending_approval = Some((rpc_id, rx));
+ }
}
}
@@ -300,6 +309,11 @@ async fn session_actor_loop<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>(
);
mode_ctx.request_repaint();
}
+ SessionCommand::Compact { response_tx: compact_tx, .. } => {
+ let _ = compact_tx.send(DaveApiResponse::Failed(
+ "Cannot compact during active turn".to_string(),
+ ));
+ }
SessionCommand::Shutdown => {
tracing::debug!("Session {} shutting down during query", session_id);
return;
@@ -323,6 +337,7 @@ async fn session_actor_loop<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>(
&response_tx,
&ctx,
&mut subagent_stack,
+ &turn_count,
) {
HandleResult::Continue => {}
HandleResult::TurnDone => {
@@ -374,6 +389,120 @@ async fn session_actor_loop<W: AsyncWrite + Unpin, R: AsyncBufRead + Unpin>(
);
ctx.request_repaint();
}
+ SessionCommand::Compact { response_tx, ctx } => {
+ request_counter += 1;
+ let compact_req_id = request_counter;
+
+ // Send thread/compact/start RPC
+ if let Err(err) = send_thread_compact(&mut writer, compact_req_id, &thread_id).await
+ {
+ tracing::error!(
+ "Session {} thread/compact/start failed: {}",
+ session_id,
+ err
+ );
+ let _ = response_tx.send(DaveApiResponse::Failed(err));
+ ctx.request_repaint();
+ continue;
+ }
+
+ // Read the RPC response (empty {})
+ match read_response_for_id(&mut reader, compact_req_id).await {
+ Ok(msg) => {
+ if let Some(err) = msg.error {
+ tracing::error!(
+ "Session {} thread/compact/start error: {}",
+ session_id,
+ err.message
+ );
+ let _ = response_tx.send(DaveApiResponse::Failed(err.message));
+ ctx.request_repaint();
+ continue;
+ }
+ }
+ Err(err) => {
+ tracing::error!(
+ "Session {} failed reading compact response: {}",
+ session_id,
+ err
+ );
+ let _ = response_tx.send(DaveApiResponse::Failed(err));
+ ctx.request_repaint();
+ continue;
+ }
+ }
+
+ // Compact accepted — stream notifications until item/completed
+ let _ = response_tx.send(DaveApiResponse::CompactionStarted);
+ ctx.request_repaint();
+
+ loop {
+ tokio::select! {
+ biased;
+
+ Some(cmd) = command_rx.recv() => {
+ match cmd {
+ SessionCommand::Shutdown => {
+ tracing::debug!("Session {} shutting down during compact", session_id);
+ return;
+ }
+ _ => {
+ // Ignore other commands during compaction
+ }
+ }
+ }
+
+ line_result = reader.next_line() => {
+ match line_result {
+ Ok(Some(line)) => {
+ let msg: RpcMessage = match serde_json::from_str(&line) {
+ Ok(m) => m,
+ Err(err) => {
+ tracing::warn!("Codex parse error during compact: {}", err);
+ continue;
+ }
+ };
+
+ // Look for item/completed with contextCompaction
+ if msg.method.as_deref() == Some("item/completed") {
+ if let Some(ref params) = msg.params {
+ let item_type = params.get("type")
+ .and_then(|v| v.as_str())
+ .unwrap_or("");
+ if item_type == "contextCompaction" {
+ let pre_tokens = params.get("preTokens")
+ .and_then(|v| v.as_u64())
+ .unwrap_or(0);
+ let _ = response_tx.send(DaveApiResponse::CompactionComplete(
+ CompactionInfo { pre_tokens },
+ ));
+ ctx.request_repaint();
+ break;
+ }
+ }
+ }
+ }
+ Ok(None) => {
+ tracing::error!("Session {} codex process exited during compact", session_id);
+ let _ = response_tx.send(DaveApiResponse::Failed(
+ "Codex process exited during compaction".to_string(),
+ ));
+ break;
+ }
+ Err(err) => {
+ tracing::error!("Session {} read error during compact: {}", session_id, err);
+ let _ = response_tx.send(DaveApiResponse::Failed(err.to_string()));
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ // Drop response channel to signal completion
+ drop(response_tx);
+ tracing::debug!("Compaction complete for session {}", session_id);
+ }
SessionCommand::Shutdown => {
tracing::debug!("Session {} shutting down", session_id);
break;
@@ -450,6 +579,7 @@ fn handle_codex_message(
response_tx: &mpsc::Sender<DaveApiResponse>,
ctx: &egui::Context,
subagent_stack: &mut Vec<String>,
+ turn_count: &u32,
) -> HandleResult {
let method = match &msg.method {
Some(m) => m.as_str(),
@@ -479,22 +609,57 @@ fn handle_codex_message(
"item/started" => {
if let Some(params) = msg.params {
if let Ok(started) = serde_json::from_value::<ItemStartedParams>(params) {
- if started.item_type == "collabAgentToolCall" {
- let item_id = started
- .item_id
- .unwrap_or_else(|| Uuid::new_v4().to_string());
- subagent_stack.push(item_id.clone());
- let info = SubagentInfo {
- task_id: item_id,
- description: started.name.unwrap_or_else(|| "agent".to_string()),
- subagent_type: "codex-agent".to_string(),
- status: SubagentStatus::Running,
- output: String::new(),
- max_output_size: 4000,
- tool_results: Vec::new(),
- };
- let _ = response_tx.send(DaveApiResponse::SubagentSpawned(info));
- ctx.request_repaint();
+ match started.item_type.as_str() {
+ "collabAgentToolCall" => {
+ let item_id = started
+ .item_id
+ .unwrap_or_else(|| Uuid::new_v4().to_string());
+ subagent_stack.push(item_id.clone());
+ let info = SubagentInfo {
+ task_id: item_id,
+ description: started.name.unwrap_or_else(|| "agent".to_string()),
+ subagent_type: "codex-agent".to_string(),
+ status: SubagentStatus::Running,
+ output: String::new(),
+ max_output_size: 4000,
+ tool_results: Vec::new(),
+ };
+ let _ = response_tx.send(DaveApiResponse::SubagentSpawned(info));
+ ctx.request_repaint();
+ }
+ "commandExecution" => {
+ let cmd = started.command.unwrap_or_default();
+ let tool_input = serde_json::json!({ "command": cmd });
+ let result_value = serde_json::json!({});
+ shared::send_tool_result(
+ "Bash",
+ &tool_input,
+ &result_value,
+ None,
+ subagent_stack,
+ response_tx,
+ ctx,
+ );
+ }
+ "fileChange" => {
+ let path = started.file_path.unwrap_or_default();
+ let tool_input = serde_json::json!({ "file_path": path });
+ let result_value = serde_json::json!({});
+ shared::send_tool_result(
+ "Edit",
+ &tool_input,
+ &result_value,
+ None,
+ subagent_stack,
+ response_tx,
+ ctx,
+ );
+ }
+ "contextCompaction" => {
+ let _ = response_tx.send(DaveApiResponse::CompactionStarted);
+ ctx.request_repaint();
+ }
+ _ => {}
}
}
}
@@ -600,6 +765,21 @@ fn handle_codex_message(
}
}
+ "thread/tokenUsage/updated" => {
+ if let Some(params) = msg.params {
+ if let Ok(usage) = serde_json::from_value::<TokenUsageParams>(params) {
+ let info = UsageInfo {
+ input_tokens: usage.token_usage.total.input_tokens as u64,
+ output_tokens: usage.token_usage.total.output_tokens as u64,
+ num_turns: *turn_count,
+ cost_usd: None,
+ };
+ let _ = response_tx.send(DaveApiResponse::QueryComplete(info));
+ ctx.request_repaint();
+ }
+ }
+ }
+
"turn/completed" => {
if let Some(params) = msg.params {
if let Ok(completed) = serde_json::from_value::<TurnCompletedParams>(params) {
@@ -1024,6 +1204,25 @@ async fn send_turn_interrupt<W: AsyncWrite + Unpin>(
.map_err(|e| format!("Failed to send turn/interrupt: {}", e))
}
+/// Send `thread/compact/start`.
+async fn send_thread_compact<W: AsyncWrite + Unpin>(
+ writer: &mut tokio::io::BufWriter<W>,
+ req_id: u64,
+ thread_id: &str,
+) -> Result<(), String> {
+ let req = RpcRequest {
+ id: Some(req_id),
+ method: "thread/compact/start",
+ params: ThreadCompactParams {
+ thread_id: thread_id.to_string(),
+ },
+ };
+
+ send_request(writer, &req)
+ .await
+ .map_err(|e| format!("Failed to send thread/compact/start: {}", e))
+}
+
/// Read lines until we find a response matching the given request id.
/// Non-matching messages (notifications) are logged and skipped.
async fn read_response_for_id<R: AsyncBufRead + Unpin>(
@@ -1063,11 +1262,12 @@ async fn drain_commands_with_error(
error: &str,
) {
while let Some(cmd) = command_rx.recv().await {
- if let SessionCommand::Query {
- ref response_tx, ..
- } = cmd
- {
- let _ = response_tx.send(DaveApiResponse::Failed(error.to_string()));
+ match &cmd {
+ SessionCommand::Query { response_tx, .. }
+ | SessionCommand::Compact { response_tx, .. } => {
+ let _ = response_tx.send(DaveApiResponse::Failed(error.to_string()));
+ }
+ _ => {}
}
if matches!(cmd, SessionCommand::Shutdown) {
break;
@@ -1197,6 +1397,25 @@ impl AiBackend for CodexBackend {
});
}
}
+
+ fn compact_session(
+ &self,
+ session_id: String,
+ ctx: egui::Context,
+ ) -> Option<mpsc::Receiver<DaveApiResponse>> {
+ let handle = self.sessions.get(&session_id)?;
+ let command_tx = handle.command_tx.clone();
+ let (response_tx, response_rx) = mpsc::channel();
+ tokio::spawn(async move {
+ if let Err(err) = command_tx
+ .send(SessionCommand::Compact { response_tx, ctx })
+ .await
+ {
+ tracing::warn!("Failed to send compact to codex session: {}", err);
+ }
+ });
+ Some(response_rx)
+ }
}
#[cfg(test)]
@@ -1324,7 +1543,7 @@ mod tests {
let msg = notification("item/agentMessage/delta", json!({ "delta": "Hello world" }));
- let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
assert!(matches!(result, HandleResult::Continue));
let response = rx.try_recv().unwrap();
@@ -1341,7 +1560,7 @@ mod tests {
let mut subagents = Vec::new();
let msg = notification("turn/completed", json!({ "status": "completed" }));
- let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
assert!(matches!(result, HandleResult::TurnDone));
}
@@ -1355,7 +1574,7 @@ mod tests {
"turn/completed",
json!({ "status": "failed", "error": "rate limit exceeded" }),
);
- let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
assert!(matches!(result, HandleResult::TurnDone));
let response = rx.try_recv().unwrap();
@@ -1379,7 +1598,7 @@ mod tests {
error: None,
params: None,
};
- let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
assert!(matches!(result, HandleResult::Continue));
assert!(rx.try_recv().is_err()); // nothing sent
}
@@ -1391,7 +1610,7 @@ mod tests {
let mut subagents = Vec::new();
let msg = notification("some/future/event", json!({}));
- let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
assert!(matches!(result, HandleResult::Continue));
assert!(rx.try_recv().is_err());
}
@@ -1406,7 +1625,7 @@ mod tests {
"codex/event/error",
json!({ "message": "context window exceeded" }),
);
- let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
assert!(matches!(result, HandleResult::Continue));
let response = rx.try_recv().unwrap();
@@ -1423,7 +1642,7 @@ mod tests {
let mut subagents = Vec::new();
let msg = notification("error", json!({ "message": "something broke" }));
- let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
assert!(matches!(result, HandleResult::Continue));
let response = rx.try_recv().unwrap();
@@ -1440,7 +1659,7 @@ mod tests {
let mut subagents = Vec::new();
let msg = notification("codex/event/error", json!({}));
- let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
assert!(matches!(result, HandleResult::Continue));
let response = rx.try_recv().unwrap();
@@ -1469,7 +1688,7 @@ mod tests {
"conversationId": "thread-1"
}),
);
- let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
assert!(matches!(result, HandleResult::Continue));
let response = rx.try_recv().unwrap();
@@ -1499,7 +1718,7 @@ mod tests {
"turnId": "1"
}),
);
- let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
assert!(matches!(result, HandleResult::Continue));
let response = rx.try_recv().unwrap();
@@ -1523,7 +1742,7 @@ mod tests {
"name": "research agent"
}),
);
- let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
assert!(matches!(result, HandleResult::Continue));
assert_eq!(subagents.len(), 1);
assert_eq!(subagents[0], "agent-1");
@@ -1553,7 +1772,7 @@ mod tests {
"item/commandExecution/requestApproval",
json!({ "command": "git status" }),
);
- let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
match result {
HandleResult::AutoAccepted(id) => assert_eq!(id, 99),
other => panic!(
@@ -1577,7 +1796,7 @@ mod tests {
"item/commandExecution/requestApproval",
json!({ "command": "rm -rf /" }),
);
- let result = handle_codex_message(msg, &tx, &ctx, &mut subagents);
+ let result = handle_codex_message(msg, &tx, &ctx, &mut subagents, &0);
match result {
HandleResult::NeedsApproval { rpc_id, .. } => assert_eq!(rpc_id, 100),
other => panic!(
diff --git a/crates/notedeck_dave/src/backend/codex_protocol.rs b/crates/notedeck_dave/src/backend/codex_protocol.rs
@@ -83,6 +83,13 @@ pub struct ThreadResumeParams {
pub thread_id: String,
}
+/// `thread/compact/start` params
+#[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
+pub struct ThreadCompactParams {
+ pub thread_id: String,
+}
+
/// `turn/start` params
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
@@ -255,3 +262,23 @@ pub struct TurnCompletedParams {
pub turn_id: Option<String>,
pub error: Option<String>,
}
+
+/// `thread/tokenUsage/updated` params
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TokenUsageParams {
+ pub token_usage: TokenUsage,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TokenUsage {
+ pub total: TokenBreakdown,
+}
+
+#[derive(Debug, Deserialize)]
+#[serde(rename_all = "camelCase")]
+pub struct TokenBreakdown {
+ pub input_tokens: i64,
+ pub output_tokens: i64,
+}
diff --git a/crates/notedeck_dave/src/backend/shared.rs b/crates/notedeck_dave/src/backend/shared.rs
@@ -29,6 +29,11 @@ pub(crate) enum SessionCommand {
mode: PermissionMode,
ctx: egui::Context,
},
+ /// Trigger manual context compaction
+ Compact {
+ response_tx: mpsc::Sender<DaveApiResponse>,
+ ctx: egui::Context,
+ },
Shutdown,
}
diff --git a/crates/notedeck_dave/src/backend/traits.rs b/crates/notedeck_dave/src/backend/traits.rs
@@ -97,4 +97,15 @@ pub trait AiBackend: Send + Sync {
/// Set the permission mode for a session.
/// Plan mode makes Claude plan actions without executing them.
fn set_permission_mode(&self, session_id: String, mode: PermissionMode, ctx: egui::Context);
+
+ /// Trigger manual context compaction for a session.
+ /// Returns a receiver for CompactionStarted/CompactionComplete events.
+ /// Default implementation does nothing (backends that don't support it).
+ fn compact_session(
+ &self,
+ _session_id: String,
+ _ctx: egui::Context,
+ ) -> Option<mpsc::Receiver<DaveApiResponse>> {
+ None
+ }
}
diff --git a/crates/notedeck_dave/src/lib.rs b/crates/notedeck_dave/src/lib.rs
@@ -2213,6 +2213,19 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr
self.auto_steal_focus = new_state;
None
}
+ UiActionResult::Compact => {
+ if let Some(session) = self.session_manager.get_active() {
+ let session_id = session.id.to_string();
+ if let Some(rx) = get_backend(&self.backends, bt)
+ .compact_session(session_id, ui.ctx().clone())
+ {
+ if let Some(session) = self.session_manager.get_active_mut() {
+ session.incoming_tokens = Some(rx);
+ }
+ }
+ }
+ None
+ }
UiActionResult::Handled => None,
}
}
diff --git a/crates/notedeck_dave/src/ui/dave.rs b/crates/notedeck_dave/src/ui/dave.rs
@@ -154,6 +154,8 @@ pub enum DaveAction {
TogglePlanMode,
/// Toggle auto-steal focus mode (clicked AUTO badge)
ToggleAutoSteal,
+ /// Trigger manual context compaction
+ Compact,
}
impl<'a> DaveUi<'a> {
@@ -1572,6 +1574,17 @@ fn toggle_badges_ui(
action = Some(DaveAction::TogglePlanMode);
}
+ // COMPACT badge
+ let compact_badge =
+ super::badge::StatusBadge::new("COMPACT").variant(super::badge::BadgeVariant::Default);
+ if compact_badge
+ .show(ui)
+ .on_hover_text("Click to compact context")
+ .clicked()
+ {
+ action = Some(DaveAction::Compact);
+ }
+
action
}
diff --git a/crates/notedeck_dave/src/ui/mod.rs b/crates/notedeck_dave/src/ui/mod.rs
@@ -769,6 +769,8 @@ pub enum UiActionResult {
PublishPermissionResponse(update::PermissionPublish),
/// Toggle auto-steal focus mode (needs state from DaveApp)
ToggleAutoSteal,
+ /// Trigger manual context compaction
+ Compact,
}
/// Handle a UI action from DaveUi.
@@ -877,5 +879,6 @@ pub fn handle_ui_action(
UiActionResult::PublishPermissionResponse,
)
}
+ DaveAction::Compact => UiActionResult::Compact,
}
}