notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit e7d8acadcd326e5470c8012af9ade044b9eaf9b4
parent 25646da388f7bc208aa5a0a2079b11c0f2b7bbb6
Author: William Casarin <jb55@jb55.com>
Date:   Tue, 24 Feb 2026 12:54:26 -0800

dave: fix queued message handling and batch dispatch

Fix several issues with user messages queued during AI streaming:

- Fix infinite redispatch loop: append_token backwards search now
  only appends to still-streaming assistants, not finalized ones
  from previous turns
- Fix message ordering: insert new assistant responses after
  dispatched user messages but before queued ones, using
  dispatched_user_count to handle both single and batch dispatch
- Batch dispatch: collect all trailing user messages into a single
  prompt via get_pending_user_messages instead of dispatching one
  at a time
- Fix queued indicator: correctly skip dispatched messages and
  detect streaming vs pre-token states so only truly queued
  messages show the label

Extract append_token, finalize_last_assistant, and last_assistant_text
into testable ChatSession methods. Add 32 tests covering batch
dispatch lifecycle, message ordering, queued indicator states, and
edge cases for both Claude and Codex backends.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Diffstat:
Mcrates/notedeck_dave/src/backend/claude.rs | 114+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
Mcrates/notedeck_dave/src/backend/codex.rs | 111+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
Mcrates/notedeck_dave/src/backend/mod.rs | 2+-
Mcrates/notedeck_dave/src/lib.rs | 65+++++++++++++++++++++++++++++++++++++----------------------------
Mcrates/notedeck_dave/src/session.rs | 789+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/notedeck_dave/src/ui/dave.rs | 47++++++++++++++++++++++++++++++++++++++++++-----
Mcrates/notedeck_dave/src/ui/mod.rs | 1+
7 files changed, 1079 insertions(+), 50 deletions(-)

