notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit 9cce5fdcd4337655cf03775e5f49967f26506fa1
parent a036e02fc5fa90ad30140be20dac0f48c4301ba0
Author: William Casarin <jb55@jb55.com>
Date:   Wed, 25 Feb 2026 12:14:45 -0800

negentropy: fix infinite sync loop with dedup and richer results

Remove limit from negentropy filter so both sides compare the full
event set, preventing set-boundary disagreement that caused the same
event to be re-fetched every round. Track previously fetched IDs in a
HashSet to skip events the relay reports as missing but that we cannot
reconcile locally (filter mismatch, validation failure, etc.). Return
a SyncResult { new_events, skipped } instead of a bare usize so the
caller can distinguish genuinely new events from unfetchable ones.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Diffstat:
M crates/enostr/src/negentropy.rs | 87 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------
M crates/notedeck_dave/src/lib.rs | 14 ++++++++------
2 files changed, 73 insertions(+), 28 deletions(-)

diff --git a/crates/enostr/src/negentropy.rs b/crates/enostr/src/negentropy.rs @@ -19,6 +19,8 @@ //! self.neg_sync.process(neg_events, ctx.ndb, ctx.pool, &filter, &relay_url); //! ``` +use std::collections::HashSet; + use crate::{ClientMessage, RelayPool}; use negentropy::{Id, Negentropy, NegentropyStorageVector}; use nostrdb::{Filter, Ndb, Transaction}; @@ -26,6 +28,17 @@ use nostrdb::{Filter, Ndb, Transaction}; /// Maximum number of event IDs to request in a single REQ. const FETCH_BATCH_SIZE: usize = 100; +/// Result of a single [`NegentropySync::process`] call. +#[derive(Debug, Default)] +pub struct SyncResult { + /// Genuinely new events fetched from the relay this round. + pub new_events: usize, + /// Events the relay reported as missing but we already tried to + /// fetch in a previous round. These are unfetchable (filter + /// mismatch, failed validation, etc.) and will not be retried. + pub skipped: usize, +} + #[derive(Debug, PartialEq, Eq)] enum SyncState { Idle, @@ -96,6 +109,11 @@ pub struct NegentropySync { /// IDs sent via REQ that we're waiting to see ingested into ndb /// before starting the next round. pending_fetch_ids: Vec<[u8; 32]>, + /// IDs fetched in the previous round. Used to detect events that + /// the relay considers missing but that we can't reconcile locally + /// (e.g. filter mismatch, failed validation). If the same IDs + /// appear as missing again, we skip them to avoid an infinite loop. + last_fetched_ids: HashSet<[u8; 32]>, } impl NegentropySync { @@ -107,6 +125,7 @@ impl NegentropySync { sync_requested: false, need_ids: Vec::new(), pending_fetch_ids: Vec::new(), + last_fetched_ids: HashSet::new(), } } @@ -124,9 +143,6 @@ impl NegentropySync { /// the relay event loop. Handles the full protocol lifecycle: /// initiating sync, multi-round reconciliation, fetching missing /// events, error recovery, and periodic re-sync. 
- /// - /// Returns the number of missing events fetched this call, so - /// the caller can decide whether to re-trigger another round. pub fn process( &mut self, events: Vec<NegEvent>, @@ -134,8 +150,8 @@ impl NegentropySync { pool: &mut RelayPool, filter: &Filter, relay_url: &str, - ) -> usize { - let mut fetched = 0; + ) -> SyncResult { + let mut result = SyncResult::default(); for event in events { match event { @@ -146,7 +162,9 @@ impl NegentropySync { if self.sub_id.as_deref() != Some(&sub_id) { continue; } - fetched += self.handle_msg(&payload, pool, relay_url); + let r = self.handle_msg(&payload, pool, relay_url); + result.new_events += r.new_events; + result.skipped += r.skipped; } NegEvent::Err { sub_id, reason } => { if self.sub_id.as_deref() != Some(&sub_id) { @@ -171,7 +189,7 @@ impl NegentropySync { self.pending_fetch_ids.clear(); } else { // Events not yet ingested — wait for next frame - return fetched; + return result; } } } @@ -185,7 +203,7 @@ impl NegentropySync { } } - fetched + result } fn initiate(&mut self, ndb: &Ndb, filter: &Filter) -> Option<String> { @@ -230,15 +248,14 @@ impl NegentropySync { Some(msg) } - /// Handle a NEG-MSG from the relay. Returns the number of missing - /// events fetched (0 while still reconciling, >0 when complete and - /// events were fetched). 
- fn handle_msg(&mut self, msg_hex: &str, pool: &mut RelayPool, relay_url: &str) -> usize { + fn handle_msg(&mut self, msg_hex: &str, pool: &mut RelayPool, relay_url: &str) -> SyncResult { + let zero = SyncResult::default(); + let neg = match self.neg.as_mut() { Some(n) => n, None => { tracing::warn!("negentropy: received msg with no active session"); - return 0; + return zero; } }; @@ -247,7 +264,7 @@ impl NegentropySync { Err(e) => { tracing::warn!("negentropy hex decode: {e}"); self.reset_after_error(); - return 0; + return zero; } }; @@ -262,13 +279,13 @@ impl NegentropySync { let sub_id = self.sub_id.as_ref().unwrap(); let msg = format!(r#"["NEG-MSG","{}","{}"]"#, sub_id, next_hex); pool.send_to(&ClientMessage::Raw(msg), relay_url); - 0 + zero } Ok(None) => { // Reconciliation complete self.need_ids .extend(need_ids.iter().map(|id| id.to_bytes())); - let missing = std::mem::take(&mut self.need_ids); + let mut missing = std::mem::take(&mut self.need_ids); // Send NEG-CLOSE if let Some(sub_id) = &self.sub_id { @@ -279,18 +296,43 @@ impl NegentropySync { self.state = SyncState::Idle; self.neg = None; - let count = missing.len(); - if count > 0 { - tracing::info!("negentropy: fetching {} missing events", count); + // Filter out events we already fetched last round. If + // the relay still reports them as missing it means they + // don't match our local filter (wrong kind/author, + // failed validation, etc.) and re-fetching won't help. 
+ let skipped = if !self.last_fetched_ids.is_empty() { + let before = missing.len(); + missing.retain(|id| !self.last_fetched_ids.contains(id)); + let skipped = before - missing.len(); + if skipped > 0 { + tracing::info!( + "negentropy: skipping {} events already fetched last round", + skipped + ); + } + skipped + } else { + 0 + }; + + let new_events = missing.len(); + if new_events > 0 { + tracing::info!("negentropy: fetching {} missing events", new_events); Self::fetch_missing(&missing, pool, relay_url); - self.pending_fetch_ids = missing; + self.pending_fetch_ids = missing.clone(); + self.last_fetched_ids = missing.into_iter().collect(); + } else { + self.last_fetched_ids.clear(); + } + SyncResult { + new_events, + skipped, } - count } Err(e) => { tracing::warn!("negentropy reconcile: {e}"); self.reset_after_error(); - 0 + zero } } } @@ -302,6 +344,7 @@ impl NegentropySync { self.neg = None; self.need_ids.clear(); self.pending_fetch_ids.clear(); + self.last_fetched_ids.clear(); } fn fetch_missing(ids: &[[u8; 32]], pool: &mut RelayPool, relay_url: &str) { diff --git a/crates/notedeck_dave/src/lib.rs b/crates/notedeck_dave/src/lib.rs @@ -2521,22 +2521,19 @@ impl notedeck::App for Dave { let filter = nostrdb::Filter::new() .kinds([enostr::pns::PNS_KIND as u64]) .authors([pns_keys.keypair.pubkey.bytes()]) - .limit(500) .build(); - let fetched = + let result = self.neg_sync .process(neg_events, ctx.ndb, ctx.pool, &filter, &self.pns_relay_url); - // If events were found and we haven't hit the round limit, - // trigger another sync to pull more recent data. 
- if fetched > 0 { + if result.new_events > 0 { self.neg_sync_round += 1; if self.neg_sync_round < MAX_NEG_SYNC_ROUNDS { tracing::info!( "negentropy: scheduling round {}/{} (got {} events)", self.neg_sync_round + 1, MAX_NEG_SYNC_ROUNDS, - fetched + result.new_events ); self.neg_sync.trigger_now(); } else { @@ -2545,6 +2542,11 @@ impl notedeck::App for Dave { MAX_NEG_SYNC_ROUNDS ); } + } else if result.skipped > 0 { + tracing::info!( + "negentropy: relay has {} events we can't reconcile, stopping", + result.skipped + ); } }