mod.rs (33965B)
1 use hashbrown::{hash_map::RawEntryMut, HashMap, HashSet}; 2 use nostrdb::{Filter, Note}; 3 use std::{ 4 collections::{hash_map::DefaultHasher, BTreeMap}, 5 hash::{Hash, Hasher}, 6 time::{Duration, Instant, SystemTime, UNIX_EPOCH}, 7 }; 8 9 use crate::{ 10 relay::{ 11 coordinator::{CoordinationData, CoordinationSession, EoseIds}, 12 websocket::WebsocketRelay, 13 ModifyTask, MulticastRelayCache, NormRelayUrl, OutboxSubId, OutboxSubscriptions, 14 OutboxTask, RawEventData, RelayId, RelayLimitations, RelayReqStatus, RelayStatus, 15 RelayType, 16 }, 17 EventClientMessage, Wakeup, WebsocketConn, 18 }; 19 20 mod handler; 21 mod session; 22 23 pub use handler::OutboxSessionHandler; 24 pub use session::OutboxSession; 25 26 const KEEPALIVE_PING_RATE: Duration = Duration::from_secs(45); 27 const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30 * 60); // 30 minutes 28 29 /// Computes the deterministic base delay for a given attempt number. 30 /// Formula: `5s * 2^attempt`, capped at [`MAX_RECONNECT_DELAY`]. 31 fn base_reconnect_delay(attempt: u32) -> Duration { 32 let secs = 5u64.checked_shl(attempt).unwrap_or(u64::MAX); 33 Duration::from_secs(secs).min(MAX_RECONNECT_DELAY) 34 } 35 36 fn reconnect_jitter_seed(relay_url: &nostr::RelayUrl, attempt: u32) -> u64 { 37 let now_nanos = SystemTime::now() 38 .duration_since(UNIX_EPOCH) 39 .unwrap_or_default() 40 .as_nanos() as u64; 41 let mut hasher = DefaultHasher::new(); 42 relay_url.hash(&mut hasher); 43 attempt.hash(&mut hasher); 44 now_nanos.hash(&mut hasher); 45 hasher.finish() 46 } 47 48 /// Returns the reconnect delay for the given attempt count. 49 /// 50 /// Uses the exponential base delay as the primary component and adds up to 25% 51 /// additive jitter (via relay/time mixed seed) to spread out simultaneous 52 /// reconnects without undermining the exponential delay itself. 53 fn next_reconnect_duration(attempt: u32, jitter_seed: u64) -> Duration { 54 let base = base_reconnect_delay(attempt); 55 let jitter_ceiling = base / 4; 56 let jitter = if jitter_ceiling.is_zero() { 57 Duration::ZERO 58 } else { 59 let jitter_ceiling_nanos = jitter_ceiling.as_nanos() as u64; 60 Duration::from_nanos(jitter_seed % jitter_ceiling_nanos) 61 }; 62 (base + jitter).min(MAX_RECONNECT_DELAY) 63 } 64 65 /// OutboxPool owns the active relay coordinators and applies staged subscription 66 /// mutations to them each frame. 67 pub struct OutboxPool { 68 registry: SubRegistry, 69 relays: HashMap<NormRelayUrl, CoordinationData>, 70 subs: OutboxSubscriptions, 71 multicast: MulticastRelayCache, 72 } 73 74 impl Default for OutboxPool { 75 fn default() -> Self { 76 Self { 77 registry: SubRegistry { next_request_id: 0 }, 78 relays: HashMap::new(), 79 multicast: Default::default(), 80 subs: Default::default(), 81 } 82 } 83 } 84 85 impl OutboxPool { 86 fn remove_completed_oneshots(&mut self, ids: HashSet<OutboxSubId>) { 87 for id in ids { 88 if self.all_have_eose(&id) { 89 self.subs.remove(&id); 90 } 91 } 92 } 93 94 #[profiling::function] 95 fn ingest_session<W>(&mut self, session: OutboxSession, wakeup: &W) 96 where 97 W: Wakeup, 98 { 99 let sessions = self.collect_sessions(session); 100 let mut pending_eose_ids = EoseIds::default(); 101 102 // Process relays with sessions 103 let sessions_keys = if sessions.is_empty() { 104 HashSet::new() 105 } else { 106 let sessions_keys: HashSet<NormRelayUrl> = sessions.keys().cloned().collect(); 107 let session_eose_ids = self.process_sessions(sessions, wakeup); 108 pending_eose_ids.absorb(session_eose_ids); 109 sessions_keys 110 }; 111 112 // Also process EOSE for relays that have pending EOSE but no session 113 // tasks. We only remove oneshots after all relay legs have reached EOSE. 114 let mut eose_state = EoseState { 115 relays: &mut self.relays, 116 subs: &mut self.subs, 117 }; 118 let extra_eose_ids = 119 process_pending_eose_for_non_session_relays(&mut eose_state, &sessions_keys); 120 pending_eose_ids.absorb(extra_eose_ids); 121 122 optimize_since_for_fully_eosed_subs(&mut eose_state, pending_eose_ids.normal); 123 self.remove_completed_oneshots(pending_eose_ids.oneshots); 124 } 125 126 /// Translates a session's queued tasks into per-relay coordination sessions. 127 #[profiling::function] 128 fn collect_sessions( 129 &mut self, 130 session: OutboxSession, 131 ) -> HashMap<NormRelayUrl, CoordinationSession> { 132 if session.tasks.is_empty() { 133 return HashMap::new(); 134 } 135 136 let mut sessions: HashMap<NormRelayUrl, CoordinationSession> = HashMap::new(); 137 'a: for (id, task) in session.tasks { 138 match task { 139 OutboxTask::Modify(modify) => 's: { 140 let Some(sub) = self.subs.get_mut(&id) else { 141 continue 'a; 142 }; 143 144 match &modify { 145 ModifyTask::Filters(_) => { 146 for relay in &sub.relays { 147 get_session(&mut sessions, relay) 148 .subscribe(id, sub.relay_type == RelayType::Transparent); 149 } 150 } 151 ModifyTask::Relays(modify_relays_task) => { 152 let relays_to_remove = sub.relays.difference(&modify_relays_task.0); 153 for relay in relays_to_remove { 154 get_session(&mut sessions, relay).unsubscribe(id); 155 } 156 157 let relays_to_add = modify_relays_task.0.difference(&sub.relays); 158 for relay in relays_to_add { 159 get_session(&mut sessions, relay) 160 .subscribe(id, sub.relay_type == RelayType::Transparent); 161 } 162 } 163 ModifyTask::Full(full_modification_task) => { 164 let prev_relays = &sub.relays; 165 let new_relays = &full_modification_task.relays; 166 let relays_to_remove = prev_relays.difference(new_relays); 167 for relay in relays_to_remove { 168 get_session(&mut sessions, relay).unsubscribe(id); 169 } 170 171 if new_relays.is_empty() { 172 self.subs.remove(&id); 173 break 's; 174 } 175 176 for relay in new_relays { 177 get_session(&mut sessions, relay) 178 .subscribe(id, sub.relay_type == RelayType::Transparent); 179 } 180 } 181 } 182 183 sub.ingest_task(modify); 184 } 185 OutboxTask::Unsubscribe => { 186 let Some(sub) = self.subs.get_mut(&id) else { 187 continue 'a; 188 }; 189 190 for relay_id in &sub.relays { 191 get_session(&mut sessions, relay_id).unsubscribe(id); 192 } 193 194 self.subs.remove(&id); 195 } 196 OutboxTask::Oneshot(subscribe) => { 197 for relay in &subscribe.relays.urls { 198 get_session(&mut sessions, relay) 199 .subscribe(id, subscribe.relays.use_transparent); 200 } 201 self.subs.new_subscription(id, subscribe, true); 202 } 203 OutboxTask::Subscribe(subscribe) => { 204 for relay in &subscribe.relays.urls { 205 get_session(&mut sessions, relay) 206 .subscribe(id, subscribe.relays.use_transparent); 207 } 208 209 self.subs.new_subscription(id, subscribe, false); 210 } 211 } 212 } 213 214 sessions 215 } 216 217 /// Ensures relay coordinators exist and feed them the coordination sessions. 218 #[profiling::function] 219 fn process_sessions<W>( 220 &mut self, 221 sessions: HashMap<NormRelayUrl, CoordinationSession>, 222 wakeup: &W, 223 ) -> EoseIds 224 where 225 W: Wakeup, 226 { 227 let mut pending_eoses = EoseIds::default(); 228 for (relay_id, session) in sessions { 229 let relay = match self.relays.raw_entry_mut().from_key(&relay_id) { 230 RawEntryMut::Occupied(e) => 's: { 231 let res = e.into_mut(); 232 233 if res.websocket.is_some() { 234 break 's res; 235 } 236 237 let Ok(websocket) = WebsocketConn::from_wakeup(relay_id.into(), wakeup.clone()) 238 else { 239 // still can't generate websocket 240 break 's res; 241 }; 242 243 res.websocket = Some(WebsocketRelay::new(websocket)); 244 245 res 246 } 247 RawEntryMut::Vacant(e) => { 248 let coordinator = build_relay(relay_id.clone(), wakeup.clone()); 249 let (_, res) = e.insert(relay_id, coordinator); 250 res 251 } 252 }; 253 let eose_ids = relay.ingest_session(&self.subs, session); 254 255 pending_eoses.absorb(eose_ids); 256 } 257 258 pending_eoses 259 } 260 261 pub fn start_session<'a, W>(&'a mut self, wakeup: W) -> OutboxSessionHandler<'a, W> 262 where 263 W: Wakeup, 264 { 265 OutboxSessionHandler { 266 outbox: self, 267 session: OutboxSession::default(), 268 wakeup, 269 } 270 } 271 272 pub fn broadcast_note<W>(&mut self, note: &Note, relays: Vec<RelayId>, wakeup: &W) 273 where 274 W: Wakeup, 275 { 276 for relay_id in relays { 277 let Ok(msg) = EventClientMessage::try_from(note) else { 278 continue; 279 }; 280 match relay_id { 281 RelayId::Websocket(norm_relay_url) => { 282 let rel = self.ensure_relay(&norm_relay_url, wakeup); 283 rel.send_event(msg); 284 } 285 RelayId::Multicast => { 286 if !self.multicast.is_setup() { 287 self.multicast.try_setup(wakeup); 288 }; 289 290 self.multicast.broadcast(msg); 291 } 292 } 293 } 294 } 295 296 #[profiling::function] 297 pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) { 298 for relay in self.relays.values_mut() { 299 let now = Instant::now(); 300 301 let Some(websocket) = &mut relay.websocket else { 302 continue; 303 }; 304 305 match websocket.conn.status { 306 RelayStatus::Disconnected => { 307 let reconnect_at = 308 websocket.last_connect_attempt + websocket.retry_connect_after; 309 if now > reconnect_at { 310 websocket.last_connect_attempt = now; 311 websocket.reconnect_attempt = websocket.reconnect_attempt.saturating_add(1); 312 let jitter_seed = 313 reconnect_jitter_seed(&websocket.conn.url, websocket.reconnect_attempt); 314 let next_duration = 315 next_reconnect_duration(websocket.reconnect_attempt, jitter_seed); 316 tracing::debug!( 317 "reconnect attempt {}, backing off for {:?}", 318 websocket.reconnect_attempt, 319 next_duration, 320 ); 321 websocket.retry_connect_after = next_duration; 322 if let Err(err) = websocket.conn.connect(wakeup.clone()) { 323 tracing::error!("error connecting to relay: {}", err); 324 } 325 } 326 } 327 RelayStatus::Connected => { 328 websocket.reconnect_attempt = 0; 329 websocket.retry_connect_after = WebsocketRelay::initial_reconnect_duration(); 330 331 let should_ping = now - websocket.last_ping > KEEPALIVE_PING_RATE; 332 if should_ping { 333 tracing::trace!("pinging {}", websocket.conn.url); 334 websocket.conn.ping(); 335 websocket.last_ping = Instant::now(); 336 } 337 } 338 RelayStatus::Connecting => {} 339 } 340 } 341 } 342 343 fn ensure_relay<W>(&mut self, relay_id: &NormRelayUrl, wakeup: &W) -> &mut CoordinationData 344 where 345 W: Wakeup, 346 { 347 match self.relays.raw_entry_mut().from_key(relay_id) { 348 RawEntryMut::Occupied(entry) => entry.into_mut(), 349 RawEntryMut::Vacant(entry) => { 350 let (_, res) = entry.insert( 351 relay_id.clone(), 352 build_relay(relay_id.clone(), wakeup.clone()), 353 ); 354 res 355 } 356 } 357 } 358 359 pub fn status(&self, id: &OutboxSubId) -> HashMap<&NormRelayUrl, RelayReqStatus> { 360 let mut status = HashMap::new(); 361 for (url, relay) in &self.relays { 362 let Some(res) = relay.req_status(id) else { 363 continue; 364 }; 365 status.insert(url, res); 366 } 367 368 status 369 } 370 371 pub fn websocket_statuses(&self) -> BTreeMap<&NormRelayUrl, RelayStatus> { 372 let mut status = BTreeMap::new(); 373 374 for (url, relay) in &self.relays { 375 let relay_status = if let Some(websocket) = &relay.websocket { 376 websocket.conn.status 377 } else { 378 RelayStatus::Disconnected 379 }; 380 status.insert(url, relay_status); 381 } 382 383 status 384 } 385 386 pub fn has_eose(&self, id: &OutboxSubId) -> bool { 387 for relay in self.relays.values() { 388 if relay.req_status(id) == Some(RelayReqStatus::Eose) { 389 return true; 390 } 391 } 392 393 false 394 } 395 396 pub fn all_have_eose(&self, id: &OutboxSubId) -> bool { 397 for relay in self.relays.values() { 398 let Some(status) = relay.req_status(id) else { 399 continue; 400 }; 401 if status != RelayReqStatus::Eose { 402 return false; 403 } 404 } 405 406 true 407 } 408 409 /// Returns a clone of the filters for the given subscription ID. 410 pub fn filters(&self, id: &OutboxSubId) -> Option<&Vec<Filter>> { 411 self.subs.view(id).map(|v| v.filters.get_filters()) 412 } 413 414 #[profiling::function] 415 pub fn try_recv<F>(&mut self, mut max_notes: usize, mut process: F) 416 where 417 for<'a> F: FnMut(RawEventData<'a>), 418 { 419 's: while max_notes > 0 { 420 let mut received_any = false; 421 422 for relay in self.relays.values_mut() { 423 let resp = relay.try_recv(&self.subs, &mut process); 424 425 if !resp.received_event { 426 continue; 427 } 428 429 received_any = true; 430 431 if resp.event_was_nostr_note { 432 max_notes = max_notes.saturating_sub(1); 433 if max_notes == 0 { 434 break 's; 435 } 436 } 437 } 438 439 if !received_any { 440 break; 441 } 442 } 443 444 self.multicast.try_recv(process); 445 } 446 } 447 448 struct EoseState<'a> { 449 relays: &'a mut HashMap<NormRelayUrl, CoordinationData>, 450 subs: &'a mut OutboxSubscriptions, 451 } 452 453 fn unix_now_secs() -> Option<u64> { 454 SystemTime::now() 455 .duration_since(UNIX_EPOCH) 456 .ok() 457 .map(|d| d.as_secs()) 458 } 459 460 fn sub_all_relays_have_eose(state: &EoseState<'_>, id: &OutboxSubId) -> bool { 461 let Some(sub) = state.subs.get(id) else { 462 return false; 463 }; 464 if sub.relays.is_empty() { 465 return false; 466 } 467 468 for relay_id in &sub.relays { 469 let Some(relay) = state.relays.get(relay_id) else { 470 return false; 471 }; 472 if relay.req_status(id) != Some(RelayReqStatus::Eose) { 473 return false; 474 } 475 } 476 477 true 478 } 479 480 fn optimize_since_for_fully_eosed_subs(state: &mut EoseState<'_>, ids: HashSet<OutboxSubId>) { 481 let Some(now) = unix_now_secs() else { 482 return; 483 }; 484 485 for id in ids { 486 // Since optimization is only safe after every relay leg for this 487 // subscription has reached EOSE at least once. 488 if !sub_all_relays_have_eose(state, &id) { 489 continue; 490 } 491 492 if let Some(sub) = state.subs.get_mut(&id) { 493 sub.see_all(now); 494 sub.filters.since_optimize(); 495 } 496 } 497 } 498 499 fn process_pending_eose_for_non_session_relays( 500 state: &mut EoseState<'_>, 501 sessions_keys: &HashSet<NormRelayUrl>, 502 ) -> EoseIds { 503 let mut pending_eoses = EoseIds::default(); 504 505 for (relay_id, relay) in state.relays.iter_mut() { 506 if sessions_keys.contains(relay_id) { 507 continue; 508 } 509 510 let eose_ids = relay.ingest_session(state.subs, CoordinationSession::default()); 511 pending_eoses.absorb(eose_ids); 512 } 513 514 pending_eoses 515 } 516 517 struct SubRegistry { 518 next_request_id: u64, 519 } 520 521 impl SubRegistry { 522 pub fn next(&mut self) -> OutboxSubId { 523 let i = self.next_request_id; 524 self.next_request_id += 1; 525 OutboxSubId(i) 526 } 527 } 528 529 pub fn get_session<'a>( 530 map: &'a mut HashMap<NormRelayUrl, CoordinationSession>, 531 id: &NormRelayUrl, 532 ) -> &'a mut CoordinationSession { 533 match map.raw_entry_mut().from_key(id) { 534 RawEntryMut::Occupied(e) => e.into_mut(), 535 RawEntryMut::Vacant(e) => { 536 let session = CoordinationSession::default(); 537 let (_, res) = e.insert(id.clone(), session); 538 res 539 } 540 } 541 } 542 543 fn build_relay<W>(relay_id: NormRelayUrl, wakeup: W) -> CoordinationData 544 where 545 W: Wakeup, 546 { 547 CoordinationData::new( 548 RelayLimitations::default(), // TODO(kernelkind): add actual limitations 549 relay_id, 550 wakeup, 551 ) 552 } 553 554 #[cfg(test)] 555 mod tests { 556 use hashbrown::HashSet; 557 use nostrdb::Filter; 558 559 use super::*; 560 use crate::relay::{ 561 coordinator::CoordinationTask, 562 test_utils::{filters_json, trivial_filter, MockWakeup}, 563 RelayUrlPkgs, 564 }; 565 566 /// Ensures the subscription registry always yields unique IDs. 567 #[test] 568 fn registry_generates_unique_ids() { 569 let mut registry = SubRegistry { next_request_id: 0 }; 570 571 let id1 = registry.next(); 572 let id2 = registry.next(); 573 let id3 = registry.next(); 574 575 assert_ne!(id1, id2); 576 assert_ne!(id2, id3); 577 assert_ne!(id1, id3); 578 } 579 580 // ==================== OutboxPool tests ==================== 581 582 /// Default pool has no relays or subscriptions. 583 #[test] 584 fn outbox_pool_default_empty() { 585 let pool = OutboxPool::default(); 586 assert!(pool.relays.is_empty()); 587 // Verify no subscriptions by checking that a lookup returns empty status 588 assert!(pool.status(&OutboxSubId(0)).is_empty()); 589 } 590 591 /// has_eose returns false when no relays are tracking the request. 592 #[test] 593 fn outbox_pool_has_eose_false_when_empty() { 594 let pool = OutboxPool::default(); 595 assert!(!pool.has_eose(&OutboxSubId(0))); 596 } 597 598 /// status() returns empty map for unknown request IDs. 599 #[test] 600 fn outbox_pool_status_empty_for_unknown() { 601 let pool = OutboxPool::default(); 602 let status = pool.status(&OutboxSubId(999)); 603 assert!(status.is_empty()); 604 } 605 606 /// websocket_statuses() is empty before any relays connect. 607 #[test] 608 fn outbox_pool_websocket_statuses_empty_initially() { 609 let pool = OutboxPool::default(); 610 let statuses = pool.websocket_statuses(); 611 assert!(statuses.is_empty()); 612 } 613 614 /// Full modifications should unsubscribe old relays and resubscribe new ones using the updated filters. 615 #[test] 616 fn full_modification_updates_sessions_with_new_filters() { 617 let mut pool = OutboxPool::default(); 618 let wakeup = MockWakeup::default(); 619 let relay_a = NormRelayUrl::new("wss://relay-a.example.com").unwrap(); 620 let relay_b = NormRelayUrl::new("wss://relay-b.example.com").unwrap(); 621 622 let mut urls = HashSet::new(); 623 urls.insert(relay_a.clone()); 624 let new_sub_id = { 625 let mut handler = pool.start_session(wakeup.clone()); 626 handler.subscribe(trivial_filter(), RelayUrlPkgs::new(urls)) 627 }; 628 629 { 630 let sub = pool 631 .subs 632 .get_mut(&new_sub_id) 633 .expect("subscription should be registered"); 634 assert_eq!(sub.relays.len(), 1); 635 assert!(sub.relays.contains(&relay_a)); 636 assert!(!sub.is_oneshot); 637 assert_eq!(sub.relay_type, RelayType::Compaction); 638 } 639 640 let sessions = { 641 let mut updated_relays = HashSet::new(); 642 updated_relays.insert(relay_b.clone()); 643 644 let mut handler = pool.start_session(wakeup); 645 handler.modify_filters( 646 new_sub_id, 647 vec![Filter::new().kinds(vec![3]).limit(1).build()], 648 ); 649 handler.modify_relays(new_sub_id, updated_relays); 650 let session = handler.export(); 651 pool.collect_sessions(session) 652 }; 653 654 let old_task = sessions 655 .get(&relay_a) 656 .and_then(|session| session.tasks.get(&new_sub_id)) 657 .expect("expected a task for relay relay_a"); 658 assert!(matches!(old_task, CoordinationTask::Unsubscribe)); 659 660 let new_task = sessions 661 .get(&relay_b) 662 .and_then(|session| session.tasks.get(&new_sub_id)) 663 .expect("expected a task for relay relay_b"); 664 assert!(matches!(new_task, CoordinationTask::CompactionSub)); 665 } 666 667 /// Base delay doubles on each attempt until it reaches the configured cap. 668 #[test] 669 fn reconnect_base_delay_doubles_with_cap() { 670 assert_eq!(base_reconnect_delay(0), Duration::from_secs(5)); 671 assert_eq!(base_reconnect_delay(1), Duration::from_secs(10)); 672 assert_eq!(base_reconnect_delay(2), Duration::from_secs(20)); 673 assert_eq!(base_reconnect_delay(3), Duration::from_secs(40)); 674 assert_eq!(base_reconnect_delay(4), Duration::from_secs(80)); 675 assert_eq!(base_reconnect_delay(5), Duration::from_secs(160)); 676 assert_eq!(base_reconnect_delay(6), Duration::from_secs(320)); 677 assert_eq!(base_reconnect_delay(7), Duration::from_secs(640)); 678 assert_eq!(base_reconnect_delay(8), Duration::from_secs(1280)); 679 assert_eq!(base_reconnect_delay(9), MAX_RECONNECT_DELAY); 680 // Saturates at cap for any large attempt count. 681 assert_eq!(base_reconnect_delay(100), MAX_RECONNECT_DELAY); 682 } 683 684 /// Jittered delay is always >= the base and never exceeds base * 1.25 or the cap. 685 #[test] 686 fn reconnect_jitter_within_bounds() { 687 for attempt in [0u32, 1, 3, 8, 9, 50, 100] { 688 let base = base_reconnect_delay(attempt); 689 let max_with_jitter = (base + (base / 4)).min(MAX_RECONNECT_DELAY); 690 for sample in 0u64..20 { 691 let jittered = next_reconnect_duration(attempt, 0xBAD5EED ^ sample); 692 assert!( 693 jittered >= base, 694 "jittered {jittered:?} < base {base:?} at attempt {attempt}" 695 ); 696 assert!( 697 jittered <= max_with_jitter, 698 "jittered {jittered:?} exceeds max-with-jitter {max_with_jitter:?} at attempt {attempt}" 699 ); 700 } 701 } 702 } 703 704 /// Oneshot requests route to compaction mode by default. 705 #[test] 706 fn oneshot_routes_to_compaction() { 707 let mut pool = OutboxPool::default(); 708 let relay = NormRelayUrl::new("wss://relay-oneshot.example.com").unwrap(); 709 let mut relays = HashSet::new(); 710 relays.insert(relay.clone()); 711 let filters = vec![Filter::new().kinds(vec![1]).limit(2).build()]; 712 let id = OutboxSubId(42); 713 714 let mut session = OutboxSession::default(); 715 session.oneshot(id, filters.clone(), RelayUrlPkgs::new(relays)); 716 717 let sessions = pool.collect_sessions(session); 718 719 let relay_task = sessions 720 .get(&relay) 721 .and_then(|session| session.tasks.get(&id)) 722 .expect("expected task for oneshot relay"); 723 assert!(matches!(relay_task, CoordinationTask::CompactionSub)); 724 } 725 726 /// Unsubscribing from a multi-relay subscription emits unsubscribe tasks for each relay. 727 #[test] 728 fn unsubscribe_targets_all_relays() { 729 let mut pool = OutboxPool::default(); 730 let relay_a = NormRelayUrl::new("wss://relay-a.example.com").unwrap(); 731 let relay_b = NormRelayUrl::new("wss://relay-b.example.com").unwrap(); 732 let id = OutboxSubId(42); 733 734 // Subscribe to both relays 735 let mut urls = HashSet::new(); 736 urls.insert(relay_a.clone()); 737 urls.insert(relay_b.clone()); 738 739 let mut session = OutboxSession::default(); 740 session.subscribe(id, trivial_filter(), RelayUrlPkgs::new(urls)); 741 pool.collect_sessions(session); 742 743 // Unsubscribe 744 let mut session = OutboxSession::default(); 745 session.unsubscribe(id); 746 let sessions = pool.collect_sessions(session); 747 748 // Both relays should receive unsubscribe tasks 749 let task_a = sessions.get(&relay_a).and_then(|s| s.tasks.get(&id)); 750 let task_b = sessions.get(&relay_b).and_then(|s| s.tasks.get(&id)); 751 752 assert!(matches!(task_a, Some(CoordinationTask::Unsubscribe))); 753 assert!(matches!(task_b, Some(CoordinationTask::Unsubscribe))); 754 } 755 756 /// Subscriptions with use_transparent=true route to transparent mode. 757 #[test] 758 fn subscribe_transparent_mode() { 759 let mut pool = OutboxPool::default(); 760 let relay = NormRelayUrl::new("wss://relay-transparent.example.com").unwrap(); 761 let id = OutboxSubId(5); 762 763 let mut urls = HashSet::new(); 764 urls.insert(relay.clone()); 765 let mut pkgs = RelayUrlPkgs::new(urls); 766 pkgs.use_transparent = true; 767 768 let mut session = OutboxSession::default(); 769 session.subscribe(id, trivial_filter(), pkgs); 770 let sessions = pool.collect_sessions(session); 771 772 let task = sessions.get(&relay).and_then(|s| s.tasks.get(&id)); 773 assert!(matches!(task, Some(CoordinationTask::TransparentSub))); 774 } 775 776 /// Modifying filters should re-subscribe the routed relays with the new filters. 777 #[test] 778 fn modify_filters_reissues_subscribe_for_existing_relays() { 779 let mut pool = OutboxPool::default(); 780 let wakeup = MockWakeup::default(); 781 let relay = NormRelayUrl::new("wss://relay-modify.example.com").unwrap(); 782 783 let mut urls = HashSet::new(); 784 urls.insert(relay.clone()); 785 let sub_id = { 786 let mut handler = pool.start_session(wakeup.clone()); 787 handler.subscribe(trivial_filter(), RelayUrlPkgs::new(urls)) 788 }; 789 790 let (sessions, expected_json) = { 791 let mut handler = pool.start_session(wakeup); 792 let updated_filters = vec![Filter::new().kinds(vec![7]).limit(2).build()]; 793 let expected_json = filters_json(&updated_filters); 794 handler.modify_filters(sub_id, updated_filters); 795 let session = handler.export(); 796 (pool.collect_sessions(session), expected_json) 797 }; 798 799 let view = pool.subs.view(&sub_id).expect("updated subscription view"); 800 let stored_json = filters_json(view.filters.get_filters()); 801 assert_eq!(stored_json, expected_json); 802 803 let task = sessions 804 .get(&relay) 805 .and_then(|session| session.tasks.get(&sub_id)) 806 .expect("expected coordination task"); 807 assert!(matches!(task, CoordinationTask::CompactionSub)); 808 } 809 810 /// Modifying relays should unsubscribe removed relays and subscribe new ones. 811 #[test] 812 fn modify_relays_differs_routing_sets() { 813 let mut pool = OutboxPool::default(); 814 let wakeup = MockWakeup::default(); 815 let relay_a = NormRelayUrl::new("wss://relay-diff-a.example.com").unwrap(); 816 let relay_b = NormRelayUrl::new("wss://relay-diff-b.example.com").unwrap(); 817 818 let mut urls = HashSet::new(); 819 urls.insert(relay_a.clone()); 820 let sub_id = { 821 let mut handler = pool.start_session(wakeup.clone()); 822 handler.subscribe(trivial_filter(), RelayUrlPkgs::new(urls)) 823 }; 824 825 let sessions = { 826 let mut handler = pool.start_session(wakeup); 827 let mut new_urls = HashSet::new(); 828 new_urls.insert(relay_b.clone()); 829 handler.modify_relays(sub_id, new_urls); 830 let session = handler.export(); 831 pool.collect_sessions(session) 832 }; 833 834 let unsub_task = sessions 835 .get(&relay_a) 836 .and_then(|session| session.tasks.get(&sub_id)) 837 .expect("missing relay_a task"); 838 assert!(matches!(unsub_task, CoordinationTask::Unsubscribe)); 839 840 let sub_task = sessions 841 .get(&relay_b) 842 .and_then(|session| session.tasks.get(&sub_id)) 843 .expect("missing relay_b task"); 844 assert!(matches!(sub_task, CoordinationTask::CompactionSub)); 845 } 846 847 /// Full modifications that end up with no relays should drop the subscription entirely. 848 #[test] 849 fn modify_full_with_empty_relays_removes_subscription() { 850 let mut pool = OutboxPool::default(); 851 let wakeup = MockWakeup::default(); 852 let relay = NormRelayUrl::new("wss://relay-empty.example.com").unwrap(); 853 854 let mut urls = HashSet::new(); 855 urls.insert(relay.clone()); 856 let sub_id = { 857 let mut handler = pool.start_session(wakeup.clone()); 858 handler.subscribe(trivial_filter(), RelayUrlPkgs::new(urls)) 859 }; 860 861 let sessions = { 862 let mut handler = pool.start_session(wakeup); 863 handler.modify_filters(sub_id, vec![Filter::new().kinds(vec![9]).limit(1).build()]); 864 handler.modify_relays(sub_id, HashSet::new()); 865 let session = handler.export(); 866 pool.collect_sessions(session) 867 }; 868 869 let task = sessions 870 .get(&relay) 871 .and_then(|session| session.tasks.get(&sub_id)) 872 .expect("expected unsubscribe for relay"); 873 assert!(matches!(task, CoordinationTask::Unsubscribe)); 874 assert!( 875 pool.subs.get_mut(&sub_id).is_none(), 876 "subscription metadata should be removed" 877 ); 878 } 879 880 // ==================== OutboxSessionHandler tests ==================== 881 882 /// The first subscribe issued via handler should return SubRequestId(0). 883 #[test] 884 fn outbox_session_handler_subscribe_returns_id() { 885 let mut pool = OutboxPool::default(); 886 let wakeup = MockWakeup::default(); 887 888 let id = { 889 let mut handler = pool.start_session(wakeup); 890 handler.subscribe(trivial_filter(), RelayUrlPkgs::new(HashSet::new())) 891 }; 892 893 assert_eq!(id, OutboxSubId(0)); 894 } 895 896 /// Separate sessions should continue incrementing subscription IDs globally. 897 #[test] 898 fn outbox_session_handler_multiple_subscribes_unique_ids() { 899 let mut pool = OutboxPool::default(); 900 let wakeup = MockWakeup::default(); 901 902 let id1 = { 903 let mut handler = pool.start_session(wakeup.clone()); 904 handler.subscribe(trivial_filter(), RelayUrlPkgs::new(HashSet::new())) 905 }; 906 907 let id2 = { 908 let mut handler = pool.start_session(wakeup); 909 handler.subscribe(trivial_filter(), RelayUrlPkgs::new(HashSet::new())) 910 }; 911 912 assert_ne!(id1, id2); 913 assert_eq!(id1, OutboxSubId(0)); 914 assert_eq!(id2, OutboxSubId(1)); 915 } 916 917 /// Exporting/importing a session should carry over any pending tasks intact. 918 #[test] 919 fn outbox_session_handler_export_and_import() { 920 let mut pool = OutboxPool::default(); 921 let wakeup = MockWakeup::default(); 922 923 // Create a handler and export its session 924 let handler = pool.start_session(wakeup.clone()); 925 let session = handler.export(); 926 927 // Should be empty since we didn't do anything 928 assert!(session.tasks.is_empty()); 929 930 // Import the session back 931 let _handler = OutboxSessionHandler::import(&mut pool, session, wakeup); 932 } 933 934 // ==================== get_session tests ==================== 935 936 /// get_session should create a new coordination entry when missing. 937 #[test] 938 fn get_session_creates_new_if_missing() { 939 let mut map: HashMap<NormRelayUrl, CoordinationSession> = HashMap::new(); 940 let url = NormRelayUrl::new("wss://relay.example.com").unwrap(); 941 942 let _session = get_session(&mut map, &url); 943 944 // Should have created a new session 945 assert!(map.contains_key(&url)); 946 } 947 948 /// get_session returns the pre-existing coordination session. 949 #[test] 950 fn get_session_returns_existing() { 951 let mut map: HashMap<NormRelayUrl, CoordinationSession> = HashMap::new(); 952 let url = NormRelayUrl::new("wss://relay.example.com").unwrap(); 953 954 let session = get_session(&mut map, &url); 955 session.subscribe(OutboxSubId(0), false); 956 957 // Map should still have exactly one entry 958 assert_eq!(map.len(), 1); 959 } 960 }