commit eda663dd610d4c498bab5c2476d4f67b83cae302
parent 4ddff4beac18aa10f3be78d1372ac77596511120
Author: William Casarin <jb55@jb55.com>
Date: Tue, 17 Feb 2026 10:54:25 -0800
add live conversation polling and dedup for remote sessions
Subscribe to kind-1988 events for remote sessions and poll for new
messages in the update loop. Dedup via seen_note_ids HashSet seeded
from initial session restore. Also update remote_status when new
kind-31988 events arrive for existing sessions.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat:
3 files changed, 234 insertions(+), 4 deletions(-)
diff --git a/crates/notedeck_dave/src/lib.rs b/crates/notedeck_dave/src/lib.rs
@@ -1213,6 +1213,7 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr
if !cwd.exists() {
session.source = session::SessionSource::Remote;
}
+ let is_remote = session.is_remote();
if let Some(agentic) = &mut session.agentic {
if let (Some(root), Some(last)) =
@@ -1220,12 +1221,36 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr
{
agentic.live_threading.seed(root, last, loaded.event_count);
}
- // Load permission state from events
+ // Load permission state and dedup set from events
agentic.responded_perm_ids = loaded.responded_perm_ids;
agentic.perm_request_note_ids.extend(loaded.perm_request_note_ids);
+ agentic.seen_note_ids = loaded.note_ids;
// Set remote status from state event
agentic.remote_status =
AgentStatus::from_status_str(&state.status);
+
+ // Set up live conversation subscription for remote sessions
+ if is_remote && agentic.live_conversation_sub.is_none() {
+ let conv_filter = nostrdb::Filter::new()
+ .kinds([session_events::AI_CONVERSATION_KIND as u64])
+ .tags([state.claude_session_id.as_str()], 'd')
+ .build();
+ match ctx.ndb.subscribe(&[conv_filter]) {
+ Ok(sub) => {
+ agentic.live_conversation_sub = Some(sub);
+ tracing::info!(
+ "subscribed for live conversation events for remote session '{}'",
+ state.title,
+ );
+ }
+ Err(e) => {
+ tracing::warn!(
+ "failed to subscribe for conversation events: {:?}",
+ e,
+ );
+ }
+ }
+ }
}
}
}
@@ -1278,8 +1303,19 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr
continue;
};
- // Skip sessions we already know about
+ // Update remote_status for existing remote sessions
if existing_ids.contains(claude_sid) {
+ let status_str = json["status"].as_str().unwrap_or("idle");
+ let new_status = AgentStatus::from_status_str(status_str);
+ for session in self.session_manager.iter_mut() {
+ if session.is_remote() {
+ if let Some(agentic) = &mut session.agentic {
+ if agentic.event_session_id() == Some(claude_sid) {
+ agentic.remote_status = new_status;
+ }
+ }
+ }
+ }
continue;
}
@@ -1296,7 +1332,7 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr
let dave_sid = self.session_manager.new_resumed_session(
cwd,
claude_sid.to_string(),
- title,
+ title.clone(),
AiMode::Agentic,
);
@@ -1321,6 +1357,7 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr
if !cwd_path.exists() {
session.source = session::SessionSource::Remote;
}
+ let is_remote = session.is_remote();
if let Some(agentic) = &mut session.agentic {
if let (Some(root), Some(last)) =
@@ -1328,12 +1365,36 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr
{
agentic.live_threading.seed(root, last, loaded.event_count);
}
- // Load permission state
+ // Load permission state and dedup set
agentic.responded_perm_ids = loaded.responded_perm_ids;
agentic.perm_request_note_ids.extend(loaded.perm_request_note_ids);
+ agentic.seen_note_ids = loaded.note_ids;
// Set remote status
let status_str = json["status"].as_str().unwrap_or("idle");
agentic.remote_status = AgentStatus::from_status_str(status_str);
+
+ // Set up live conversation subscription for remote sessions
+ if is_remote && agentic.live_conversation_sub.is_none() {
+ let conv_filter = nostrdb::Filter::new()
+ .kinds([session_events::AI_CONVERSATION_KIND as u64])
+ .tags([claude_sid], 'd')
+ .build();
+ match ctx.ndb.subscribe(&[conv_filter]) {
+ Ok(sub) => {
+ agentic.live_conversation_sub = Some(sub);
+ tracing::info!(
+ "subscribed for live conversation events for remote session '{}'",
+ &title,
+ );
+ }
+ Err(e) => {
+ tracing::warn!(
+ "failed to subscribe for conversation events: {:?}",
+ e,
+ );
+ }
+ }
+ }
}
}
@@ -1344,6 +1405,162 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr
}
}
+ /// Poll for new kind-1988 conversation events on remote sessions.
+ ///
+ /// Remote sessions subscribe to conversation events via ndb. When new
+ /// events arrive (from PNS unwrapping), this converts them to Messages
+ /// and appends them to the chat, keeping the phone UI in sync with the
+ /// desktop's conversation.
+ fn poll_remote_conversation_events(&mut self, ndb: &nostrdb::Ndb) {
+ let session_ids = self.session_manager.session_ids();
+ for session_id in session_ids {
+ let Some(session) = self.session_manager.get_mut(session_id) else {
+ continue;
+ };
+ // Only remote sessions need to poll for conversation events
+ if !session.is_remote() {
+ continue;
+ }
+ let Some(agentic) = &mut session.agentic else {
+ continue;
+ };
+ let Some(sub) = agentic.live_conversation_sub else {
+ continue;
+ };
+
+ let note_keys = ndb.poll_for_notes(sub, 128);
+ if note_keys.is_empty() {
+ continue;
+ }
+
+ let txn = match Transaction::new(ndb) {
+ Ok(txn) => txn,
+ Err(_) => continue,
+ };
+
+ // Collect and sort by created_at to process in order
+ let mut notes: Vec<_> = note_keys
+ .iter()
+ .filter_map(|key| ndb.get_note_by_key(&txn, *key).ok())
+ .collect();
+ notes.sort_by_key(|n| n.created_at());
+
+ for note in ¬es {
+ // Skip events we've already processed (dedup)
+ let note_id = *note.id();
+ if !agentic.seen_note_ids.insert(note_id) {
+ continue;
+ }
+
+ let content = note.content();
+ let role = session_events::get_tag_value(note, "role");
+
+ match role {
+ Some("user") => {
+ session
+ .chat
+ .push(Message::User(content.to_string()));
+ }
+ Some("assistant") => {
+ session.chat.push(Message::Assistant(
+ crate::messages::AssistantMessage::from_text(
+ content.to_string(),
+ ),
+ ));
+ }
+ Some("tool_call") => {
+ session.chat.push(Message::Assistant(
+ crate::messages::AssistantMessage::from_text(
+ content.to_string(),
+ ),
+ ));
+ }
+ Some("tool_result") => {
+ let summary = if content.chars().count() > 100 {
+ let truncated: String = content.chars().take(100).collect();
+ format!("{}...", truncated)
+ } else {
+ content.to_string()
+ };
+ session.chat.push(Message::ToolResult(
+ crate::messages::ToolResult {
+ tool_name: "tool".to_string(),
+ summary,
+ },
+ ));
+ }
+ Some("permission_request") => {
+ if let Ok(content_json) =
+ serde_json::from_str::<serde_json::Value>(content)
+ {
+ let tool_name = content_json["tool_name"]
+ .as_str()
+ .unwrap_or("unknown")
+ .to_string();
+ let tool_input = content_json
+ .get("tool_input")
+ .cloned()
+ .unwrap_or(serde_json::Value::Null);
+ let perm_id =
+ session_events::get_tag_value(note, "perm-id")
+ .and_then(|s| uuid::Uuid::parse_str(s).ok())
+ .unwrap_or_else(uuid::Uuid::new_v4);
+
+ // Check if we already responded
+ let response =
+ if agentic.responded_perm_ids.contains(&perm_id) {
+ Some(
+ crate::messages::PermissionResponseType::Allowed,
+ )
+ } else {
+ None
+ };
+
+ // Store the note ID for linking responses
+ agentic
+ .perm_request_note_ids
+ .insert(perm_id, *note.id());
+
+ session.chat.push(Message::PermissionRequest(
+ crate::messages::PermissionRequest {
+ id: perm_id,
+ tool_name,
+ tool_input,
+ response,
+ answer_summary: None,
+ cached_plan: None,
+ },
+ ));
+ }
+ }
+ Some("permission_response") => {
+ // Track that this permission was responded to
+ if let Some(perm_id_str) =
+ session_events::get_tag_value(note, "perm-id")
+ {
+ if let Ok(perm_id) = uuid::Uuid::parse_str(perm_id_str) {
+ agentic.responded_perm_ids.insert(perm_id);
+ // Update the matching PermissionRequest in chat
+ for msg in session.chat.iter_mut() {
+ if let Message::PermissionRequest(req) = msg {
+ if req.id == perm_id && req.response.is_none() {
+ req.response = Some(
+ crate::messages::PermissionResponseType::Allowed,
+ );
+ }
+ }
+ }
+ }
+ }
+ }
+ _ => {
+ // Skip progress, queue-operation, etc.
+ }
+ }
+ }
+ }
+ }
+
/// Delete a session and clean up backend resources
fn delete_session(&mut self, id: SessionId) {
update::delete_session(
@@ -1645,6 +1862,9 @@ impl notedeck::App for Dave {
// Poll for new session states from PNS-unwrapped relay events
self.poll_session_state_events(ctx);
+ // Poll for live conversation events on remote sessions
+ self.poll_remote_conversation_events(ctx.ndb);
+
// Process pending archive conversion (JSONL → nostr events)
if let Some((file_path, dave_sid, claude_sid)) = self.pending_archive_convert.take() {
// Check if events already exist for this session in ndb
diff --git a/crates/notedeck_dave/src/session.rs b/crates/notedeck_dave/src/session.rs
@@ -82,6 +82,10 @@ pub struct AgenticSessionData {
pub live_conversation_sub: Option<nostrdb::Subscription>,
/// Set of perm-id UUIDs that we (the remote/phone) have already responded to.
pub responded_perm_ids: HashSet<Uuid>,
+ /// Note IDs we've already processed from live conversation polling.
+ /// Prevents duplicate messages when events are loaded during restore
+ /// and then appear again via the subscription.
+ pub seen_note_ids: HashSet<[u8; 32]>,
}
impl AgenticSessionData {
@@ -114,6 +118,7 @@ impl AgenticSessionData {
remote_status: None,
live_conversation_sub: None,
responded_perm_ids: HashSet::new(),
+ seen_note_ids: HashSet::new(),
}
}
diff --git a/crates/notedeck_dave/src/session_loader.rs b/crates/notedeck_dave/src/session_loader.rs
@@ -24,6 +24,8 @@ pub struct LoadedSession {
/// Map of perm_id -> note_id for permission request events.
/// Used by remote sessions to link responses back to requests.
pub perm_request_note_ids: std::collections::HashMap<uuid::Uuid, [u8; 32]>,
+ /// All note IDs found, for seeding dedup in live polling.
+ pub note_ids: HashSet<[u8; 32]>,
}
/// Load conversation messages from ndb for a given session ID.
@@ -48,6 +50,7 @@ pub fn load_session_messages(ndb: &Ndb, txn: &Transaction, session_id: &str) ->
event_count: 0,
responded_perm_ids: HashSet::new(),
perm_request_note_ids: std::collections::HashMap::new(),
+ note_ids: HashSet::new(),
}
}
};
@@ -62,6 +65,7 @@ pub fn load_session_messages(ndb: &Ndb, txn: &Transaction, session_id: &str) ->
notes.sort_by_key(|note| note.created_at());
let event_count = notes.len() as u32;
+ let note_ids: HashSet<[u8; 32]> = notes.iter().map(|n| *n.id()).collect();
// Find the first conversation note (skip metadata like queue-operation)
// so the threading root is a real message.
@@ -169,6 +173,7 @@ pub fn load_session_messages(ndb: &Ndb, txn: &Transaction, session_id: &str) ->
event_count,
responded_perm_ids,
perm_request_note_ids,
+ note_ids,
}
}