notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit f4abb1b87bb7437070b62052245a9da0811b21fb
parent 3938f43d8b2c560057055a357c596e4218234da1
Author: William Casarin <jb55@jb55.com>
Date:   Mon, 23 Feb 2026 11:29:10 -0800

neg: add NIP-77 negentropy sync with multi-round fetching

Add negentropy set reconciliation (NIP-77) for syncing PNS-wrapped
events from relays. Since strfry limits responses to 500 events per
request, the sync automatically re-triggers up to 5 rounds when
missing events are found, pulling enough recent data to cover
active sessions without downloading the entire history.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Diffstat:
MCargo.lock | 2++
MCargo.toml | 1+
Mcrates/enostr/Cargo.toml | 6++++--
Mcrates/enostr/src/lib.rs | 1+
Acrates/enostr/src/negentropy.rs | 352+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/enostr/src/relay/message.rs | 14++++++++++----
Mcrates/notedeck_dave/src/lib.rs | 108++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------
Mcrates/notedeck_dave/src/session_loader.rs | 41+++++++++++++++++++++++++++++++++++++++++
8 files changed, 496 insertions(+), 29 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -2118,6 +2118,7 @@ dependencies = [ "hex", "hkdf", "mio", + "negentropy 0.5.0", "nostr 0.37.0", "nostrdb", "serde", @@ -2129,6 +2130,7 @@ dependencies = [ "tokio", "tracing", "url", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml @@ -60,6 +60,7 @@ fluent-resmgr = "0.0.8" fluent-langneg = "0.13" glam = { version = "0.30", features = ["bytemuck"] } hex = { version = "0.4.3", features = ["serde"] } +negentropy = { version = "0.5", default-features = false, features = ["std"] } image = { version = "0.25", features = ["jpeg", "png", "webp"] } indexmap = "2.6.0" log = "0.4.17" diff --git a/crates/enostr/Cargo.toml b/crates/enostr/Cargo.toml @@ -23,4 +23,6 @@ tokenator = { workspace = true } hashbrown = { workspace = true } hkdf = { workspace = true } sha2 = { workspace = true } -base64 = { workspace = true } -\ No newline at end of file +base64 = { workspace = true } +negentropy = { workspace = true } +uuid = { workspace = true } +\ No newline at end of file diff --git a/crates/enostr/src/lib.rs b/crates/enostr/src/lib.rs @@ -2,6 +2,7 @@ mod client; mod error; mod filter; mod keypair; +pub mod negentropy; mod note; pub mod pns; mod profile; diff --git a/crates/enostr/src/negentropy.rs b/crates/enostr/src/negentropy.rs @@ -0,0 +1,352 @@ +//! NIP-77 negentropy set reconciliation for relay event syncing. +//! +//! Provides a [`NegentropySync`] state machine that any app can use to +//! discover and fetch missing events from a relay. The caller owns the +//! relay pool and ndb — this module just drives the protocol. +//! +//! # Usage +//! +//! ```ignore +//! // In your update loop's relay event callback, collect negentropy events: +//! let mut neg_events = Vec::new(); +//! try_process_events_core(ctx, ui.ctx(), |app_ctx, ev| { +//! if ev.relay == my_relay { +//! neg_events.extend(NegEvent::from_relay(&ev.event)); +//! } +//! }); +//! +//! // Then process everything in one call: +//! self.neg_sync.process(neg_events, ctx.ndb, ctx.pool, &filter, &relay_url); +//! ``` + +use crate::{ClientMessage, RelayPool}; +use negentropy::{Id, Negentropy, NegentropyStorageVector}; +use nostrdb::{Filter, Ndb, Transaction}; + +/// Maximum number of event IDs to request in a single REQ. +const FETCH_BATCH_SIZE: usize = 100; + +#[derive(Debug, PartialEq, Eq)] +enum SyncState { + Idle, + Reconciling, +} + +/// A negentropy-relevant event extracted from a raw relay message. +/// +/// Apps collect these inside their relay event callback, then pass +/// them to [`NegentropySync::process`]. +pub enum NegEvent { + /// A NEG-MSG response from the relay. + Msg { sub_id: String, payload: String }, + /// A NEG-ERR response from the relay. + Err { sub_id: String, reason: String }, + /// The relay (re)connected — triggers an immediate sync. + RelayOpened, +} + +impl NegEvent { + /// Try to extract a negentropy event from a raw websocket event. + /// + /// Returns `None` if the message isn't a negentropy protocol message. + /// Relay open events should be pushed separately by the app. + pub fn from_relay(ws: &ewebsock::WsEvent) -> Option<Self> { + let text = match ws { + ewebsock::WsEvent::Message(ewebsock::WsMessage::Text(t)) => t, + _ => return None, + }; + + if text.starts_with("[\"NEG-MSG\"") { + let v: serde_json::Value = serde_json::from_str(text).ok()?; + let arr = v.as_array()?; + if arr.len() >= 3 && arr[0].as_str()? == "NEG-MSG" { + return Some(NegEvent::Msg { + sub_id: arr[1].as_str()?.to_string(), + payload: arr[2].as_str()?.to_string(), + }); + } + } else if text.starts_with("[\"NEG-ERR\"") { + let v: serde_json::Value = serde_json::from_str(text).ok()?; + let arr = v.as_array()?; + if arr.len() >= 3 && arr[0].as_str()? == "NEG-ERR" { + return Some(NegEvent::Err { + sub_id: arr[1].as_str()?.to_string(), + reason: arr[2].as_str()?.to_string(), + }); + } + } + + None + } +} + +/// NIP-77 negentropy reconciliation state machine. +/// +/// Compares the client's local event set against a relay and fetches +/// any missing events. Generic over event kinds — the caller provides +/// the filter. +pub struct NegentropySync { + state: SyncState, + sub_id: Option<String>, + neg: Option<Negentropy<'static, NegentropyStorageVector>>, + /// Whether a sync has been requested (startup, reconnect, or re-sync after fetch). + sync_requested: bool, + /// IDs accumulated across multi-round reconciliation. + need_ids: Vec<[u8; 32]>, +} + +impl NegentropySync { + pub fn new() -> Self { + Self { + state: SyncState::Idle, + sub_id: None, + neg: None, + sync_requested: false, + need_ids: Vec::new(), + } + } + + /// Request a sync on the next `process()` call. + /// + /// Call this on startup and reconnect. Also called internally + /// after fetching missing events to verify catch-up is complete. + pub fn trigger_now(&mut self) { + self.sync_requested = true; + } + + /// Process collected relay events and run periodic sync. + /// + /// Call this once per frame after collecting [`NegEvent`]s from + /// the relay event loop. Handles the full protocol lifecycle: + /// initiating sync, multi-round reconciliation, fetching missing + /// events, error recovery, and periodic re-sync. + /// + /// Returns the number of missing events fetched this call, so + /// the caller can decide whether to re-trigger another round. + pub fn process( + &mut self, + events: Vec<NegEvent>, + ndb: &Ndb, + pool: &mut RelayPool, + filter: &Filter, + relay_url: &str, + ) -> usize { + let mut fetched = 0; + + for event in events { + match event { + NegEvent::RelayOpened => { + self.trigger_now(); + } + NegEvent::Msg { sub_id, payload } => { + if self.sub_id.as_deref() != Some(&sub_id) { + continue; + } + fetched += self.handle_msg(&payload, pool, relay_url); + } + NegEvent::Err { sub_id, reason } => { + if self.sub_id.as_deref() != Some(&sub_id) { + continue; + } + tracing::warn!("negentropy NEG-ERR: {reason}"); + self.reset_after_error(); + } + } + } + + // Initiate sync if requested and idle + if self.sync_requested && self.state == SyncState::Idle { + self.sync_requested = false; + if let Some(open_msg) = self.initiate(ndb, filter) { + pool.send_to(&ClientMessage::Raw(open_msg), relay_url); + tracing::info!("negentropy: initiated sync"); + } + } + + fetched + } + + fn initiate(&mut self, ndb: &Ndb, filter: &Filter) -> Option<String> { + let txn = Transaction::new(ndb).ok()?; + + let mut storage = NegentropyStorageVector::new(); + let result = ndb.fold( + &txn, + std::slice::from_ref(filter), + &mut storage, + |storage, note| { + let created_at = note.created_at(); + let id = Id::from_byte_array(*note.id()); + let _ = storage.insert(created_at, id); + storage + }, + ); + + if result.is_err() { + return None; + } + + storage.seal().ok()?; + + let mut neg = Negentropy::owned(storage, 0).ok()?; + let init_msg = neg.initiate().ok()?; + let init_hex = hex::encode(&init_msg); + + let filter_json = filter.json().ok()?; + let sub_id = uuid::Uuid::new_v4().to_string(); + + let msg = format!( + r#"["NEG-OPEN","{}",{},"{}"]"#, + sub_id, filter_json, init_hex + ); + + self.neg = Some(neg); + self.sub_id = Some(sub_id); + self.state = SyncState::Reconciling; + self.need_ids.clear(); + + Some(msg) + } + + /// Handle a NEG-MSG from the relay. Returns the number of missing + /// events fetched (0 while still reconciling, >0 when complete and + /// events were fetched). + fn handle_msg(&mut self, msg_hex: &str, pool: &mut RelayPool, relay_url: &str) -> usize { + let neg = match self.neg.as_mut() { + Some(n) => n, + None => { + tracing::warn!("negentropy: received msg with no active session"); + return 0; + } + }; + + let msg_bytes = match hex::decode(msg_hex) { + Ok(b) => b, + Err(e) => { + tracing::warn!("negentropy hex decode: {e}"); + self.reset_after_error(); + return 0; + } + }; + + let mut have_ids = Vec::new(); + let mut need_ids = Vec::new(); + + match neg.reconcile_with_ids(&msg_bytes, &mut have_ids, &mut need_ids) { + Ok(Some(next_msg)) => { + self.need_ids + .extend(need_ids.iter().map(|id| id.to_bytes())); + let next_hex = hex::encode(&next_msg); + let sub_id = self.sub_id.as_ref().unwrap(); + let msg = format!(r#"["NEG-MSG","{}","{}"]"#, sub_id, next_hex); + pool.send_to(&ClientMessage::Raw(msg), relay_url); + 0 + } + Ok(None) => { + // Reconciliation complete + self.need_ids + .extend(need_ids.iter().map(|id| id.to_bytes())); + let missing = std::mem::take(&mut self.need_ids); + + // Send NEG-CLOSE + if let Some(sub_id) = &self.sub_id { + let close = format!(r#"["NEG-CLOSE","{}"]"#, sub_id); + pool.send_to(&ClientMessage::Raw(close), relay_url); + } + + self.state = SyncState::Idle; + self.neg = None; + + let count = missing.len(); + if count > 0 { + tracing::info!("negentropy: fetching {} missing events", count); + Self::fetch_missing(&missing, pool, relay_url); + } + count + } + Err(e) => { + tracing::warn!("negentropy reconcile: {e}"); + self.reset_after_error(); + 0 + } + } + } + + fn reset_after_error(&mut self) { + self.state = SyncState::Idle; + self.sync_requested = false; + self.sub_id = None; + self.neg = None; + self.need_ids.clear(); + } + + fn fetch_missing(ids: &[[u8; 32]], pool: &mut RelayPool, relay_url: &str) { + for chunk in ids.chunks(FETCH_BATCH_SIZE) { + let sub_id = uuid::Uuid::new_v4().to_string(); + let filter = Filter::new().ids(chunk.iter()).build(); + let req = ClientMessage::req(sub_id, vec![filter]); + pool.send_to(&req, relay_url); + } + } +} + +impl Default for NegentropySync { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_neg_event_from_relay_msg() { + let ws = ewebsock::WsEvent::Message(ewebsock::WsMessage::Text( + r#"["NEG-MSG","abc123","deadbeef"]"#.to_string(), + )); + match NegEvent::from_relay(&ws).unwrap() { + NegEvent::Msg { sub_id, payload } => { + assert_eq!(sub_id, "abc123"); + assert_eq!(payload, "deadbeef"); + } + _ => panic!("expected Msg"), + } + } + + #[test] + fn test_neg_event_from_relay_err() { + let ws = ewebsock::WsEvent::Message(ewebsock::WsMessage::Text( + r#"["NEG-ERR","abc123","RESULTS_TOO_BIG"]"#.to_string(), + )); + match NegEvent::from_relay(&ws).unwrap() { + NegEvent::Err { sub_id, reason } => { + assert_eq!(sub_id, "abc123"); + assert_eq!(reason, "RESULTS_TOO_BIG"); + } + _ => panic!("expected Err"), + } + } + + #[test] + fn test_neg_event_ignores_other() { + let ws = ewebsock::WsEvent::Message(ewebsock::WsMessage::Text( + r#"["EVENT","sub","{}"]"#.to_string(), + )); + assert!(NegEvent::from_relay(&ws).is_none()); + } + + #[test] + fn test_no_sync_by_default() { + let sync = NegentropySync::new(); + assert!(!sync.sync_requested); + } + + #[test] + fn test_trigger_now() { + let mut sync = NegentropySync::new(); + assert!(!sync.sync_requested); + sync.trigger_now(); + assert!(sync.sync_requested); + } +} diff --git a/crates/enostr/src/relay/message.rs b/crates/enostr/src/relay/message.rs @@ -43,10 +43,16 @@ impl<'a> From<&'a WsEvent> for RelayEvent<'a> { impl<'a> From<&'a WsMessage> for RelayEvent<'a> { fn from(wsmsg: &'a WsMessage) -> RelayEvent<'a> { match wsmsg { - WsMessage::Text(s) => match RelayMessage::from_json(s).map(RelayEvent::Message) { - Ok(msg) => msg, - Err(err) => RelayEvent::Error(err), - }, + WsMessage::Text(s) => { + // NIP-77 negentropy messages are handled separately via NegEvent + if s.starts_with("[\"NEG-") { + return RelayEvent::Other(wsmsg); + } + match RelayMessage::from_json(s).map(RelayEvent::Message) { + Ok(msg) => msg, + Err(err) => RelayEvent::Error(err), + } + } wsmsg => RelayEvent::Other(wsmsg), } } diff --git a/crates/notedeck_dave/src/lib.rs b/crates/notedeck_dave/src/lib.rs @@ -66,6 +66,11 @@ pub use vec3::Vec3; /// Default relay URL used for PNS event publishing and subscription. const DEFAULT_PNS_RELAY: &str = "ws://relay.jb55.com/"; +/// Maximum consecutive negentropy sync rounds before stopping. +/// Each round pulls up to the relay's limit (typically 500 events), +/// so 5 rounds fetches up to ~2500 recent events. +const MAX_NEG_SYNC_ROUNDS: u8 = 5; + /// Normalize a relay URL to always have a trailing slash. fn normalize_relay_url(url: String) -> String { if url.ends_with('/') { @@ -161,6 +166,12 @@ pub struct Dave { hostname: String, /// PNS relay URL (configurable via DAVE_RELAY env or settings UI). pns_relay_url: String, + /// Negentropy sync state for PNS event reconciliation. + neg_sync: enostr::negentropy::NegentropySync, + /// How many consecutive negentropy sync rounds have completed. + /// Reset on startup/reconnect, incremented each time events are found. + /// Caps at [`MAX_NEG_SYNC_ROUNDS`] to avoid pulling the entire history. + neg_sync_round: u8, /// Persists DaveSettings to dave_settings.json settings_serializer: TimedSerializer<DaveSettings>, } @@ -420,6 +431,8 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr pending_summaries: Vec::new(), hostname, pns_relay_url, + neg_sync: enostr::negentropy::NegentropySync::new(), + neg_sync_round: 0, settings_serializer, } } @@ -1633,31 +1646,28 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr continue; } - let title = session_events::get_tag_value(&note, "title") - .unwrap_or("Untitled") - .to_string(); - let cwd_str = session_events::get_tag_value(&note, "cwd").unwrap_or(""); - let cwd = std::path::PathBuf::from(cwd_str); - let hostname = session_events::get_tag_value(&note, "hostname") - .unwrap_or("") - .to_string(); - let home_dir = session_events::get_tag_value(&note, "home_dir") - .unwrap_or("") - .to_string(); + // Look up the latest revision of this session. PNS wrapping + // causes old revisions (including pre-deletion) to arrive from + // the relay. Only create a session if the latest revision is valid. + let Some(state) = session_loader::latest_valid_session(ctx.ndb, &txn, claude_sid) + else { + continue; + }; tracing::info!( "discovered new session from relay: '{}' ({}) on {}", - title, + state.title, claude_sid, - hostname, + state.hostname, ); existing_ids.insert(claude_sid.to_string()); + let cwd = std::path::PathBuf::from(&state.cwd); let dave_sid = self.session_manager.new_resumed_session( cwd, claude_sid.to_string(), - title.clone(), + state.title.clone(), AiMode::Agentic, ); @@ -1665,9 +1675,9 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr let loaded = session_loader::load_session_messages(ctx.ndb, &txn, claude_sid); if let Some(session) = self.session_manager.get_mut(dave_sid) { - session.details.hostname = hostname; - if !home_dir.is_empty() { - session.details.home_dir = home_dir; + session.details.hostname = state.hostname.clone(); + if !state.home_dir.is_empty() { + session.details.home_dir = state.home_dir.clone(); } if !loaded.messages.is_empty() { tracing::info!( @@ -1678,7 +1688,7 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr } // Determine if this is a remote session - let cwd_path = std::path::PathBuf::from(cwd_str); + let cwd_path = std::path::PathBuf::from(&state.cwd); if !cwd_path.exists() { session.source = session::SessionSource::Remote; } @@ -1694,8 +1704,8 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr ); agentic.seen_note_ids = loaded.note_ids; // Set remote status - agentic.remote_status = AgentStatus::from_status_str(status_str); - agentic.remote_status_ts = note.created_at(); + agentic.remote_status = AgentStatus::from_status_str(&state.status); + agentic.remote_status_ts = state.created_at; // Set up live conversation subscription so we can // receive messages from remote clients (e.g. phone) @@ -1710,7 +1720,7 @@ You are an AI agent for the nostr protocol called Dave, created by Damus. nostr agentic.live_conversation_sub = Some(sub); tracing::info!( "subscribed for live conversation events for session '{}'", - &title, + &state.title, ); } Err(e) => { @@ -2201,9 +2211,11 @@ impl notedeck::App for Dave { // Re-send PNS subscription when the relay (re)connects. let pns_sub_id = self.pns_relay_sub.clone(); let pns_relay = self.pns_relay_url.clone(); + let mut neg_events: Vec<enostr::negentropy::NegEvent> = Vec::new(); try_process_events_core(ctx, ui.ctx(), |app_ctx, ev| { - if let enostr::RelayEvent::Opened = (&ev.event).into() { - if ev.relay == pns_relay { + if ev.relay == pns_relay { + if let enostr::RelayEvent::Opened = (&ev.event).into() { + neg_events.push(enostr::negentropy::NegEvent::RelayOpened); if let Some(sub_id) = &pns_sub_id { if let Some(sk) = app_ctx.accounts.get_selected_account().keypair().secret_key @@ -2212,6 +2224,7 @@ impl notedeck::App for Dave { let pns_filter = nostrdb::Filter::new() .kinds([enostr::pns::PNS_KIND as u64]) .authors([pns_keys.keypair.pubkey.bytes()]) + .limit(500) .build(); let req = enostr::ClientMessage::req(sub_id.clone(), vec![pns_filter]); app_ctx.pool.send_to(&req, &pns_relay); @@ -2219,9 +2232,53 @@ impl notedeck::App for Dave { } } } + + neg_events.extend(enostr::negentropy::NegEvent::from_relay(&ev.event)); } }); + // Reset round counter on relay reconnect so we do a fresh burst + if neg_events + .iter() + .any(|e| matches!(e, enostr::negentropy::NegEvent::RelayOpened)) + { + self.neg_sync_round = 0; + } + + // Negentropy sync: reconcile local events against PNS relay, + // fetch any missing kind-1080 events via standard REQ. + if let Some(sk) = ctx.accounts.get_selected_account().keypair().secret_key { + let pns_keys = enostr::pns::derive_pns_keys(&sk.secret_bytes()); + let filter = nostrdb::Filter::new() + .kinds([enostr::pns::PNS_KIND as u64]) + .authors([pns_keys.keypair.pubkey.bytes()]) + .limit(500) + .build(); + let fetched = + self.neg_sync + .process(neg_events, ctx.ndb, ctx.pool, &filter, &self.pns_relay_url); + + // If events were found and we haven't hit the round limit, + // trigger another sync to pull more recent data. + if fetched > 0 { + self.neg_sync_round += 1; + if self.neg_sync_round < MAX_NEG_SYNC_ROUNDS { + tracing::info!( + "negentropy: scheduling round {}/{} (got {} events)", + self.neg_sync_round + 1, + MAX_NEG_SYNC_ROUNDS, + fetched + ); + self.neg_sync.trigger_now(); + } else { + tracing::info!( + "negentropy: reached max rounds ({}), stopping", + MAX_NEG_SYNC_ROUNDS + ); + } + } + } + // Poll for external spawn-agent commands via IPC self.poll_ipc_commands(); @@ -2251,6 +2308,10 @@ impl notedeck::App for Dave { self.restore_sessions_from_ndb(ctx); + // Trigger initial negentropy sync after startup + self.neg_sync.trigger_now(); + self.neg_sync_round = 0; + // Subscribe to PNS events on relays for session discovery from other devices. // Also subscribe locally in ndb for kind-31988 session state events // so we detect new sessions appearing after PNS unwrapping. @@ -2268,6 +2329,7 @@ impl notedeck::App for Dave { let pns_filter = nostrdb::Filter::new() .kinds([enostr::pns::PNS_KIND as u64]) .authors([pns_keys.keypair.pubkey.bytes()]) + .limit(500) .build(); let sub_id = uuid::Uuid::new_v4().to_string(); let req = enostr::ClientMessage::req(sub_id.clone(), vec![pns_filter]); diff --git a/crates/notedeck_dave/src/session_loader.rs b/crates/notedeck_dave/src/session_loader.rs @@ -296,6 +296,47 @@ pub fn load_session_states(ndb: &Ndb, txn: &Transaction) -> Vec<SessionState> { states } +/// Look up the latest valid revision of a single session by d-tag. +/// +/// PNS wrapping causes relays to store all revisions of replaceable +/// events. This queries for the latest revision and returns it only +/// if it's non-deleted and in the current format. +pub fn latest_valid_session( + ndb: &Ndb, + txn: &Transaction, + session_id: &str, +) -> Option<SessionState> { + use crate::session_events::AI_SESSION_STATE_KIND; + + let filter = Filter::new() + .kinds([AI_SESSION_STATE_KIND as u64]) + .tags([session_id], 'd') + .limit(1) + .build(); + + let results = ndb.query(txn, &[filter], 1).ok()?; + let note = &results.first()?.note; + + if get_tag_value(note, "status") == Some("deleted") { + return None; + } + if note.content().starts_with('{') { + return None; + } + + Some(SessionState { + claude_session_id: session_id.to_string(), + title: get_tag_value(note, "title") + .unwrap_or("Untitled") + .to_string(), + cwd: get_tag_value(note, "cwd").unwrap_or("").to_string(), + status: get_tag_value(note, "status").unwrap_or("idle").to_string(), + hostname: get_tag_value(note, "hostname").unwrap_or("").to_string(), + home_dir: get_tag_value(note, "home_dir").unwrap_or("").to_string(), + created_at: note.created_at(), + }) +} + fn truncate(s: &str, max_chars: usize) -> String { if s.chars().count() <= max_chars { s.to_string()