commit 138d8024b2d89020e4170f5296af4cd85f3d8e40
parent a11d1ac0ef5b254a80e2b51547f38cd12307a0e3
Author: William Casarin <jb55@jb55.com>
Date: Wed, 25 Feb 2026 13:53:24 -0800
dave: replace dispatched_user_count with DispatchState state machine
Replace the raw dispatched_user_count counter with an explicit
DispatchState enum (Idle → AwaitingResponse → Streaming → Idle)
that tracks the full dispatch lifecycle.
This fixes:
- Infinite redispatch loop when backend returns empty (e.g. /refactor
skill commands) — AwaitingResponse at stream end now correctly
detects that no new messages arrived beyond what was dispatched.
- Messages sent during the dispatch→first-token window not being
queued — handle_user_send() now checks is_dispatched(), which
covers the full lifecycle, not just incoming_tokens.is_some().
- Silent failure on empty responses — shows "No response from
backend" error instead of going idle with no feedback.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat:
4 files changed, 226 insertions(+), 78 deletions(-)
diff --git a/crates/notedeck_dave/src/lib.rs b/crates/notedeck_dave/src/lib.rs
@@ -668,6 +668,19 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr
}
}
+ // Backend produced real content — transition dispatch
+ // state so redispatch knows the backend consumed our
+ // messages (AwaitingResponse → Streaming).
+ if !matches!(
+ res,
+ DaveApiResponse::SessionInfo(_)
+ | DaveApiResponse::CompactionStarted
+ | DaveApiResponse::CompactionComplete(_)
+ | DaveApiResponse::QueryComplete(_)
+ ) {
+ session.dispatch_state.backend_responded();
+ }
+
match res {
DaveApiResponse::Failed(ref err) => {
session.chat.push(Message::Error(err.to_string()));
@@ -2256,10 +2269,11 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr
return;
}
- // If already streaming, queue the message in chat without dispatching.
+ // If already dispatched (waiting for or receiving response), queue
+ // the message in chat without dispatching.
// needs_redispatch_after_stream_end() will dispatch it when the
// current turn finishes.
- if session.is_streaming() {
+ if session.is_dispatched() {
tracing::info!("message queued, will dispatch after current turn");
return;
}
@@ -2287,15 +2301,10 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr
return;
}
- // Count trailing user messages being dispatched so append_token
- // knows how many to skip when inserting the assistant response.
- let trailing_user_count = session
- .chat
- .iter()
- .rev()
- .take_while(|m| matches!(m, Message::User(_)))
- .count();
- session.dispatched_user_count = trailing_user_count;
+ // Record how many trailing user messages we're dispatching.
+ // DispatchState tracks this for append_token insert position,
+ // UI queued indicator, and redispatch-after-stream-end logic.
+ session.mark_dispatched();
let user_id = calculate_user_id(app_ctx.accounts.get_selected_account().keypair());
let session_id = format!("dave-session-{}", session.id);
@@ -2865,8 +2874,22 @@ fn handle_stream_end(
session.task_handle = None;
- // If chat ends with a user message, there's an unanswered remote message
- // that arrived while we were streaming. Queue it for dispatch.
+ // If the backend returned nothing (dispatch_state never left
+ // AwaitingResponse), show an error so the user isn't left staring
+ // at silence.
+ if matches!(
+ session.dispatch_state,
+ session::DispatchState::AwaitingResponse { .. }
+ ) && session.last_assistant_text().is_none()
+ {
+ tracing::warn!("Session {}: backend returned empty response", session_id);
+ session
+ .chat
+ .push(Message::Error("No response from backend".into()));
+ }
+
+ // Check redispatch BEFORE resetting dispatch_state — the check
+ // reads the state to distinguish empty responses from new messages.
if session.needs_redispatch_after_stream_end() {
tracing::info!(
"Session {}: redispatching queued user message after stream end",
@@ -2875,6 +2898,8 @@ fn handle_stream_end(
needs_send.insert(session_id);
}
+ session.dispatch_state.stream_ended();
+
// After compact & approve: compaction must have completed
// (ReadyToProceed) before we send "Proceed".
if session.take_compact_and_proceed() {
diff --git a/crates/notedeck_dave/src/session.rs b/crates/notedeck_dave/src/session.rs
@@ -311,6 +311,51 @@ impl AgenticSessionData {
}
}
+/// Tracks the lifecycle of a dispatch to the AI backend.
+///
+/// Transitions:
+/// - `Idle → AwaitingResponse` when `send_user_message_for()` dispatches
+/// - `AwaitingResponse → Streaming` when the backend produces content
+/// - `Streaming | AwaitingResponse → Idle` at stream end
+#[derive(Clone, Copy, Debug, Default, PartialEq)]
+pub enum DispatchState {
+ /// No active dispatch.
+ #[default]
+ Idle,
+ /// Dispatched `count` trailing user messages; backend hasn't
+ /// produced visible content yet.
+ AwaitingResponse { count: usize },
+ /// Backend is actively producing content for this dispatch.
+ Streaming { dispatched_count: usize },
+}
+
+impl DispatchState {
+ /// Number of user messages that were dispatched in the current batch.
+ /// Used by `append_token` for insert position and UI for queued indicator.
+ pub fn dispatched_count(&self) -> usize {
+ match self {
+ DispatchState::Idle => 0,
+ DispatchState::AwaitingResponse { count } => *count,
+ DispatchState::Streaming { dispatched_count } => *dispatched_count,
+ }
+ }
+
+ /// Transition: backend produced content.
+ /// `AwaitingResponse → Streaming`; other states unchanged.
+ pub fn backend_responded(&mut self) {
+ if let DispatchState::AwaitingResponse { count } = *self {
+ *self = DispatchState::Streaming {
+ dispatched_count: count,
+ };
+ }
+ }
+
+ /// Transition: stream ended. Resets to `Idle`.
+ pub fn stream_ended(&mut self) {
+ *self = DispatchState::Idle;
+ }
+}
+
/// A single chat session with Dave
pub struct ChatSession {
pub id: SessionId,
@@ -320,10 +365,8 @@ pub struct ChatSession {
/// Handle to the background task processing this session's AI requests.
/// Aborted on drop to clean up the subprocess.
pub task_handle: Option<tokio::task::JoinHandle<()>>,
- /// Number of trailing user messages that were dispatched in the current
- /// stream. Used by `append_token` to insert the assistant response
- /// after all dispatched messages but before any newly queued ones.
- pub dispatched_user_count: usize,
+ /// Tracks the dispatch lifecycle for redispatch and insert-position logic.
+ pub dispatch_state: DispatchState,
/// Cached status for the agent (derived from session state)
cached_status: AgentStatus,
/// Set when cached_status changes, cleared after publishing state event
@@ -368,7 +411,7 @@ impl ChatSession {
input: String::new(),
incoming_tokens: None,
task_handle: None,
- dispatched_user_count: 0,
+ dispatch_state: DispatchState::Idle,
cached_status: AgentStatus::Idle,
state_dirty: false,
focus_requested: false,
@@ -821,6 +864,13 @@ impl ChatSession {
self.incoming_tokens.is_some()
}
+ /// Whether a dispatch is active (message sent to backend, waiting for
+ /// or receiving response). This is more reliable than `is_streaming()`
+ /// because it covers the window between dispatch and first token arrival.
+ pub fn is_dispatched(&self) -> bool {
+ !matches!(self.dispatch_state, DispatchState::Idle)
+ }
+
/// Append a streaming token to the current assistant message.
///
/// If the last message is an Assistant, append there. Otherwise
@@ -834,6 +884,9 @@ impl ChatSession {
/// call → more text, the post-tool tokens must go into a NEW
/// Assistant so the tool call appears between the two text blocks.
pub fn append_token(&mut self, token: &str) {
+ // Content arrived — transition AwaitingResponse → Streaming.
+ self.dispatch_state.backend_responded();
+
// Fast path: last message is the active assistant response
if let Some(Message::Assistant(msg)) = self.chat.last_mut() {
msg.push_token(token);
@@ -872,7 +925,7 @@ impl ChatSession {
// Skip past the dispatched user messages (default 1 for
// single dispatch, more for batch redispatch)
- let skip = self.dispatched_user_count.max(1);
+ let skip = self.dispatch_state.dispatched_count().max(1);
let insert_pos = (trailing_start + skip).min(self.chat.len());
self.chat.insert(insert_pos, Message::Assistant(msg));
}
@@ -916,17 +969,47 @@ impl ChatSession {
}
/// Whether a newly arrived remote user message should be dispatched to
- /// the backend right now. Returns false if the session is already
- /// streaming — the message is already in chat and will be picked up
+ /// the backend right now. Returns false if a dispatch is already
+ /// active — the message is already in chat and will be picked up
/// when the current stream finishes.
pub fn should_dispatch_remote_message(&self) -> bool {
- !self.is_streaming() && self.has_pending_user_message()
+ !self.is_dispatched() && self.has_pending_user_message()
+ }
+
+ /// Mark the current trailing user messages as dispatched to the backend.
+ /// Call this when starting a new stream for this session.
+ pub fn mark_dispatched(&mut self) {
+ let count = self.trailing_user_count();
+ self.dispatch_state = DispatchState::AwaitingResponse { count };
+ }
+
+ /// Count trailing user messages at the end of the chat.
+ pub fn trailing_user_count(&self) -> usize {
+ self.chat
+ .iter()
+ .rev()
+ .take_while(|m| matches!(m, Message::User(_)))
+ .count()
}
/// Whether the session needs a re-dispatch after a stream ends.
/// This catches user messages that arrived while we were streaming.
+ ///
+ /// Uses `dispatch_state` to distinguish genuinely new messages from
+ /// messages that were already dispatched:
+ ///
+ /// - `Streaming`: backend responded, so any trailing user messages
+ /// are genuinely new (queued during the response).
+ /// - `AwaitingResponse`: backend returned empty. Only redispatch if
+ /// NEW messages arrived beyond what was dispatched (prevents the
+ /// infinite loop on empty responses).
+ /// - `Idle`: nothing to redispatch.
pub fn needs_redispatch_after_stream_end(&self) -> bool {
- !self.is_streaming() && self.has_pending_user_message()
+ match self.dispatch_state {
+ DispatchState::Streaming { .. } => self.has_pending_user_message(),
+ DispatchState::AwaitingResponse { count } => self.trailing_user_count() > count,
+ DispatchState::Idle => false,
+ }
}
/// If "Compact & Approve" has reached ReadyToProceed, consume the state,
@@ -980,9 +1063,8 @@ mod tests {
let mut session = test_session();
session.chat.push(Message::User("hello".into()));
- // Start streaming
- let (_tx, rx) = mpsc::channel::<DaveApiResponse>();
- session.incoming_tokens = Some(rx);
+ // Dispatch and start streaming
+ let _tx = make_streaming(&mut session);
// New user message arrives while streaming
session.chat.push(Message::User("another".into()));
@@ -994,16 +1076,14 @@ mod tests {
let mut session = test_session();
session.chat.push(Message::User("msg1".into()));
- // Start streaming
- let (tx, rx) = mpsc::channel::<DaveApiResponse>();
- session.incoming_tokens = Some(rx);
+ // Dispatch and start streaming
+ let tx = make_streaming(&mut session);
- // Assistant responds, then more user messages arrive
- session
- .chat
- .push(Message::Assistant(AssistantMessage::from_text(
- "response".into(),
- )));
+ // Assistant responds via append_token (transitions to Streaming)
+ session.append_token("response");
+ session.finalize_last_assistant();
+
+ // New user message arrives while stream is still open
session.chat.push(Message::User("msg2".into()));
// Stream ends
@@ -1018,14 +1098,12 @@ mod tests {
let mut session = test_session();
session.chat.push(Message::User("hello".into()));
- let (tx, rx) = mpsc::channel::<DaveApiResponse>();
- session.incoming_tokens = Some(rx);
+ // Dispatch and start streaming
+ let tx = make_streaming(&mut session);
- session
- .chat
- .push(Message::Assistant(AssistantMessage::from_text(
- "done".into(),
- )));
+ // Backend responds
+ session.append_token("done");
+ session.finalize_last_assistant();
drop(tx);
session.incoming_tokens = None;
@@ -1044,9 +1122,8 @@ mod tests {
session.chat.push(Message::User("msg1".into()));
assert!(session.should_dispatch_remote_message());
- // Backend starts streaming
- let (tx, rx) = mpsc::channel::<DaveApiResponse>();
- session.incoming_tokens = Some(rx);
+ // Dispatch and start streaming
+ let tx = make_streaming(&mut session);
// Messages arrive one per frame while streaming
session.chat.push(Message::User("msg2".into()));
@@ -1055,11 +1132,11 @@ mod tests {
session.chat.push(Message::User("msg3".into()));
assert!(!session.should_dispatch_remote_message());
- // Stream ends
+ // Stream ends (backend didn't produce content — e.g. connection dropped)
drop(tx);
session.incoming_tokens = None;
- // Should redispatch — there are unanswered user messages
+ // Should redispatch — new messages arrived beyond what was dispatched
assert!(session.needs_redispatch_after_stream_end());
}
@@ -1090,8 +1167,9 @@ mod tests {
fn tokens_after_queued_message_dont_bury_it() {
let mut session = test_session();
- // User sends initial message, assistant starts responding
+ // User sends initial message, dispatched and streaming starts
session.chat.push(Message::User("hello".into()));
+ let _tx = make_streaming(&mut session);
session.append_token("Sure, ");
session.append_token("I can ");
@@ -1122,6 +1200,7 @@ mod tests {
let mut session = test_session();
session.chat.push(Message::User("first".into()));
+ let _tx = make_streaming(&mut session);
session.append_token("response");
// Queue two messages
@@ -1202,7 +1281,7 @@ mod tests {
// User sends a new message, dispatched to Claude (single dispatch)
session.chat.push(Message::User("follow up".into()));
- session.dispatched_user_count = 1;
+ session.mark_dispatched();
// User queues another message BEFORE any tokens arrive
session.chat.push(Message::User("queued msg".into()));
@@ -1317,8 +1396,11 @@ mod tests {
// ---- status tests ----
- /// Helper to put a session into "streaming" state
+ /// Helper to put a session into "streaming" state.
+ /// Also calls `mark_dispatched()` to mirror what `send_user_message_for()`
+ /// does in real code — the trailing user messages are marked as dispatched.
fn make_streaming(session: &mut ChatSession) -> mpsc::Sender<DaveApiResponse> {
+ session.mark_dispatched();
let (tx, rx) = mpsc::channel::<DaveApiResponse>();
session.incoming_tokens = Some(rx);
tx
@@ -1367,10 +1449,9 @@ mod tests {
// Step 1: User sends first message, it gets dispatched (single)
session.chat.push(Message::User("hello".into()));
- session.dispatched_user_count = 1;
assert!(session.should_dispatch_remote_message());
- // Backend starts streaming
+ // Backend starts streaming (mark_dispatched called by make_streaming)
let tx = make_streaming(&mut session);
assert!(session.is_streaming());
assert!(!session.should_dispatch_remote_message());
@@ -1415,7 +1496,6 @@ mod tests {
assert_eq!(prompt, "also\ndo this\nand this");
// Step 5: Backend dispatches with the batch prompt (3 messages)
- session.dispatched_user_count = 3;
let _tx2 = make_streaming(&mut session);
// New tokens arrive — should create a new assistant after ALL
@@ -1455,7 +1535,6 @@ mod tests {
// Turn 1: single dispatch
session.chat.push(Message::User("first".into()));
- session.dispatched_user_count = 1;
let tx = make_streaming(&mut session);
session.append_token("response 1");
session.chat.push(Message::User("queued A".into()));
@@ -1466,7 +1545,6 @@ mod tests {
assert!(session.needs_redispatch_after_stream_end());
// Turn 2: batch redispatch handles both queued messages
- session.dispatched_user_count = 2;
let tx2 = make_streaming(&mut session);
session.append_token("response 2");
session.finalize_last_assistant();
@@ -1487,7 +1565,6 @@ mod tests {
let mut session = test_session();
session.chat.push(Message::User("hello".into()));
- session.dispatched_user_count = 1;
let tx = make_streaming(&mut session);
// Error arrives (no tokens were sent)
@@ -1505,6 +1582,30 @@ mod tests {
);
}
+ /// When the backend returns immediately with no content (e.g. a
+ /// skill command it can't handle), the dispatched user message is
+ /// still the last in chat. Without the trailing-count guard this
+ /// would trigger an infinite redispatch loop.
+ #[test]
+ fn empty_response_prevents_redispatch_loop() {
+ let mut session = test_session();
+
+ session
+ .chat
+ .push(Message::User("/refactor something".into()));
+ let tx = make_streaming(&mut session);
+
+ // Backend returns immediately — no tokens, no tools, nothing
+ session.finalize_last_assistant();
+ drop(tx);
+ session.incoming_tokens = None;
+
+ assert!(
+ !session.needs_redispatch_after_stream_end(),
+ "should not redispatch already-dispatched messages with empty response"
+ );
+ }
+
/// Verify chat ordering when queued messages arrive before any
/// tokens, and after tokens, across a full batch lifecycle.
#[test]
@@ -1518,7 +1619,6 @@ mod tests {
// User sends new message (single dispatch)
session.chat.push(Message::User("question".into()));
- session.dispatched_user_count = 1;
let tx = make_streaming(&mut session);
// Queued BEFORE first token
@@ -1573,7 +1673,7 @@ mod tests {
fn find_queued_indices(
chat: &[Message],
is_working: bool,
- dispatched_user_count: usize,
+ dispatch_state: DispatchState,
) -> Vec<usize> {
if !is_working {
return vec![];
@@ -1590,7 +1690,7 @@ mod tests {
}
Some(i) => {
let first_trailing = i + 1;
- let skip = dispatched_user_count.max(1);
+ let skip = dispatch_state.dispatched_count().max(1);
let queued_start = first_trailing + skip;
if queued_start < chat.len() {
Some(queued_start)
@@ -1624,8 +1724,12 @@ mod tests {
session.chat.push(Message::User("queued 1".into()));
session.chat.push(Message::User("queued 2".into()));
- // dispatched_user_count=1: single dispatch
- let queued = find_queued_indices(&session.chat, true, 1);
+ // Single dispatch
+ let queued = find_queued_indices(
+ &session.chat,
+ true,
+ DispatchState::AwaitingResponse { count: 1 },
+ );
let queued_texts: Vec<&str> = queued
.iter()
.map(|&i| match &session.chat[i] {
@@ -1650,9 +1754,13 @@ mod tests {
session.chat.push(Message::User("queued 1".into()));
session.chat.push(Message::User("queued 2".into()));
- // dispatched_user_count doesn't matter here — streaming
- // assistant branch doesn't use it
- let queued = find_queued_indices(&session.chat, true, 1);
+ // Dispatch state doesn't matter here — streaming assistant
+ // branch doesn't use the dispatched count
+ let queued = find_queued_indices(
+ &session.chat,
+ true,
+ DispatchState::AwaitingResponse { count: 1 },
+ );
let queued_texts: Vec<&str> = queued
.iter()
.map(|&i| match &session.chat[i] {
@@ -1674,7 +1782,7 @@ mod tests {
session.chat.push(Message::User("msg 1".into()));
session.chat.push(Message::User("msg 2".into()));
- let queued = find_queued_indices(&session.chat, false, 0);
+ let queued = find_queued_indices(&session.chat, false, DispatchState::Idle);
assert!(
queued.is_empty(),
"nothing should be queued when not working"
@@ -1692,7 +1800,11 @@ mod tests {
)));
session.chat.push(Message::User("only one".into()));
- let queued = find_queued_indices(&session.chat, true, 1);
+ let queued = find_queued_indices(
+ &session.chat,
+ true,
+ DispatchState::AwaitingResponse { count: 1 },
+ );
assert!(
queued.is_empty(),
"single dispatched message should not be queued"
@@ -1720,7 +1832,11 @@ mod tests {
session.append_token("Found it.");
session.chat.push(Message::User("queued".into()));
- let queued = find_queued_indices(&session.chat, true, 1);
+ let queued = find_queued_indices(
+ &session.chat,
+ true,
+ DispatchState::AwaitingResponse { count: 1 },
+ );
let queued_texts: Vec<&str> = queued
.iter()
.map(|&i| match &session.chat[i] {
@@ -1746,7 +1862,11 @@ mod tests {
session.chat.push(Message::User("c".into()));
// All 3 were batch-dispatched
- let queued = find_queued_indices(&session.chat, true, 3);
+ let queued = find_queued_indices(
+ &session.chat,
+ true,
+ DispatchState::AwaitingResponse { count: 3 },
+ );
assert!(
queued.is_empty(),
"all 3 messages were dispatched — none should show queued"
@@ -1769,7 +1889,11 @@ mod tests {
session.chat.push(Message::User("new queued".into()));
// 3 were dispatched, 1 new arrival
- let queued = find_queued_indices(&session.chat, true, 3);
+ let queued = find_queued_indices(
+ &session.chat,
+ true,
+ DispatchState::AwaitingResponse { count: 3 },
+ );
let queued_texts: Vec<&str> = queued
.iter()
.map(|&i| match &session.chat[i] {
diff --git a/crates/notedeck_dave/src/ui/dave.rs b/crates/notedeck_dave/src/ui/dave.rs
@@ -68,9 +68,8 @@ pub struct DaveUi<'a> {
usage: Option<&'a crate::messages::UsageInfo>,
/// Context window size for the current model
context_window: u64,
- /// Number of trailing user messages dispatched in the current stream.
- /// Used by the queued indicator to skip dispatched messages.
- dispatched_user_count: usize,
+ /// Dispatch lifecycle state, used for queued indicator logic.
+ dispatch_state: crate::session::DispatchState,
/// Which backend this session uses
backend_type: BackendType,
}
@@ -185,7 +184,7 @@ impl<'a> DaveUi<'a> {
status_dot_color: None,
usage: None,
context_window: crate::messages::context_window_for_model(None),
- dispatched_user_count: 0,
+ dispatch_state: crate::session::DispatchState::default(),
backend_type: BackendType::Remote,
}
}
@@ -225,8 +224,8 @@ impl<'a> DaveUi<'a> {
self
}
- pub fn dispatched_user_count(mut self, count: usize) -> Self {
- self.dispatched_user_count = count;
+ pub fn dispatch_state(mut self, state: crate::session::DispatchState) -> Self {
+ self.dispatch_state = state;
self
}
@@ -438,7 +437,7 @@ impl<'a> DaveUi<'a> {
// When streaming, append_token inserts an Assistant between the
// dispatched User and any queued Users, so all trailing Users
// after that Assistant are queued. Before the first token arrives
- // there's no Assistant yet, so we skip `dispatched_user_count`
+ // there's no Assistant yet, so we skip the dispatched count
// trailing Users (they were all sent in the prompt).
let queued_from = if self.flags.contains(DaveUiFlags::IsWorking) {
let last_non_user = self
@@ -459,7 +458,7 @@ impl<'a> DaveUi<'a> {
// No streaming assistant yet — skip past the dispatched
// user messages (1 for single dispatch, N for batch)
let first_trailing = i + 1;
- let skip = self.dispatched_user_count.max(1);
+ let skip = self.dispatch_state.dispatched_count().max(1);
let queued_start = first_trailing + skip;
if queued_start < self.chat.len() {
Some(queued_start)
diff --git a/crates/notedeck_dave/src/ui/mod.rs b/crates/notedeck_dave/src/ui/mod.rs
@@ -65,7 +65,7 @@ fn build_dave_ui<'a>(
.plan_mode_active(plan_mode_active)
.auto_steal_focus(auto_steal_focus)
.is_remote(is_remote)
- .dispatched_user_count(session.dispatched_user_count)
+ .dispatch_state(session.dispatch_state)
.details(&session.details)
.backend_type(session.backend_type);