notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit afc47672462d58f41a435fd4d9b9eb0db62cb878
parent ceedaade05439222e3146496e154619a5ca28d5d
Author: William Casarin <jb55@jb55.com>
Date:   Wed, 18 Feb 2026 09:48:48 -0800

fix remote messages clobbering in-flight streams

When multiple user messages arrived from a remote client (phone) while
the backend was already streaming a response, each new message would
call send_user_message_for and overwrite the active stream. This caused
responses to lag behind by one message.

Now we guard dispatch: skip if the session is already streaming (the
message is already in chat), and re-dispatch when the stream ends if
there are unanswered user messages.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Diffstat:
Mcrates/notedeck_dave/src/lib.rs | 18+++++++++++++++++-
Mcrates/notedeck_dave/src/session.rs | 134+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 151 insertions(+), 1 deletion(-)

diff --git a/crates/notedeck_dave/src/lib.rs b/crates/notedeck_dave/src/lib.rs @@ -697,6 +697,13 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr session.task_handle = None; // Don't restore incoming_tokens - leave it None + + // If chat ends with a user message, there's an + // unanswered remote message that arrived while we + // were streaming. Queue it for dispatch. + if session.needs_redispatch_after_stream_end() { + needs_send.insert(session_id); + } } } _ => { @@ -2003,9 +2010,18 @@ impl notedeck::App for Dave { // Poll for live conversation events on all sessions. // Returns user messages from remote clients that need backend dispatch. + // Only dispatch if the session isn't already streaming a response — + // the message is already in chat, so it will be included when the + // current stream finishes and we re-dispatch. let remote_user_msgs = self.poll_remote_conversation_events(ctx.ndb); for (sid, _msg) in remote_user_msgs { - self.send_user_message_for(sid, ctx, ui.ctx()); + let should_dispatch = self + .session_manager + .get(sid) + .is_some_and(|s| s.should_dispatch_remote_message()); + if should_dispatch { + self.send_user_message_for(sid, ctx, ui.ctx()); + } } // Process pending archive conversion (JSONL → nostr events) diff --git a/crates/notedeck_dave/src/session.rs b/crates/notedeck_dave/src/session.rs @@ -574,3 +574,137 @@ impl SessionManager { self.order.clone() } } + +impl ChatSession { + /// Whether the session is actively streaming a response from the backend. + pub fn is_streaming(&self) -> bool { + self.incoming_tokens.is_some() + } + + /// Whether the session has an unanswered user message at the end of the + /// chat that needs to be dispatched to the backend. + pub fn has_pending_user_message(&self) -> bool { + matches!(self.chat.last(), Some(Message::User(_))) + } + + /// Whether a newly arrived remote user message should be dispatched to + /// the backend right now. Returns false if the session is already + /// streaming — the message is already in chat and will be picked up + /// when the current stream finishes. + pub fn should_dispatch_remote_message(&self) -> bool { + !self.is_streaming() && self.has_pending_user_message() + } + + /// Whether the session needs a re-dispatch after a stream ends. + /// This catches user messages that arrived while we were streaming. + pub fn needs_redispatch_after_stream_end(&self) -> bool { + !self.is_streaming() && self.has_pending_user_message() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::config::AiMode; + use crate::messages::AssistantMessage; + use std::sync::mpsc; + + fn test_session() -> ChatSession { + ChatSession::new(1, PathBuf::from("/tmp"), AiMode::Agentic) + } + + #[test] + fn dispatch_when_idle_with_user_message() { + let mut session = test_session(); + session.chat.push(Message::User("hello".into())); + assert!(session.should_dispatch_remote_message()); + } + + #[test] + fn no_dispatch_while_streaming() { + let mut session = test_session(); + session.chat.push(Message::User("hello".into())); + + // Start streaming + let (_tx, rx) = mpsc::channel::<DaveApiResponse>(); + session.incoming_tokens = Some(rx); + + // New user message arrives while streaming + session.chat.push(Message::User("another".into())); + assert!(!session.should_dispatch_remote_message()); + } + + #[test] + fn redispatch_after_stream_ends_with_pending_user_message() { + let mut session = test_session(); + session.chat.push(Message::User("msg1".into())); + + // Start streaming + let (tx, rx) = mpsc::channel::<DaveApiResponse>(); + session.incoming_tokens = Some(rx); + + // Assistant responds, then more user messages arrive + session + .chat + .push(Message::Assistant(AssistantMessage::from_text( + "response".into(), + ))); + session.chat.push(Message::User("msg2".into())); + + // Stream ends + drop(tx); + session.incoming_tokens = None; + + assert!(session.needs_redispatch_after_stream_end()); + } + + #[test] + fn no_redispatch_when_assistant_is_last() { + let mut session = test_session(); + session.chat.push(Message::User("hello".into())); + + let (tx, rx) = mpsc::channel::<DaveApiResponse>(); + session.incoming_tokens = Some(rx); + + session + .chat + .push(Message::Assistant(AssistantMessage::from_text( + "done".into(), + ))); + + drop(tx); + session.incoming_tokens = None; + + assert!(!session.needs_redispatch_after_stream_end()); + } + + /// The key bug scenario: multiple remote messages arrive across frames + /// while streaming. None should trigger dispatch. After stream ends, + /// the last pending message should trigger redispatch. + #[test] + fn multiple_remote_messages_while_streaming() { + let mut session = test_session(); + + // First message — dispatched normally + session.chat.push(Message::User("msg1".into())); + assert!(session.should_dispatch_remote_message()); + + // Backend starts streaming + let (tx, rx) = mpsc::channel::<DaveApiResponse>(); + session.incoming_tokens = Some(rx); + + // Messages arrive one per frame while streaming + session.chat.push(Message::User("msg2".into())); + assert!(!session.should_dispatch_remote_message()); + + session.chat.push(Message::User("msg3".into())); + assert!(!session.should_dispatch_remote_message()); + + // Stream ends + drop(tx); + session.incoming_tokens = None; + + // Should redispatch — there are unanswered user messages + assert!(session.needs_redispatch_after_stream_end()); + } +}