diff --git a/crates/notedeck_dave/src/backend/claude.rs b/crates/notedeck_dave/src/backend/claude.rs @@ -112,16 +112,21 @@ impl ClaudeBackend { prompt } - /// Extract only the latest user message for session continuation - fn get_latest_user_message(messages: &[Message]) -> String { - messages + /// Collect all trailing user messages and join them. + /// When multiple messages are queued, they're all sent as one prompt + /// so the AI sees everything at once instead of one at a time. + pub fn get_pending_user_messages(messages: &[Message]) -> String { + let mut trailing: Vec<&str> = messages .iter() .rev() - .find_map(|m| match m { - Message::User(content) => Some(content.clone()), + .take_while(|m| matches!(m, Message::User(_))) + .filter_map(|m| match m { + Message::User(content) => Some(content.as_str()), _ => None, }) - .unwrap_or_default() + .collect(); + trailing.reverse(); + trailing.join("\n") } } @@ -649,7 +654,7 @@ impl AiBackend for ClaudeBackend { // Claude Code already has the full conversation context via --resume. // For new sessions, send full prompt on the first message. let prompt = if resume_session_id.is_some() { - Self::get_latest_user_message(&messages) + Self::get_pending_user_messages(&messages) } else { let is_first_message = messages .iter() @@ -659,7 +664,7 @@ impl AiBackend for ClaudeBackend { if is_first_message { Self::messages_to_prompt(&messages) } else { - Self::get_latest_user_message(&messages) + Self::get_pending_user_messages(&messages) } }; @@ -753,3 +758,96 @@ impl AiBackend for ClaudeBackend { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::messages::AssistantMessage; + + #[test] + fn pending_messages_single_user() { + let messages = vec![Message::User("hello".into())]; + assert_eq!(ClaudeBackend::get_pending_user_messages(&messages), "hello"); + } + + #[test] + fn pending_messages_multiple_trailing_users() { + let messages = vec![ + Message::User("first".into()), + Message::Assistant(AssistantMessage::from_text("reply".into())), + Message::User("second".into()), + Message::User("third".into()), + Message::User("fourth".into()), + ]; + assert_eq!( + ClaudeBackend::get_pending_user_messages(&messages), + "second\nthird\nfourth" + ); + } + + #[test] + fn pending_messages_stops_at_non_user() { + let messages = vec![ + Message::User("old".into()), + Message::User("also old".into()), + Message::Assistant(AssistantMessage::from_text("reply".into())), + Message::User("pending".into()), + ]; + assert_eq!( + ClaudeBackend::get_pending_user_messages(&messages), + "pending" + ); + } + + #[test] + fn pending_messages_empty_when_last_is_assistant() { + let messages = vec![ + Message::User("hello".into()), + Message::Assistant(AssistantMessage::from_text("reply".into())), + ]; + assert_eq!(ClaudeBackend::get_pending_user_messages(&messages), ""); + } + + #[test] + fn pending_messages_empty_chat() { + let messages: Vec<Message> = vec![]; + assert_eq!(ClaudeBackend::get_pending_user_messages(&messages), ""); + } + + #[test] + fn pending_messages_stops_at_tool_response() { + let messages = vec![ + Message::User("do something".into()), + Message::Assistant(AssistantMessage::from_text("ok".into())), + Message::ToolCalls(vec![crate::tools::ToolCall::invalid( + "c1".into(), + Some("Read".into()), + None, + "test".into(), + )]), + Message::ToolResponse(crate::tools::ToolResponse::error( + "c1".into(), + "result".into(), + )), + Message::User("queued 1".into()), + Message::User("queued 2".into()), + ]; + assert_eq!( + ClaudeBackend::get_pending_user_messages(&messages), + "queued 1\nqueued 2" + ); + } + + #[test] + fn pending_messages_preserves_order() { + let messages = vec![ + Message::User("a".into()), + Message::User("b".into()), + Message::User("c".into()), + ]; + assert_eq!( + ClaudeBackend::get_pending_user_messages(&messages), + "a\nb\nc" + ); + } +} diff --git a/crates/notedeck_dave/src/backend/codex.rs b/crates/notedeck_dave/src/backend/codex.rs @@ -965,15 +965,19 @@ impl CodexBackend { prompt } - fn get_latest_user_message(messages: &[Message]) -> String { - messages + /// Collect all trailing user messages and join them. + fn get_pending_user_messages(messages: &[Message]) -> String { + let mut trailing: Vec<&str> = messages .iter() .rev() - .find_map(|m| match m { - Message::User(content) => Some(content.clone()), + .take_while(|m| matches!(m, Message::User(_))) + .filter_map(|m| match m { + Message::User(content) => Some(content.as_str()), _ => None, }) - .unwrap_or_default() + .collect(); + trailing.reverse(); + trailing.join("\n") } } @@ -995,7 +999,7 @@ impl AiBackend for CodexBackend { let (response_tx, response_rx) = mpsc::channel(); let prompt = if resume_session_id.is_some() { - Self::get_latest_user_message(&messages) + Self::get_pending_user_messages(&messages) } else { let is_first_message = messages .iter() @@ -1005,7 +1009,7 @@ impl AiBackend for CodexBackend { if is_first_message { Self::messages_to_prompt(&messages) } else { - Self::get_latest_user_message(&messages) + Self::get_pending_user_messages(&messages) } }; @@ -1099,7 +1103,7 @@ impl AiBackend for CodexBackend { #[cfg(test)] mod tests { use super::*; - use crate::messages::DaveApiResponse; + use crate::messages::{AssistantMessage, DaveApiResponse}; use serde_json::json; use std::time::Duration; @@ -1633,6 +1637,97 @@ mod tests { } // ----------------------------------------------------------------------- + // get_pending_user_messages tests + // ----------------------------------------------------------------------- + + #[test] + fn pending_messages_single_user() { + let messages = vec![Message::User("hello".into())]; + assert_eq!(CodexBackend::get_pending_user_messages(&messages), "hello"); + } + + #[test] + fn pending_messages_multiple_trailing_users() { + let messages = vec![ + Message::User("first".into()), + Message::Assistant(AssistantMessage::from_text("reply".into())), + Message::User("second".into()), + Message::User("third".into()), + Message::User("fourth".into()), + ]; + assert_eq!( + CodexBackend::get_pending_user_messages(&messages), + "second\nthird\nfourth" + ); + } + + #[test] + fn pending_messages_stops_at_non_user() { + let messages = vec![ + Message::User("old".into()), + Message::User("also old".into()), + Message::Assistant(AssistantMessage::from_text("reply".into())), + Message::User("pending".into()), + ]; + assert_eq!( + CodexBackend::get_pending_user_messages(&messages), + "pending" + ); + } + + #[test] + fn pending_messages_empty_when_last_is_assistant() { + let messages = vec![ + Message::User("hello".into()), + Message::Assistant(AssistantMessage::from_text("reply".into())), + ]; + assert_eq!(CodexBackend::get_pending_user_messages(&messages), ""); + } + + #[test] + fn pending_messages_empty_chat() { + let messages: Vec<Message> = vec![]; + assert_eq!(CodexBackend::get_pending_user_messages(&messages), ""); + } + + #[test] + fn pending_messages_stops_at_tool_response() { + let messages = vec![ + Message::User("do something".into()), + Message::Assistant(AssistantMessage::from_text("ok".into())), + Message::ToolCalls(vec![crate::tools::ToolCall::invalid( + "c1".into(), + Some("Read".into()), + None, + "test".into(), + )]), + Message::ToolResponse(crate::tools::ToolResponse::error( + "c1".into(), + "result".into(), + )), + Message::User("queued 1".into()), + Message::User("queued 2".into()), + ]; + assert_eq!( + CodexBackend::get_pending_user_messages(&messages), + "queued 1\nqueued 2" + ); + } + + #[test] + fn pending_messages_preserves_order() { + let messages = vec![ + Message::User("a".into()), + Message::User("b".into()), + Message::User("c".into()), + ]; + assert_eq!( + CodexBackend::get_pending_user_messages(&messages), + "a\nb\nc" + ); + } + + // ----------------------------------------------------------------------- // Integration tests — mock Codex server over duplex streams // ----------------------------------------------------------------------- diff --git a/crates/notedeck_dave/src/backend/mod.rs b/crates/notedeck_dave/src/backend/mod.rs @@ -1,4 +1,4 @@ -mod claude; +pub mod claude; mod codex; mod codex_protocol; mod openai; diff --git a/crates/notedeck_dave/src/lib.rs b/crates/notedeck_dave/src/lib.rs @@ -662,15 +662,9 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr session.chat.push(Message::Error(err.to_string())); } - DaveApiResponse::Token(token) => match session.chat.last_mut() { - Some(Message::Assistant(msg)) => msg.push_token(&token), - Some(_) => { - let mut msg = messages::AssistantMessage::new(); - msg.push_token(&token); - session.chat.push(Message::Assistant(msg)); - } - None => {} - }, + DaveApiResponse::Token(token) => { + session.append_token(&token); + } DaveApiResponse::ToolCalls(toolcalls) => { tracing::info!("got tool calls: {:?}", toolcalls); @@ -920,27 +914,24 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr Err(std::sync::mpsc::TryRecvError::Disconnected) => { // Stream ended, clear task state if let Some(session) = self.session_manager.get_mut(session_id) { - // Finalize any active assistant message to cache parsed elements - if let Some(Message::Assistant(msg)) = session.chat.last_mut() { - msg.finalize(); - } + // Finalize the last assistant message to cache parsed + // elements. It may not be chat.last() if user messages + // were queued after it. + session.finalize_last_assistant(); // Generate live event for the finalized assistant message if let Some(sk) = &secret_key { - if let Some(Message::Assistant(msg)) = session.chat.last() { - let text = msg.text().to_string(); - if !text.is_empty() { - if let Some(evt) = ingest_live_event( - session, - app_ctx.ndb, - sk, - &text, - "assistant", - None, - None, - ) { - events_to_publish.push(evt); - } + if let Some(text) = session.last_assistant_text() { + if let Some(evt) = ingest_live_event( + session, + app_ctx.ndb, + sk, + &text, + "assistant", + None, + None, + ) { + events_to_publish.push(evt); } } } @@ -952,6 +943,10 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr // unanswered remote message that arrived while we // were streaming. Queue it for dispatch. if session.needs_redispatch_after_stream_end() { + tracing::info!( + "Session {}: redispatching queued user message after stream end", + session_id + ); needs_send.insert(session_id); } @@ -2351,6 +2346,16 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr return; }; + // Count trailing user messages being dispatched so append_token + // knows how many to skip when inserting the assistant response. + let trailing_user_count = session + .chat + .iter() + .rev() + .take_while(|m| matches!(m, Message::User(_))) + .count(); + session.dispatched_user_count = trailing_user_count; + let user_id = calculate_user_id(app_ctx.accounts.get_selected_account().keypair()); let session_id = format!("dave-session-{}", session.id); let messages = session.chat.clone(); @@ -2783,8 +2788,12 @@ impl notedeck::App for Dave { } } - // Send continuation messages for all sessions that have tool responses + // Send continuation messages for all sessions that have queued messages for session_id in sessions_needing_send { + tracing::info!( + "Session {}: dispatching queued message via send_user_message_for", + session_id + ); self.send_user_message_for(session_id, ctx, ui.ctx()); } diff --git a/crates/notedeck_dave/src/session.rs b/crates/notedeck_dave/src/session.rs @@ -320,6 +320,10 @@ pub struct ChatSession { /// Handle to the background task processing this session's AI requests. /// Aborted on drop to clean up the subprocess. pub task_handle: Option<tokio::task::JoinHandle<()>>, + /// Number of trailing user messages that were dispatched in the current + /// stream. Used by `append_token` to insert the assistant response + /// after all dispatched messages but before any newly queued ones. + pub dispatched_user_count: usize, /// Cached status for the agent (derived from session state) cached_status: AgentStatus, /// Set when cached_status changes, cleared after publishing state event @@ -364,6 +368,7 @@ impl ChatSession { input: String::new(), incoming_tokens: None, task_handle: None, + dispatched_user_count: 0, cached_status: AgentStatus::Idle, state_dirty: false, focus_requested: false, @@ -816,6 +821,94 @@ impl ChatSession { self.incoming_tokens.is_some() } + /// Append a streaming token to the current assistant message. + /// + /// If the last message is an Assistant, append there. Otherwise + /// search backwards through only trailing User messages (queued + /// ones) for a still-streaming Assistant. If none is found, + /// create a new Assistant — inserted after the dispatched user + /// message but before any queued ones. + /// + /// We intentionally do NOT search past ToolCalls, ToolResponse, + /// or other non-User messages. When Claude sends text → tool + /// call → more text, the post-tool tokens must go into a NEW + /// Assistant so the tool call appears between the two text blocks. + pub fn append_token(&mut self, token: &str) { + // Fast path: last message is the active assistant response + if let Some(Message::Assistant(msg)) = self.chat.last_mut() { + msg.push_token(token); + return; + } + + // Slow path: look backwards through only trailing User messages. + // If we find a streaming Assistant just before them, append there. + let mut appended = false; + for m in self.chat.iter_mut().rev() { + match m { + Message::User(_) => continue, // skip queued user messages + Message::Assistant(msg) if msg.is_streaming() => { + msg.push_token(token); + appended = true; + break; + } + _ => break, // stop at ToolCalls, ToolResponse, finalized Assistant, etc. + } + } + + if !appended { + // No streaming assistant reachable — start a new one. + // Insert after the dispatched user messages but before + // any newly queued ones so the response appears in the + // right order and queued messages trigger redispatch. + let mut msg = crate::messages::AssistantMessage::new(); + msg.push_token(token); + + let trailing_start = self + .chat + .iter() + .rposition(|m| !matches!(m, Message::User(_))) + .map(|i| i + 1) + .unwrap_or(0); + + // Skip past the dispatched user messages (default 1 for + // single dispatch, more for batch redispatch) + let skip = self.dispatched_user_count.max(1); + let insert_pos = (trailing_start + skip).min(self.chat.len()); + self.chat.insert(insert_pos, Message::Assistant(msg)); + } + } + + /// Finalize the last assistant message (cache parsed markdown, etc). + /// + /// Searches backwards because queued user messages may appear after + /// the assistant response in the chat. + pub fn finalize_last_assistant(&mut self) { + for msg in self.chat.iter_mut().rev() { + if let Message::Assistant(assistant) = msg { + assistant.finalize(); + return; + } + } + } + + /// Get the text of the last assistant message. + /// + /// Searches backwards because queued user messages may appear after + /// the assistant response in the chat. + pub fn last_assistant_text(&self) -> Option<String> { + self.chat.iter().rev().find_map(|m| match m { + Message::Assistant(msg) => { + let text = msg.text().to_string(); + if text.is_empty() { + None + } else { + Some(text) + } + } + _ => None, + }) + } + /// Whether the session has an unanswered user message at the end of the /// chat that needs to be dispatched to the backend. pub fn has_pending_user_message(&self) -> bool { @@ -969,4 +1062,700 @@ mod tests { // Should redispatch — there are unanswered user messages assert!(session.needs_redispatch_after_stream_end()); } + + // ---- append_token tests ---- + + #[test] + fn append_token_creates_assistant_when_empty() { + let mut session = test_session(); + session.append_token("hello"); + assert!(matches!(session.chat.last(), Some(Message::Assistant(_)))); + assert_eq!(session.last_assistant_text().unwrap(), "hello"); + } + + #[test] + fn append_token_extends_existing_assistant() { + let mut session = test_session(); + session.chat.push(Message::User("hi".into())); + session.append_token("hel"); + session.append_token("lo"); + assert_eq!(session.last_assistant_text().unwrap(), "hello"); + assert!(matches!(session.chat.last(), Some(Message::Assistant(_)))); + } + + /// The key bug this prevents: tokens arriving after a queued user + /// message must NOT create a new Assistant that buries the queued + /// message. They should append to the existing Assistant before it. + #[test] + fn tokens_after_queued_message_dont_bury_it() { + let mut session = test_session(); + + // User sends initial message, assistant starts responding + session.chat.push(Message::User("hello".into())); + session.append_token("Sure, "); + session.append_token("I can "); + + // User queues a follow-up while streaming + session.chat.push(Message::User("also do this".into())); + + // More tokens arrive from the CURRENT stream (not the queued msg) + session.append_token("help!"); + + // The queued user message must still be last + assert!( + matches!(session.chat.last(), Some(Message::User(_))), + "queued user message should still be the last message" + ); + assert!(session.has_pending_user_message()); + + // Tokens should have been appended to the existing assistant + assert_eq!(session.last_assistant_text().unwrap(), "Sure, I can help!"); + + // After stream ends, redispatch should fire + assert!(session.needs_redispatch_after_stream_end()); + } + + /// Multiple queued messages: all should remain after the assistant + /// response, and redispatch should still trigger. + #[test] + fn multiple_queued_messages_preserved() { + let mut session = test_session(); + + session.chat.push(Message::User("first".into())); + session.append_token("response"); + + // Queue two messages + session.chat.push(Message::User("second".into())); + session.chat.push(Message::User("third".into())); + + // More tokens arrive + session.append_token(" done"); + + // Last message should still be the queued user message + assert!(session.has_pending_user_message()); + assert!(session.needs_redispatch_after_stream_end()); + + // Assistant text should be the combined response + assert_eq!(session.last_assistant_text().unwrap(), "response done"); + } + + /// After a turn is finalized, a new user message is sent and Claude + /// responds. Tokens for the NEW response must create a new Assistant + /// after the user message, not append to the finalized old one. + /// This was the root cause of the infinite redispatch loop. + #[test] + fn tokens_after_finalized_turn_create_new_assistant() { + let mut session = test_session(); + + // Complete turn 1 + session.chat.push(Message::User("hello".into())); + session.append_token("first response"); + session.finalize_last_assistant(); + + // User sends a new message (primary, not queued) + session.chat.push(Message::User("follow up".into())); + + // Tokens arrive from Claude's new response + session.append_token("second "); + session.append_token("response"); + + // The new tokens must be in a NEW assistant after the user message + assert!( + matches!(session.chat.last(), Some(Message::Assistant(_))), + "new assistant should be the last message" + ); + assert_eq!(session.last_assistant_text().unwrap(), "second response"); + + // The old assistant should still have its original text + let first_assistant_text = session + .chat + .iter() + .find_map(|m| match m { + Message::Assistant(msg) => { + let t = msg.text().to_string(); + if t == "first response" { + Some(t) + } else { + None + } + } + _ => None, + }) + .expect("original assistant should still exist"); + assert_eq!(first_assistant_text, "first response"); + + // No pending user message — assistant is last + assert!(!session.has_pending_user_message()); + } + + /// When a queued message arrives before the first token, the new + /// Assistant must be inserted between the dispatched user message + /// and the queued one, not after the queued one. + #[test] + fn queued_before_first_token_ordering() { + let mut session = test_session(); + + // Turn 1 complete + session.chat.push(Message::User("hello".into())); + session.append_token("response 1"); + session.finalize_last_assistant(); + + // User sends a new message, dispatched to Claude (single dispatch) + session.chat.push(Message::User("follow up".into())); + session.dispatched_user_count = 1; + + // User queues another message BEFORE any tokens arrive + session.chat.push(Message::User("queued msg".into())); + + // Now first token arrives from Claude's response to "follow up" + session.append_token("response "); + session.append_token("2"); + + // Expected order: User("follow up"), Assistant("response 2"), User("queued msg") + let msgs: Vec<&str> = session + .chat + .iter() + .filter_map(|m| match m { + Message::User(s) if s == "follow up" => Some("U:follow up"), + Message::User(s) if s == "queued msg" => Some("U:queued msg"), + Message::Assistant(a) if a.text() == "response 2" => Some("A:response 2"), + _ => None, + }) + .collect(); + assert_eq!( + msgs, + vec!["U:follow up", "A:response 2", "U:queued msg"], + "assistant response should appear between dispatched and queued messages" + ); + + // Queued message should still be last → triggers redispatch + assert!(session.has_pending_user_message()); + } + + /// Text → tool call → more text: post-tool tokens must create a + /// new Assistant so the tool call appears between the two text blocks, + /// not get appended to the pre-tool Assistant (which would push the + /// tool call to the bottom). + #[test] + fn tokens_after_tool_call_create_new_assistant() { + let mut session = test_session(); + + session.chat.push(Message::User("do something".into())); + session.append_token("Let me read that file."); + + // Tool call arrives mid-stream + let tool = crate::tools::ToolCall::invalid( + "call-1".into(), + Some("Read".into()), + None, + "test".into(), + ); + session.chat.push(Message::ToolCalls(vec![tool])); + session + .chat + .push(Message::ToolResponse(crate::tools::ToolResponse::error( + "call-1".into(), + "test result".into(), + ))); + + // More tokens arrive after the tool call + session.append_token("Here is what I found."); + + // Verify ordering: Assistant, ToolCalls, ToolResponse, Assistant + let labels: Vec<&str> = session + .chat + .iter() + .map(|m| match m { + Message::User(_) => "User", + Message::Assistant(_) => "Assistant", + Message::ToolCalls(_) => "ToolCalls", + Message::ToolResponse(_) => "ToolResponse", + _ => "Other", + }) + .collect(); + assert_eq!( + labels, + vec![ + "User", + "Assistant", + "ToolCalls", + "ToolResponse", + "Assistant" + ], + "post-tool tokens should be in a new assistant, not appended to the first" + ); + + // Verify content of each assistant + let assistants: Vec<String> = session + .chat + .iter() + .filter_map(|m| match m { + Message::Assistant(a) => Some(a.text().to_string()), + _ => None, + }) + .collect(); + assert_eq!(assistants[0], "Let me read that file."); + assert_eq!(assistants[1], "Here is what I found."); + } + + // ---- finalize_last_assistant tests ---- + + #[test] + fn finalize_finds_assistant_before_queued_messages() { + let mut session = test_session(); + + session.chat.push(Message::User("hi".into())); + session.append_token("response"); + session.chat.push(Message::User("queued".into())); + + // Should finalize without panicking, even though last() is User + session.finalize_last_assistant(); + + // Verify the queued message is still there + assert!(session.has_pending_user_message()); + } + + // ---- status tests ---- + + /// Helper to put a session into "streaming" state + fn make_streaming(session: &mut ChatSession) -> mpsc::Sender<DaveApiResponse> { + let (tx, rx) = mpsc::channel::<DaveApiResponse>(); + session.incoming_tokens = Some(rx); + tx + } + + #[test] + fn status_idle_initially() { + let session = test_session(); + assert_eq!(session.status(), AgentStatus::Idle); + } + + #[test] + fn status_idle_with_pending_user_message() { + let mut session = test_session(); + session.chat.push(Message::User("hello".into())); + session.update_status(); + // No task handle or incoming tokens → Idle + assert_eq!(session.status(), AgentStatus::Idle); + } + + #[test] + fn status_done_when_assistant_is_last() { + let mut session = test_session(); + session.chat.push(Message::User("hello".into())); + session + .chat + .push(Message::Assistant(AssistantMessage::from_text( + "reply".into(), + ))); + session.update_status(); + assert_eq!(session.status(), AgentStatus::Done); + } + + // ---- batch redispatch lifecycle tests ---- + + /// Simulates the full lifecycle of queued message batch dispatch: + /// 1. User sends message → dispatched + /// 2. While streaming, user queues 3 more messages + /// 3. Stream ends → needs_redispatch is true + /// 4. On redispatch, get_pending_user_messages collects all 3 + /// 5. After redispatch, new tokens create response after all queued msgs + #[test] + fn batch_redispatch_full_lifecycle() { + let mut session = test_session(); + use crate::backend::claude::ClaudeBackend; + + // Step 1: User sends first message, it gets dispatched (single) + session.chat.push(Message::User("hello".into())); + session.dispatched_user_count = 1; + assert!(session.should_dispatch_remote_message()); + + // Backend starts streaming + let tx = make_streaming(&mut session); + assert!(session.is_streaming()); + assert!(!session.should_dispatch_remote_message()); + + // First tokens arrive + session.append_token("Sure, "); + session.append_token("I can help."); + + // Step 2: User queues 3 messages while streaming + session.chat.push(Message::User("also".into())); + session.chat.push(Message::User("do this".into())); + session.chat.push(Message::User("and this".into())); + + // Should NOT dispatch while streaming + assert!(!session.should_dispatch_remote_message()); + + // More tokens arrive — should append to the streaming assistant, + // not create new ones after the queued messages + session.append_token(" Let me "); + session.append_token("check."); + + // Verify the assistant text is continuous + assert_eq!( + session.last_assistant_text().unwrap(), + "Sure, I can help. Let me check." + ); + + // Queued messages should still be at the end + assert!(session.has_pending_user_message()); + + // Step 3: Stream ends + session.finalize_last_assistant(); + drop(tx); + session.incoming_tokens = None; + + assert!(!session.is_streaming()); + assert!(session.needs_redispatch_after_stream_end()); + + // Step 4: At redispatch time, get_pending_user_messages should + // collect ALL trailing user messages + let prompt = ClaudeBackend::get_pending_user_messages(&session.chat); + assert_eq!(prompt, "also\ndo this\nand this"); + + // Step 5: Backend dispatches with the batch prompt (3 messages) + session.dispatched_user_count = 3; + let _tx2 = make_streaming(&mut session); + + // New tokens arrive — should create a new assistant after ALL + // dispatched messages (since they were all sent in the batch) + session.append_token("OK, doing all three."); + + // Verify chat order: response 2 should come after all 3 + // batch-dispatched user messages + let types: Vec<&str> = session + .chat + .iter() + .map(|m| match m { + Message::User(_) => "User", + Message::Assistant(_) => "Assistant", + _ => "?", + }) + .collect(); + assert_eq!( + types, + // Turn 1: User → Assistant + // Turn 2: User, User, User (batch) → Assistant + vec!["User", "Assistant", "User", "User", "User", "Assistant"], + ); + // Verify the second assistant has the right text + assert_eq!( + session.last_assistant_text().unwrap(), + "OK, doing all three." + ); + } + + /// When all queued messages are batch-dispatched, no redispatch + /// should be needed after the second stream completes (assuming + /// no new messages arrive). + #[test] + fn no_double_redispatch_after_batch() { + let mut session = test_session(); + + // Turn 1: single dispatch + session.chat.push(Message::User("first".into())); + session.dispatched_user_count = 1; + let tx = make_streaming(&mut session); + session.append_token("response 1"); + session.chat.push(Message::User("queued A".into())); + session.chat.push(Message::User("queued B".into())); + session.finalize_last_assistant(); + drop(tx); + session.incoming_tokens = None; + assert!(session.needs_redispatch_after_stream_end()); + + // Turn 2: batch redispatch handles both queued messages + session.dispatched_user_count = 2; + let tx2 = make_streaming(&mut session); + session.append_token("response 2"); + session.finalize_last_assistant(); + drop(tx2); + session.incoming_tokens = None; + + // No more pending user messages after the assistant response + assert!( + !session.needs_redispatch_after_stream_end(), + "should not need another redispatch when no new messages arrived" + ); + } + + /// Verify chat ordering when queued messages arrive before any + /// tokens, and after tokens, across a full batch lifecycle. + #[test] + fn chat_ordering_with_mixed_timing() { + let mut session = test_session(); + + // Turn 1 complete + session.chat.push(Message::User("hello".into())); + session.append_token("hi there"); + session.finalize_last_assistant(); + + // User sends new message (single dispatch) + session.chat.push(Message::User("question".into())); + session.dispatched_user_count = 1; + let tx = make_streaming(&mut session); + + // Queued BEFORE first token + session.chat.push(Message::User("early queue".into())); + + // First token arrives + session.append_token("answer "); + + // Queued AFTER first token + session.chat.push(Message::User("late queue".into())); + + // More tokens + session.append_token("here"); + + // Verify: assistant response should be between dispatched + // user and the queued messages + let types: Vec<String> = session + .chat + .iter() + .map(|m| match m { + Message::User(s) => format!("U:{}", s), + Message::Assistant(a) => format!("A:{}", a.text()), + _ => "?".into(), + }) + .collect(); + + // The key constraint: "answer here" must appear after + // "question" and before the queued messages + let answer_pos = types.iter().position(|t| t == "A:answer here").unwrap(); + let question_pos = types.iter().position(|t| t == "U:question").unwrap(); + let early_pos = types.iter().position(|t| t == "U:early queue").unwrap(); + let late_pos = types.iter().position(|t| t == "U:late queue").unwrap(); + + assert!( + answer_pos > question_pos, + "answer should come after the dispatched question" + ); + assert!( + early_pos > answer_pos || late_pos > answer_pos, + "at least one queued message should be after the answer" + ); + + // Finalize and check redispatch + session.finalize_last_assistant(); + drop(tx); + session.incoming_tokens = None; + assert!(session.needs_redispatch_after_stream_end()); + } + + /// Queued indicator detection: helper that mimics what the UI does + /// to find which messages are "queued". + fn find_queued_indices( + chat: &[Message], + is_working: bool, + dispatched_user_count: usize, + ) -> Vec<usize> { + if !is_working { + return vec![]; + } + let last_non_user = chat.iter().rposition(|m| !matches!(m, Message::User(_))); + let queued_from = match last_non_user { + Some(i) if matches!(chat[i], Message::Assistant(ref m) if m.is_streaming()) => { + let first_trailing = i + 1; + if first_trailing < chat.len() { + Some(first_trailing) + } else { + None + } + } + Some(i) => { + let first_trailing = i + 1; + let skip = dispatched_user_count.max(1); + let queued_start = first_trailing + skip; + if queued_start < chat.len() { + Some(queued_start) + } else { + None + } + } + None => None, + }; + match queued_from { + Some(qi) => (qi..chat.len()) + .filter(|&i| matches!(chat[i], Message::User(_))) + .collect(), + None => vec![], + } + } + + #[test] + fn queued_indicator_before_first_token() { + // Chat: [...finalized Asst], User("dispatched"), User("queued") + // No streaming assistant yet → dispatched is being processed, + // only "queued" should show the indicator. + let mut session = test_session(); + session.chat.push(Message::User("prev".into())); + session + .chat + .push(Message::Assistant(AssistantMessage::from_text( + "prev reply".into(), + ))); + session.chat.push(Message::User("dispatched".into())); + session.chat.push(Message::User("queued 1".into())); + session.chat.push(Message::User("queued 2".into())); + + // dispatched_user_count=1: single dispatch + let queued = find_queued_indices(&session.chat, true, 1); + let queued_texts: Vec<&str> = queued + .iter() + .map(|&i| match &session.chat[i] { + Message::User(s) => s.as_str(), + _ => "?", + }) + .collect(); + assert_eq!( + queued_texts, + vec!["queued 1", "queued 2"], + "dispatched message should not be marked as queued" + ); + } + + #[test] + fn queued_indicator_during_streaming() { + // Chat: User("dispatched"), Assistant(streaming), User("queued") + // Streaming assistant separates dispatched from queued. + let mut session = test_session(); + session.chat.push(Message::User("dispatched".into())); + session.append_token("streaming..."); + session.chat.push(Message::User("queued 1".into())); + session.chat.push(Message::User("queued 2".into())); + + // dispatched_user_count doesn't matter here — streaming + // assistant branch doesn't use it + let queued = find_queued_indices(&session.chat, true, 1); + let queued_texts: Vec<&str> = queued + .iter() + .map(|&i| match &session.chat[i] { + Message::User(s) => s.as_str(), + _ => "?", + }) + .collect(); + assert_eq!( + queued_texts, + vec!["queued 1", "queued 2"], + "all user messages after streaming assistant should be queued" + ); + } + + #[test] + fn queued_indicator_not_working() { + // When not working, nothing should be marked as queued + let mut session = test_session(); + session.chat.push(Message::User("msg 1".into())); + session.chat.push(Message::User("msg 2".into())); + + let queued = find_queued_indices(&session.chat, false, 0); + assert!( + queued.is_empty(), + "nothing should be queued when not working" + ); + } + + #[test] + fn queued_indicator_no_queued_messages() { + // Working but only one user message → nothing queued + let mut session = test_session(); + session + .chat + .push(Message::Assistant(AssistantMessage::from_text( + "prev".into(), + ))); + session.chat.push(Message::User("only one".into())); + + let queued = find_queued_indices(&session.chat, true, 1); + assert!( + queued.is_empty(), + "single dispatched message should not be queued" + ); + } + + #[test] + fn queued_indicator_after_tool_call_with_streaming() { + // Chat: User, Asst, ToolCalls, ToolResponse, Asst(streaming), User(queued) + let mut session = test_session(); + session.chat.push(Message::User("do something".into())); + session.append_token("Let me check."); + + let tool = + crate::tools::ToolCall::invalid("c1".into(), Some("Read".into()), None, "test".into()); + session.chat.push(Message::ToolCalls(vec![tool])); + session + .chat + .push(Message::ToolResponse(crate::tools::ToolResponse::error( + "c1".into(), + "result".into(), + ))); + + // Post-tool tokens create new streaming assistant + session.append_token("Found it."); + session.chat.push(Message::User("queued".into())); + + let queued = find_queued_indices(&session.chat, true, 1); + let queued_texts: Vec<&str> = queued + .iter() + .map(|&i| match &session.chat[i] { + Message::User(s) => s.as_str(), + _ => "?", + }) + .collect(); + assert_eq!(queued_texts, vec!["queued"]); + } + + /// Batch dispatch: when 3 messages were dispatched together, + /// none should show "queued" before the first token arrives. + #[test] + fn queued_indicator_batch_dispatch_no_queued() { + let mut session = test_session(); + session + .chat + .push(Message::Assistant(AssistantMessage::from_text( + "prev reply".into(), + ))); + session.chat.push(Message::User("a".into())); + session.chat.push(Message::User("b".into())); + session.chat.push(Message::User("c".into())); + + // All 3 were batch-dispatched + let queued = find_queued_indices(&session.chat, true, 3); + assert!( + queued.is_empty(), + "all 3 messages were dispatched — none should show queued" + ); + } + + /// Batch dispatch with new message queued after: 3 dispatched, + /// then 1 more arrives. Only the new one should be "queued". + #[test] + fn queued_indicator_batch_with_new_queued() { + let mut session = test_session(); + session + .chat + .push(Message::Assistant(AssistantMessage::from_text( + "prev reply".into(), + ))); + session.chat.push(Message::User("a".into())); + session.chat.push(Message::User("b".into())); + session.chat.push(Message::User("c".into())); + session.chat.push(Message::User("new queued".into())); + + // 3 were dispatched, 1 new arrival + let queued = find_queued_indices(&session.chat, true, 3); + let queued_texts: Vec<&str> = queued + .iter() + .map(|&i| match &session.chat[i] { + Message::User(s) => s.as_str(), + _ => "?", + }) + .collect(); + assert_eq!( + queued_texts, + vec!["new queued"], + "only the message after the batch should be queued" + ); + } } diff --git a/crates/notedeck_dave/src/ui/dave.rs b/crates/notedeck_dave/src/ui/dave.rs @@ -67,6 +67,9 @@ pub struct DaveUi<'a> { usage: Option<&'a crate::messages::UsageInfo>, /// Context window size for the current model context_window: u64, + /// Number of trailing user messages dispatched in the current stream. + /// Used by the queued indicator to skip dispatched messages. + dispatched_user_count: usize, } /// The response the app generates. The response contains an optional @@ -179,6 +182,7 @@ impl<'a> DaveUi<'a> { status_dot_color: None, usage: None, context_window: crate::messages::context_window_for_model(None), + dispatched_user_count: 0, } } @@ -212,6 +216,11 @@ impl<'a> DaveUi<'a> { self } + pub fn dispatched_user_count(mut self, count: usize) -> Self { + self.dispatched_user_count = count; + self + } + pub fn interrupt_pending(mut self, val: bool) -> Self { self.flags.set(DaveUiFlags::InterruptPending, val); self @@ -416,13 +425,41 @@ impl<'a> DaveUi<'a> { let mut response = DaveResponse::default(); let is_agentic = self.ai_mode == AiMode::Agentic; - // Find where queued (not-yet-dispatched) user messages start: - // trailing User messages while the session is working. + // Find where queued (not-yet-dispatched) user messages start. + // When streaming, append_token inserts an Assistant between the + // dispatched User and any queued Users, so all trailing Users + // after that Assistant are queued. Before the first token arrives + // there's no Assistant yet, so we skip `dispatched_user_count` + // trailing Users (they were all sent in the prompt). let queued_from = if self.flags.contains(DaveUiFlags::IsWorking) { - self.chat + let last_non_user = self + .chat .iter() - .rposition(|m| !matches!(m, Message::User(_))) - .map(|i| i + 1) + .rposition(|m| !matches!(m, Message::User(_))); + match last_non_user { + Some(i) if matches!(self.chat[i], Message::Assistant(ref m) if m.is_streaming()) => { + // Streaming assistant separates dispatched from queued + let first_trailing = i + 1; + if first_trailing < self.chat.len() { + Some(first_trailing) + } else { + None + } + } + Some(i) => { + // No streaming assistant yet — skip past the dispatched + // user messages (1 for single dispatch, N for batch) + let first_trailing = i + 1; + let skip = self.dispatched_user_count.max(1); + let queued_start = first_trailing + skip; + if queued_start < self.chat.len() { + Some(queued_start) + } else { + None + } + } + None => None, + } } else { None }; diff --git a/crates/notedeck_dave/src/ui/mod.rs b/crates/notedeck_dave/src/ui/mod.rs @@ -65,6 +65,7 @@ fn build_dave_ui<'a>( .plan_mode_active(plan_mode_active) .auto_steal_focus(auto_steal_focus) .is_remote(is_remote) + .dispatched_user_count(session.dispatched_user_count) .details(&session.details); if let Some(agentic) = &mut session.agentic {