notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

negentropy.rs (14779B)


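//! NIP-77 negentropy set reconciliation for relay event syncing.
//!
//! Provides a [`NegentropySync`] state machine that any app can use to
//! discover and fetch missing events from a relay. The caller owns the
//! relay pool and ndb — this module just drives the protocol.
//!
//! # Usage
//!
//! ```ignore
//! // On startup, request the first sync:
//! self.neg_sync.trigger_now();
//!
//! // In your update loop's relay event callback, collect negentropy events.
//! // `NegEvent::from_relay` only recognizes NEG-MSG / NEG-ERR frames, so
//! // push `NegEvent::RelayOpened` yourself when the relay (re)connects.
//! let mut neg_events = Vec::new();
//! try_process_events_core(ctx, ui.ctx(), |app_ctx, ev| {
//!     if ev.relay == my_relay {
//!         neg_events.extend(NegEvent::from_relay(&ev.event));
//!     }
//! });
//!
//! // Then process everything in one call. If the round requested missing
//! // events, trigger another round to confirm catch-up is complete:
//! let result = self.neg_sync.process(neg_events, ctx.ndb, ctx.pool, &filter, &relay_url);
//! if result.new_events > 0 {
//!     self.neg_sync.trigger_now();
//! }
//! ```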
     12 //! try_process_events_core(ctx, ui.ctx(), |app_ctx, ev| {
     13 //!     if ev.relay == my_relay {
     14 //!         neg_events.extend(NegEvent::from_relay(&ev.event));
     15 //!     }
     16 //! });
     17 //!
     18 //! // Then process everything in one call:
     19 //! self.neg_sync.process(neg_events, ctx.ndb, ctx.pool, &filter, &relay_url);
     20 //! ```
     21 
     22 use std::collections::HashSet;
     23 
     24 use crate::{ClientMessage, RelayPool};
     25 use negentropy::{Id, Negentropy, NegentropyStorageVector};
     26 use nostrdb::{Filter, Ndb, Transaction};
     27 
     28 /// Maximum number of event IDs to request in a single REQ.
     29 const FETCH_BATCH_SIZE: usize = 100;
     30 
     31 /// Result of a single [`NegentropySync::process`] call.
     32 #[derive(Debug, Default)]
     33 pub struct SyncResult {
     34     /// Genuinely new events fetched from the relay this round.
     35     pub new_events: usize,
     36     /// Events the relay reported as missing but we already tried to
     37     /// fetch in a previous round. These are unfetchable (filter
     38     /// mismatch, failed validation, etc.) and will not be retried.
     39     pub skipped: usize,
     40 }
     41 
     42 #[derive(Debug, PartialEq, Eq)]
     43 enum SyncState {
     44     Idle,
     45     Reconciling,
     46 }
     47 
     48 /// A negentropy-relevant event extracted from a raw relay message.
     49 ///
     50 /// Apps collect these inside their relay event callback, then pass
     51 /// them to [`NegentropySync::process`].
     52 pub enum NegEvent {
     53     /// A NEG-MSG response from the relay.
     54     Msg { sub_id: String, payload: String },
     55     /// A NEG-ERR response from the relay.
     56     Err { sub_id: String, reason: String },
     57     /// The relay (re)connected — triggers an immediate sync.
     58     RelayOpened,
     59 }
     60 
     61 impl NegEvent {
     62     /// Try to extract a negentropy event from a raw websocket event.
     63     ///
     64     /// Returns `None` if the message isn't a negentropy protocol message.
     65     /// Relay open events should be pushed separately by the app.
     66     pub fn from_relay(ws: &ewebsock::WsEvent) -> Option<Self> {
     67         let text = match ws {
     68             ewebsock::WsEvent::Message(ewebsock::WsMessage::Text(t)) => t,
     69             _ => return None,
     70         };
     71 
     72         if text.starts_with("[\"NEG-MSG\"") {
     73             let v: serde_json::Value = serde_json::from_str(text).ok()?;
     74             let arr = v.as_array()?;
     75             if arr.len() >= 3 && arr[0].as_str()? == "NEG-MSG" {
     76                 return Some(NegEvent::Msg {
     77                     sub_id: arr[1].as_str()?.to_string(),
     78                     payload: arr[2].as_str()?.to_string(),
     79                 });
     80             }
     81         } else if text.starts_with("[\"NEG-ERR\"") {
     82             let v: serde_json::Value = serde_json::from_str(text).ok()?;
     83             let arr = v.as_array()?;
     84             if arr.len() >= 3 && arr[0].as_str()? == "NEG-ERR" {
     85                 return Some(NegEvent::Err {
     86                     sub_id: arr[1].as_str()?.to_string(),
     87                     reason: arr[2].as_str()?.to_string(),
     88                 });
     89             }
     90         }
     91 
     92         None
     93     }
     94 }
     95 
     96 /// NIP-77 negentropy reconciliation state machine.
     97 ///
     98 /// Compares the client's local event set against a relay and fetches
     99 /// any missing events. Generic over event kinds — the caller provides
    100 /// the filter.
    101 pub struct NegentropySync {
    102     state: SyncState,
    103     sub_id: Option<String>,
    104     neg: Option<Negentropy<'static, NegentropyStorageVector>>,
    105     /// Whether a sync has been requested (startup, reconnect, or re-sync after fetch).
    106     sync_requested: bool,
    107     /// IDs accumulated across multi-round reconciliation.
    108     need_ids: Vec<[u8; 32]>,
    109     /// IDs sent via REQ that we're waiting to see ingested into ndb
    110     /// before starting the next round.
    111     pending_fetch_ids: Vec<[u8; 32]>,
    112     /// IDs fetched in the previous round. Used to detect events that
    113     /// the relay considers missing but that we can't reconcile locally
    114     /// (e.g. filter mismatch, failed validation). If the same IDs
    115     /// appear as missing again, we skip them to avoid an infinite loop.
    116     last_fetched_ids: HashSet<[u8; 32]>,
    117 }
    118 
    119 impl NegentropySync {
    120     pub fn new() -> Self {
    121         Self {
    122             state: SyncState::Idle,
    123             sub_id: None,
    124             neg: None,
    125             sync_requested: false,
    126             need_ids: Vec::new(),
    127             pending_fetch_ids: Vec::new(),
    128             last_fetched_ids: HashSet::new(),
    129         }
    130     }
    131 
    132     /// Request a sync on the next `process()` call.
    133     ///
    134     /// Call this on startup and reconnect. Also called internally
    135     /// after fetching missing events to verify catch-up is complete.
    136     pub fn trigger_now(&mut self) {
    137         self.sync_requested = true;
    138     }
    139 
    140     /// Process collected relay events and run periodic sync.
    141     ///
    142     /// Call this once per frame after collecting [`NegEvent`]s from
    143     /// the relay event loop. Handles the full protocol lifecycle:
    144     /// initiating sync, multi-round reconciliation, fetching missing
    145     /// events, error recovery, and periodic re-sync.
    146     ///
    147     /// Returns per-round fetch stats so the caller can decide whether
    148     /// to re-trigger another reconciliation round.
    149     pub fn process(
    150         &mut self,
    151         events: Vec<NegEvent>,
    152         ndb: &Ndb,
    153         pool: &mut RelayPool,
    154         filter: &Filter,
    155         relay_url: &str,
    156     ) -> SyncResult {
    157         let mut result = SyncResult::default();
    158 
    159         for event in events {
    160             match event {
    161                 NegEvent::RelayOpened => {
    162                     self.trigger_now();
    163                 }
    164                 NegEvent::Msg { sub_id, payload } => {
    165                     if self.sub_id.as_deref() != Some(&sub_id) {
    166                         continue;
    167                     }
    168                     let r = self.handle_msg(&payload, pool, relay_url);
    169                     result.new_events += r.new_events;
    170                     result.skipped += r.skipped;
    171                 }
    172                 NegEvent::Err { sub_id, reason } => {
    173                     if self.sub_id.as_deref() != Some(&sub_id) {
    174                         continue;
    175                     }
    176                     tracing::warn!("negentropy NEG-ERR: {reason}");
    177                     self.reset_after_error();
    178                 }
    179             }
    180         }
    181 
    182         // Wait for previously-fetched events to be ingested into ndb
    183         // before starting the next round. Without this, the next round
    184         // starts before the REQ responses arrive, causing the same
    185         // events to be identified as missing every round.
    186         if self.sync_requested && !self.pending_fetch_ids.is_empty() {
    187             if let Ok(txn) = Transaction::new(ndb) {
    188                 if ndb.get_note_by_id(&txn, &self.pending_fetch_ids[0]).is_ok() {
    189                     tracing::info!(
    190                         "negentropy: fetched events ingested, proceeding with next round"
    191                     );
    192                     self.pending_fetch_ids.clear();
    193                 } else {
    194                     // Events not yet ingested — wait for next frame
    195                     return result;
    196                 }
    197             }
    198         }
    199 
    200         // Initiate sync if requested and idle
    201         if self.sync_requested && self.state == SyncState::Idle {
    202             self.sync_requested = false;
    203             if let Some(open_msg) = self.initiate(ndb, filter) {
    204                 pool.send_to(&ClientMessage::Raw(open_msg), relay_url);
    205                 tracing::info!("negentropy: initiated sync");
    206             }
    207         }
    208 
    209         result
    210     }
    211 
    212     fn initiate(&mut self, ndb: &Ndb, filter: &Filter) -> Option<String> {
    213         let txn = Transaction::new(ndb).ok()?;
    214 
    215         let mut storage = NegentropyStorageVector::new();
    216         let result = ndb.fold(
    217             &txn,
    218             std::slice::from_ref(filter),
    219             &mut storage,
    220             |storage, note| {
    221                 let created_at = note.created_at();
    222                 let id = Id::from_byte_array(*note.id());
    223                 let _ = storage.insert(created_at, id);
    224                 storage
    225             },
    226         );
    227 
    228         if result.is_err() {
    229             return None;
    230         }
    231 
    232         storage.seal().ok()?;
    233 
    234         let mut neg = Negentropy::owned(storage, 0).ok()?;
    235         let init_msg = neg.initiate().ok()?;
    236         let init_hex = hex::encode(&init_msg);
    237 
    238         let filter_json = filter.json().ok()?;
    239         let sub_id = uuid::Uuid::new_v4().to_string();
    240 
    241         let msg = format!(
    242             r#"["NEG-OPEN","{}",{},"{}"]"#,
    243             sub_id, filter_json, init_hex
    244         );
    245 
    246         self.neg = Some(neg);
    247         self.sub_id = Some(sub_id);
    248         self.state = SyncState::Reconciling;
    249         self.need_ids.clear();
    250 
    251         Some(msg)
    252     }
    253 
    254     /// Handle a NEG-MSG from the relay and return per-round fetch stats.
    255     fn handle_msg(&mut self, msg_hex: &str, pool: &mut RelayPool, relay_url: &str) -> SyncResult {
    256         let zero = SyncResult::default();
    257         let neg = match self.neg.as_mut() {
    258             Some(n) => n,
    259             None => {
    260                 tracing::warn!("negentropy: received msg with no active session");
    261                 return zero;
    262             }
    263         };
    264 
    265         let msg_bytes = match hex::decode(msg_hex) {
    266             Ok(b) => b,
    267             Err(e) => {
    268                 tracing::warn!("negentropy hex decode: {e}");
    269                 self.reset_after_error();
    270                 return zero;
    271             }
    272         };
    273 
    274         let mut have_ids = Vec::new();
    275         let mut need_ids = Vec::new();
    276 
    277         match neg.reconcile_with_ids(&msg_bytes, &mut have_ids, &mut need_ids) {
    278             Ok(Some(next_msg)) => {
    279                 self.need_ids
    280                     .extend(need_ids.iter().map(|id| id.to_bytes()));
    281                 let next_hex = hex::encode(&next_msg);
    282                 let sub_id = self.sub_id.as_ref().unwrap();
    283                 let msg = format!(r#"["NEG-MSG","{}","{}"]"#, sub_id, next_hex);
    284                 pool.send_to(&ClientMessage::Raw(msg), relay_url);
    285                 zero
    286             }
    287             Ok(None) => {
    288                 // Reconciliation complete
    289                 self.need_ids
    290                     .extend(need_ids.iter().map(|id| id.to_bytes()));
    291                 let mut missing = std::mem::take(&mut self.need_ids);
    292 
    293                 // Send NEG-CLOSE
    294                 if let Some(sub_id) = &self.sub_id {
    295                     let close = format!(r#"["NEG-CLOSE","{}"]"#, sub_id);
    296                     pool.send_to(&ClientMessage::Raw(close), relay_url);
    297                 }
    298 
    299                 self.state = SyncState::Idle;
    300                 self.neg = None;
    301 
    302                 // Filter out events we already fetched last round. If
    303                 // the relay still reports them as missing it means they
    304                 // don't match our local filter (wrong kind/author,
    305                 // failed validation, etc.) and re-fetching won't help.
    306                 let skipped = if !self.last_fetched_ids.is_empty() {
    307                     let before = missing.len();
    308                     missing.retain(|id| !self.last_fetched_ids.contains(id));
    309                     let skipped = before - missing.len();
    310                     if skipped > 0 {
    311                         tracing::info!(
    312                             "negentropy: skipping {} events already fetched last round",
    313                             skipped
    314                         );
    315                     }
    316                     skipped
    317                 } else {
    318                     0
    319                 };
    320 
    321                 let new_events = missing.len();
    322                 if new_events > 0 {
    323                     tracing::info!("negentropy: fetching {} missing events", new_events);
    324                     Self::fetch_missing(&missing, pool, relay_url);
    325                     self.pending_fetch_ids = missing.clone();
    326                     self.last_fetched_ids = missing.into_iter().collect();
    327                 } else {
    328                     self.last_fetched_ids.clear();
    329                 }
    330                 SyncResult {
    331                     new_events,
    332                     skipped,
    333                 }
    334             }
    335             Err(e) => {
    336                 tracing::warn!("negentropy reconcile: {e}");
    337                 self.reset_after_error();
    338                 zero
    339             }
    340         }
    341     }
    342 
    343     fn reset_after_error(&mut self) {
    344         self.state = SyncState::Idle;
    345         self.sync_requested = false;
    346         self.sub_id = None;
    347         self.neg = None;
    348         self.need_ids.clear();
    349         self.pending_fetch_ids.clear();
    350         self.last_fetched_ids.clear();
    351     }
    352 
    353     fn fetch_missing(ids: &[[u8; 32]], pool: &mut RelayPool, relay_url: &str) {
    354         for chunk in ids.chunks(FETCH_BATCH_SIZE) {
    355             let sub_id = uuid::Uuid::new_v4().to_string();
    356             let filter = Filter::new().ids(chunk.iter()).build();
    357             let req = ClientMessage::req(sub_id, vec![filter]);
    358             pool.send_to(&req, relay_url);
    359         }
    360     }
    361 }
    362 
    363 impl Default for NegentropySync {
    364     fn default() -> Self {
    365         Self::new()
    366     }
    367 }
    368 
    369 #[cfg(test)]
    370 mod tests {
    371     use super::*;
    372 
    373     #[test]
    374     fn test_neg_event_from_relay_msg() {
    375         let ws = ewebsock::WsEvent::Message(ewebsock::WsMessage::Text(
    376             r#"["NEG-MSG","abc123","deadbeef"]"#.to_string(),
    377         ));
    378         match NegEvent::from_relay(&ws).unwrap() {
    379             NegEvent::Msg { sub_id, payload } => {
    380                 assert_eq!(sub_id, "abc123");
    381                 assert_eq!(payload, "deadbeef");
    382             }
    383             _ => panic!("expected Msg"),
    384         }
    385     }
    386 
    387     #[test]
    388     fn test_neg_event_from_relay_err() {
    389         let ws = ewebsock::WsEvent::Message(ewebsock::WsMessage::Text(
    390             r#"["NEG-ERR","abc123","RESULTS_TOO_BIG"]"#.to_string(),
    391         ));
    392         match NegEvent::from_relay(&ws).unwrap() {
    393             NegEvent::Err { sub_id, reason } => {
    394                 assert_eq!(sub_id, "abc123");
    395                 assert_eq!(reason, "RESULTS_TOO_BIG");
    396             }
    397             _ => panic!("expected Err"),
    398         }
    399     }
    400 
    401     #[test]
    402     fn test_neg_event_ignores_other() {
    403         let ws = ewebsock::WsEvent::Message(ewebsock::WsMessage::Text(
    404             r#"["EVENT","sub","{}"]"#.to_string(),
    405         ));
    406         assert!(NegEvent::from_relay(&ws).is_none());
    407     }
    408 
    409     #[test]
    410     fn test_no_sync_by_default() {
    411         let sync = NegentropySync::new();
    412         assert!(!sync.sync_requested);
    413     }
    414 
    415     #[test]
    416     fn test_trigger_now() {
    417         let mut sync = NegentropySync::new();
    418         assert!(!sync.sync_requested);
    419         sync.trigger_now();
    420         assert!(sync.sync_requested);
    421     }
    422 }