negentropy.rs (14779B)
1 //! NIP-77 negentropy set reconciliation for relay event syncing. 2 //! 3 //! Provides a [`NegentropySync`] state machine that any app can use to 4 //! discover and fetch missing events from a relay. The caller owns the 5 //! relay pool and ndb — this module just drives the protocol. 6 //! 7 //! # Usage 8 //! 9 //! ```ignore 10 //! // In your update loop's relay event callback, collect negentropy events: 11 //! let mut neg_events = Vec::new(); 12 //! try_process_events_core(ctx, ui.ctx(), |app_ctx, ev| { 13 //! if ev.relay == my_relay { 14 //! neg_events.extend(NegEvent::from_relay(&ev.event)); 15 //! } 16 //! }); 17 //! 18 //! // Then process everything in one call: 19 //! self.neg_sync.process(neg_events, ctx.ndb, ctx.pool, &filter, &relay_url); 20 //! ``` 21 22 use std::collections::HashSet; 23 24 use crate::{ClientMessage, RelayPool}; 25 use negentropy::{Id, Negentropy, NegentropyStorageVector}; 26 use nostrdb::{Filter, Ndb, Transaction}; 27 28 /// Maximum number of event IDs to request in a single REQ. 29 const FETCH_BATCH_SIZE: usize = 100; 30 31 /// Result of a single [`NegentropySync::process`] call. 32 #[derive(Debug, Default)] 33 pub struct SyncResult { 34 /// Genuinely new events fetched from the relay this round. 35 pub new_events: usize, 36 /// Events the relay reported as missing but we already tried to 37 /// fetch in a previous round. These are unfetchable (filter 38 /// mismatch, failed validation, etc.) and will not be retried. 39 pub skipped: usize, 40 } 41 42 #[derive(Debug, PartialEq, Eq)] 43 enum SyncState { 44 Idle, 45 Reconciling, 46 } 47 48 /// A negentropy-relevant event extracted from a raw relay message. 49 /// 50 /// Apps collect these inside their relay event callback, then pass 51 /// them to [`NegentropySync::process`]. 52 pub enum NegEvent { 53 /// A NEG-MSG response from the relay. 54 Msg { sub_id: String, payload: String }, 55 /// A NEG-ERR response from the relay. 56 Err { sub_id: String, reason: String }, 57 /// The relay (re)connected — triggers an immediate sync. 58 RelayOpened, 59 } 60 61 impl NegEvent { 62 /// Try to extract a negentropy event from a raw websocket event. 63 /// 64 /// Returns `None` if the message isn't a negentropy protocol message. 65 /// Relay open events should be pushed separately by the app. 66 pub fn from_relay(ws: &ewebsock::WsEvent) -> Option<Self> { 67 let text = match ws { 68 ewebsock::WsEvent::Message(ewebsock::WsMessage::Text(t)) => t, 69 _ => return None, 70 }; 71 72 if text.starts_with("[\"NEG-MSG\"") { 73 let v: serde_json::Value = serde_json::from_str(text).ok()?; 74 let arr = v.as_array()?; 75 if arr.len() >= 3 && arr[0].as_str()? == "NEG-MSG" { 76 return Some(NegEvent::Msg { 77 sub_id: arr[1].as_str()?.to_string(), 78 payload: arr[2].as_str()?.to_string(), 79 }); 80 } 81 } else if text.starts_with("[\"NEG-ERR\"") { 82 let v: serde_json::Value = serde_json::from_str(text).ok()?; 83 let arr = v.as_array()?; 84 if arr.len() >= 3 && arr[0].as_str()? == "NEG-ERR" { 85 return Some(NegEvent::Err { 86 sub_id: arr[1].as_str()?.to_string(), 87 reason: arr[2].as_str()?.to_string(), 88 }); 89 } 90 } 91 92 None 93 } 94 } 95 96 /// NIP-77 negentropy reconciliation state machine. 97 /// 98 /// Compares the client's local event set against a relay and fetches 99 /// any missing events. Generic over event kinds — the caller provides 100 /// the filter. 101 pub struct NegentropySync { 102 state: SyncState, 103 sub_id: Option<String>, 104 neg: Option<Negentropy<'static, NegentropyStorageVector>>, 105 /// Whether a sync has been requested (startup, reconnect, or re-sync after fetch). 106 sync_requested: bool, 107 /// IDs accumulated across multi-round reconciliation. 108 need_ids: Vec<[u8; 32]>, 109 /// IDs sent via REQ that we're waiting to see ingested into ndb 110 /// before starting the next round. 111 pending_fetch_ids: Vec<[u8; 32]>, 112 /// IDs fetched in the previous round. Used to detect events that 113 /// the relay considers missing but that we can't reconcile locally 114 /// (e.g. filter mismatch, failed validation). If the same IDs 115 /// appear as missing again, we skip them to avoid an infinite loop. 116 last_fetched_ids: HashSet<[u8; 32]>, 117 } 118 119 impl NegentropySync { 120 pub fn new() -> Self { 121 Self { 122 state: SyncState::Idle, 123 sub_id: None, 124 neg: None, 125 sync_requested: false, 126 need_ids: Vec::new(), 127 pending_fetch_ids: Vec::new(), 128 last_fetched_ids: HashSet::new(), 129 } 130 } 131 132 /// Request a sync on the next `process()` call. 133 /// 134 /// Call this on startup and reconnect. Also called internally 135 /// after fetching missing events to verify catch-up is complete. 136 pub fn trigger_now(&mut self) { 137 self.sync_requested = true; 138 } 139 140 /// Process collected relay events and run periodic sync. 141 /// 142 /// Call this once per frame after collecting [`NegEvent`]s from 143 /// the relay event loop. Handles the full protocol lifecycle: 144 /// initiating sync, multi-round reconciliation, fetching missing 145 /// events, error recovery, and periodic re-sync. 146 /// 147 /// Returns per-round fetch stats so the caller can decide whether 148 /// to re-trigger another reconciliation round. 149 pub fn process( 150 &mut self, 151 events: Vec<NegEvent>, 152 ndb: &Ndb, 153 pool: &mut RelayPool, 154 filter: &Filter, 155 relay_url: &str, 156 ) -> SyncResult { 157 let mut result = SyncResult::default(); 158 159 for event in events { 160 match event { 161 NegEvent::RelayOpened => { 162 self.trigger_now(); 163 } 164 NegEvent::Msg { sub_id, payload } => { 165 if self.sub_id.as_deref() != Some(&sub_id) { 166 continue; 167 } 168 let r = self.handle_msg(&payload, pool, relay_url); 169 result.new_events += r.new_events; 170 result.skipped += r.skipped; 171 } 172 NegEvent::Err { sub_id, reason } => { 173 if self.sub_id.as_deref() != Some(&sub_id) { 174 continue; 175 } 176 tracing::warn!("negentropy NEG-ERR: {reason}"); 177 self.reset_after_error(); 178 } 179 } 180 } 181 182 // Wait for previously-fetched events to be ingested into ndb 183 // before starting the next round. Without this, the next round 184 // starts before the REQ responses arrive, causing the same 185 // events to be identified as missing every round. 186 if self.sync_requested && !self.pending_fetch_ids.is_empty() { 187 if let Ok(txn) = Transaction::new(ndb) { 188 if ndb.get_note_by_id(&txn, &self.pending_fetch_ids[0]).is_ok() { 189 tracing::info!( 190 "negentropy: fetched events ingested, proceeding with next round" 191 ); 192 self.pending_fetch_ids.clear(); 193 } else { 194 // Events not yet ingested — wait for next frame 195 return result; 196 } 197 } 198 } 199 200 // Initiate sync if requested and idle 201 if self.sync_requested && self.state == SyncState::Idle { 202 self.sync_requested = false; 203 if let Some(open_msg) = self.initiate(ndb, filter) { 204 pool.send_to(&ClientMessage::Raw(open_msg), relay_url); 205 tracing::info!("negentropy: initiated sync"); 206 } 207 } 208 209 result 210 } 211 212 fn initiate(&mut self, ndb: &Ndb, filter: &Filter) -> Option<String> { 213 let txn = Transaction::new(ndb).ok()?; 214 215 let mut storage = NegentropyStorageVector::new(); 216 let result = ndb.fold( 217 &txn, 218 std::slice::from_ref(filter), 219 &mut storage, 220 |storage, note| { 221 let created_at = note.created_at(); 222 let id = Id::from_byte_array(*note.id()); 223 let _ = storage.insert(created_at, id); 224 storage 225 }, 226 ); 227 228 if result.is_err() { 229 return None; 230 } 231 232 storage.seal().ok()?; 233 234 let mut neg = Negentropy::owned(storage, 0).ok()?; 235 let init_msg = neg.initiate().ok()?; 236 let init_hex = hex::encode(&init_msg); 237 238 let filter_json = filter.json().ok()?; 239 let sub_id = uuid::Uuid::new_v4().to_string(); 240 241 let msg = format!( 242 r#"["NEG-OPEN","{}",{},"{}"]"#, 243 sub_id, filter_json, init_hex 244 ); 245 246 self.neg = Some(neg); 247 self.sub_id = Some(sub_id); 248 self.state = SyncState::Reconciling; 249 self.need_ids.clear(); 250 251 Some(msg) 252 } 253 254 /// Handle a NEG-MSG from the relay and return per-round fetch stats. 255 fn handle_msg(&mut self, msg_hex: &str, pool: &mut RelayPool, relay_url: &str) -> SyncResult { 256 let zero = SyncResult::default(); 257 let neg = match self.neg.as_mut() { 258 Some(n) => n, 259 None => { 260 tracing::warn!("negentropy: received msg with no active session"); 261 return zero; 262 } 263 }; 264 265 let msg_bytes = match hex::decode(msg_hex) { 266 Ok(b) => b, 267 Err(e) => { 268 tracing::warn!("negentropy hex decode: {e}"); 269 self.reset_after_error(); 270 return zero; 271 } 272 }; 273 274 let mut have_ids = Vec::new(); 275 let mut need_ids = Vec::new(); 276 277 match neg.reconcile_with_ids(&msg_bytes, &mut have_ids, &mut need_ids) { 278 Ok(Some(next_msg)) => { 279 self.need_ids 280 .extend(need_ids.iter().map(|id| id.to_bytes())); 281 let next_hex = hex::encode(&next_msg); 282 let sub_id = self.sub_id.as_ref().unwrap(); 283 let msg = format!(r#"["NEG-MSG","{}","{}"]"#, sub_id, next_hex); 284 pool.send_to(&ClientMessage::Raw(msg), relay_url); 285 zero 286 } 287 Ok(None) => { 288 // Reconciliation complete 289 self.need_ids 290 .extend(need_ids.iter().map(|id| id.to_bytes())); 291 let mut missing = std::mem::take(&mut self.need_ids); 292 293 // Send NEG-CLOSE 294 if let Some(sub_id) = &self.sub_id { 295 let close = format!(r#"["NEG-CLOSE","{}"]"#, sub_id); 296 pool.send_to(&ClientMessage::Raw(close), relay_url); 297 } 298 299 self.state = SyncState::Idle; 300 self.neg = None; 301 302 // Filter out events we already fetched last round. If 303 // the relay still reports them as missing it means they 304 // don't match our local filter (wrong kind/author, 305 // failed validation, etc.) and re-fetching won't help. 306 let skipped = if !self.last_fetched_ids.is_empty() { 307 let before = missing.len(); 308 missing.retain(|id| !self.last_fetched_ids.contains(id)); 309 let skipped = before - missing.len(); 310 if skipped > 0 { 311 tracing::info!( 312 "negentropy: skipping {} events already fetched last round", 313 skipped 314 ); 315 } 316 skipped 317 } else { 318 0 319 }; 320 321 let new_events = missing.len(); 322 if new_events > 0 { 323 tracing::info!("negentropy: fetching {} missing events", new_events); 324 Self::fetch_missing(&missing, pool, relay_url); 325 self.pending_fetch_ids = missing.clone(); 326 self.last_fetched_ids = missing.into_iter().collect(); 327 } else { 328 self.last_fetched_ids.clear(); 329 } 330 SyncResult { 331 new_events, 332 skipped, 333 } 334 } 335 Err(e) => { 336 tracing::warn!("negentropy reconcile: {e}"); 337 self.reset_after_error(); 338 zero 339 } 340 } 341 } 342 343 fn reset_after_error(&mut self) { 344 self.state = SyncState::Idle; 345 self.sync_requested = false; 346 self.sub_id = None; 347 self.neg = None; 348 self.need_ids.clear(); 349 self.pending_fetch_ids.clear(); 350 self.last_fetched_ids.clear(); 351 } 352 353 fn fetch_missing(ids: &[[u8; 32]], pool: &mut RelayPool, relay_url: &str) { 354 for chunk in ids.chunks(FETCH_BATCH_SIZE) { 355 let sub_id = uuid::Uuid::new_v4().to_string(); 356 let filter = Filter::new().ids(chunk.iter()).build(); 357 let req = ClientMessage::req(sub_id, vec![filter]); 358 pool.send_to(&req, relay_url); 359 } 360 } 361 } 362 363 impl Default for NegentropySync { 364 fn default() -> Self { 365 Self::new() 366 } 367 } 368 369 #[cfg(test)] 370 mod tests { 371 use super::*; 372 373 #[test] 374 fn test_neg_event_from_relay_msg() { 375 let ws = ewebsock::WsEvent::Message(ewebsock::WsMessage::Text( 376 r#"["NEG-MSG","abc123","deadbeef"]"#.to_string(), 377 )); 378 match NegEvent::from_relay(&ws).unwrap() { 379 NegEvent::Msg { sub_id, payload } => { 380 assert_eq!(sub_id, "abc123"); 381 assert_eq!(payload, "deadbeef"); 382 } 383 _ => panic!("expected Msg"), 384 } 385 } 386 387 #[test] 388 fn test_neg_event_from_relay_err() { 389 let ws = ewebsock::WsEvent::Message(ewebsock::WsMessage::Text( 390 r#"["NEG-ERR","abc123","RESULTS_TOO_BIG"]"#.to_string(), 391 )); 392 match NegEvent::from_relay(&ws).unwrap() { 393 NegEvent::Err { sub_id, reason } => { 394 assert_eq!(sub_id, "abc123"); 395 assert_eq!(reason, "RESULTS_TOO_BIG"); 396 } 397 _ => panic!("expected Err"), 398 } 399 } 400 401 #[test] 402 fn test_neg_event_ignores_other() { 403 let ws = ewebsock::WsEvent::Message(ewebsock::WsMessage::Text( 404 r#"["EVENT","sub","{}"]"#.to_string(), 405 )); 406 assert!(NegEvent::from_relay(&ws).is_none()); 407 } 408 409 #[test] 410 fn test_no_sync_by_default() { 411 let sync = NegentropySync::new(); 412 assert!(!sync.sync_requested); 413 } 414 415 #[test] 416 fn test_trigger_now() { 417 let mut sync = NegentropySync::new(); 418 assert!(!sync.sync_requested); 419 sync.trigger_now(); 420 assert!(sync.sync_requested); 421 } 422 }