commit 50da963f76f5862c70b23c20a63b9575ed820f87
parent 684241f62a02cf36480fadbd5674b88403506f16
Author: William Casarin <jb55@jb55.com>
Date: Wed, 25 Feb 2026 12:18:44 -0800
enostr: return negentropy sync stats
Diffstat:
2 files changed, 71 insertions(+), 24 deletions(-)
diff --git a/crates/enostr/src/negentropy.rs b/crates/enostr/src/negentropy.rs
@@ -19,6 +19,8 @@
//! self.neg_sync.process(neg_events, ctx.ndb, ctx.pool, &filter, &relay_url);
//! ```
+use std::collections::HashSet;
+
use crate::{ClientMessage, RelayPool};
use negentropy::{Id, Negentropy, NegentropyStorageVector};
use nostrdb::{Filter, Ndb, Transaction};
@@ -26,6 +28,17 @@ use nostrdb::{Filter, Ndb, Transaction};
/// Maximum number of event IDs to request in a single REQ.
const FETCH_BATCH_SIZE: usize = 100;
+/// Result of a single [`NegentropySync::process`] call.
+#[derive(Debug, Default)]
+pub struct SyncResult {
+ /// Genuinely new events fetched from the relay this round.
+ pub new_events: usize,
+ /// Events the relay reported as missing but we already tried to
+ /// fetch in a previous round. These are unfetchable (filter
+ /// mismatch, failed validation, etc.) and will not be retried.
+ pub skipped: usize,
+}
+
#[derive(Debug, PartialEq, Eq)]
enum SyncState {
Idle,
@@ -96,6 +109,11 @@ pub struct NegentropySync {
/// IDs sent via REQ that we're waiting to see ingested into ndb
/// before starting the next round.
pending_fetch_ids: Vec<[u8; 32]>,
+ /// IDs fetched in the previous round. Used to detect events that
+ /// the relay considers missing but that we can't reconcile locally
+ /// (e.g. filter mismatch, failed validation). If the same IDs
+ /// appear as missing again, we skip them to avoid an infinite loop.
+ last_fetched_ids: HashSet<[u8; 32]>,
}
impl NegentropySync {
@@ -107,6 +125,7 @@ impl NegentropySync {
sync_requested: false,
need_ids: Vec::new(),
pending_fetch_ids: Vec::new(),
+ last_fetched_ids: HashSet::new(),
}
}
@@ -125,8 +144,8 @@ impl NegentropySync {
/// initiating sync, multi-round reconciliation, fetching missing
/// events, error recovery, and periodic re-sync.
///
- /// Returns the number of missing events fetched this call, so
- /// the caller can decide whether to re-trigger another round.
+ /// Returns per-round fetch stats so the caller can decide whether
+ /// to re-trigger another reconciliation round.
pub fn process(
&mut self,
events: Vec<NegEvent>,
@@ -134,8 +153,8 @@ impl NegentropySync {
pool: &mut RelayPool,
filter: &Filter,
relay_url: &str,
- ) -> usize {
- let mut fetched = 0;
+ ) -> SyncResult {
+ let mut result = SyncResult::default();
for event in events {
match event {
@@ -146,7 +165,9 @@ impl NegentropySync {
if self.sub_id.as_deref() != Some(&sub_id) {
continue;
}
- fetched += self.handle_msg(&payload, pool, relay_url);
+ let r = self.handle_msg(&payload, pool, relay_url);
+ result.new_events += r.new_events;
+ result.skipped += r.skipped;
}
NegEvent::Err { sub_id, reason } => {
if self.sub_id.as_deref() != Some(&sub_id) {
@@ -171,7 +192,7 @@ impl NegentropySync {
self.pending_fetch_ids.clear();
} else {
// Events not yet ingested — wait for next frame
- return fetched;
+ return result;
}
}
}
@@ -185,7 +206,7 @@ impl NegentropySync {
}
}
- fetched
+ result
}
fn initiate(&mut self, ndb: &Ndb, filter: &Filter) -> Option<String> {
@@ -230,15 +251,14 @@ impl NegentropySync {
Some(msg)
}
- /// Handle a NEG-MSG from the relay. Returns the number of missing
- /// events fetched (0 while still reconciling, >0 when complete and
- /// events were fetched).
- fn handle_msg(&mut self, msg_hex: &str, pool: &mut RelayPool, relay_url: &str) -> usize {
+ /// Handle a NEG-MSG from the relay and return per-round fetch stats.
+ fn handle_msg(&mut self, msg_hex: &str, pool: &mut RelayPool, relay_url: &str) -> SyncResult {
+ let zero = SyncResult::default();
let neg = match self.neg.as_mut() {
Some(n) => n,
None => {
tracing::warn!("negentropy: received msg with no active session");
- return 0;
+ return zero;
}
};
@@ -247,7 +267,7 @@ impl NegentropySync {
Err(e) => {
tracing::warn!("negentropy hex decode: {e}");
self.reset_after_error();
- return 0;
+ return zero;
}
};
@@ -262,13 +282,13 @@ impl NegentropySync {
let sub_id = self.sub_id.as_ref().unwrap();
let msg = format!(r#"["NEG-MSG","{}","{}"]"#, sub_id, next_hex);
pool.send_to(&ClientMessage::Raw(msg), relay_url);
- 0
+ zero
}
Ok(None) => {
// Reconciliation complete
self.need_ids
.extend(need_ids.iter().map(|id| id.to_bytes()));
- let missing = std::mem::take(&mut self.need_ids);
+ let mut missing = std::mem::take(&mut self.need_ids);
// Send NEG-CLOSE
if let Some(sub_id) = &self.sub_id {
@@ -279,18 +299,43 @@ impl NegentropySync {
self.state = SyncState::Idle;
self.neg = None;
- let count = missing.len();
- if count > 0 {
- tracing::info!("negentropy: fetching {} missing events", count);
+ // Filter out events we already fetched last round. If
+ // the relay still reports them as missing it means they
+ // don't match our local filter (wrong kind/author,
+ // failed validation, etc.) and re-fetching won't help.
+ let skipped = if !self.last_fetched_ids.is_empty() {
+ let before = missing.len();
+ missing.retain(|id| !self.last_fetched_ids.contains(id));
+ let skipped = before - missing.len();
+ if skipped > 0 {
+ tracing::info!(
+ "negentropy: skipping {} events already fetched last round",
+ skipped
+ );
+ }
+ skipped
+ } else {
+ 0
+ };
+
+ let new_events = missing.len();
+ if new_events > 0 {
+ tracing::info!("negentropy: fetching {} missing events", new_events);
Self::fetch_missing(&missing, pool, relay_url);
- self.pending_fetch_ids = missing;
+ self.pending_fetch_ids = missing.clone();
+ self.last_fetched_ids = missing.into_iter().collect();
+ } else {
+ self.last_fetched_ids.clear();
+ }
+ SyncResult {
+ new_events,
+ skipped,
}
- count
}
Err(e) => {
tracing::warn!("negentropy reconcile: {e}");
self.reset_after_error();
- 0
+ zero
}
}
}
@@ -302,6 +347,7 @@ impl NegentropySync {
self.neg = None;
self.need_ids.clear();
self.pending_fetch_ids.clear();
+ self.last_fetched_ids.clear();
}
fn fetch_missing(ids: &[[u8; 32]], pool: &mut RelayPool, relay_url: &str) {
diff --git a/crates/notedeck_dave/src/lib.rs b/crates/notedeck_dave/src/lib.rs
@@ -2529,14 +2529,15 @@ impl notedeck::App for Dave {
// If events were found and we haven't hit the round limit,
// trigger another sync to pull more recent data.
- if fetched > 0 {
+ if fetched.new_events > 0 {
self.neg_sync_round += 1;
if self.neg_sync_round < MAX_NEG_SYNC_ROUNDS {
tracing::info!(
- "negentropy: scheduling round {}/{} (got {} events)",
+ "negentropy: scheduling round {}/{} (got {} new, {} skipped)",
self.neg_sync_round + 1,
MAX_NEG_SYNC_ROUNDS,
- fetched
+ fetched.new_events,
+ fetched.skipped
);
self.neg_sync.trigger_now();
} else {