notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

mod.rs (33965B)


      1 use hashbrown::{hash_map::RawEntryMut, HashMap, HashSet};
      2 use nostrdb::{Filter, Note};
      3 use std::{
      4     collections::{hash_map::DefaultHasher, BTreeMap},
      5     hash::{Hash, Hasher},
      6     time::{Duration, Instant, SystemTime, UNIX_EPOCH},
      7 };
      8 
      9 use crate::{
     10     relay::{
     11         coordinator::{CoordinationData, CoordinationSession, EoseIds},
     12         websocket::WebsocketRelay,
     13         ModifyTask, MulticastRelayCache, NormRelayUrl, OutboxSubId, OutboxSubscriptions,
     14         OutboxTask, RawEventData, RelayId, RelayLimitations, RelayReqStatus, RelayStatus,
     15         RelayType,
     16     },
     17     EventClientMessage, Wakeup, WebsocketConn,
     18 };
     19 
     20 mod handler;
     21 mod session;
     22 
     23 pub use handler::OutboxSessionHandler;
     24 pub use session::OutboxSession;
     25 
     26 const KEEPALIVE_PING_RATE: Duration = Duration::from_secs(45);
     27 const MAX_RECONNECT_DELAY: Duration = Duration::from_secs(30 * 60); // 30 minutes
     28 
     29 /// Computes the deterministic base delay for a given attempt number.
     30 /// Formula: `5s * 2^attempt`, capped at [`MAX_RECONNECT_DELAY`].
     31 fn base_reconnect_delay(attempt: u32) -> Duration {
     32     let secs = 5u64.checked_shl(attempt).unwrap_or(u64::MAX);
     33     Duration::from_secs(secs).min(MAX_RECONNECT_DELAY)
     34 }
     35 
     36 fn reconnect_jitter_seed(relay_url: &nostr::RelayUrl, attempt: u32) -> u64 {
     37     let now_nanos = SystemTime::now()
     38         .duration_since(UNIX_EPOCH)
     39         .unwrap_or_default()
     40         .as_nanos() as u64;
     41     let mut hasher = DefaultHasher::new();
     42     relay_url.hash(&mut hasher);
     43     attempt.hash(&mut hasher);
     44     now_nanos.hash(&mut hasher);
     45     hasher.finish()
     46 }
     47 
     48 /// Returns the reconnect delay for the given attempt count.
     49 ///
     50 /// Uses the exponential base delay as the primary component and adds up to 25%
     51 /// additive jitter (via relay/time mixed seed) to spread out simultaneous
     52 /// reconnects without undermining the exponential delay itself.
     53 fn next_reconnect_duration(attempt: u32, jitter_seed: u64) -> Duration {
     54     let base = base_reconnect_delay(attempt);
     55     let jitter_ceiling = base / 4;
     56     let jitter = if jitter_ceiling.is_zero() {
     57         Duration::ZERO
     58     } else {
     59         let jitter_ceiling_nanos = jitter_ceiling.as_nanos() as u64;
     60         Duration::from_nanos(jitter_seed % jitter_ceiling_nanos)
     61     };
     62     (base + jitter).min(MAX_RECONNECT_DELAY)
     63 }
     64 
/// OutboxPool owns the active relay coordinators and applies staged subscription
/// mutations to them each frame.
pub struct OutboxPool {
    /// Hands out fresh [`OutboxSubId`] values from a monotonically increasing counter.
    registry: SubRegistry,
    /// Per-relay coordination state, keyed by normalized relay URL.
    relays: HashMap<NormRelayUrl, CoordinationData>,
    /// Subscription bookkeeping, looked up by [`OutboxSubId`].
    subs: OutboxSubscriptions,
    /// Multicast relay, set up on first multicast broadcast via `try_setup`.
    multicast: MulticastRelayCache,
}
     73 
     74 impl Default for OutboxPool {
     75     fn default() -> Self {
     76         Self {
     77             registry: SubRegistry { next_request_id: 0 },
     78             relays: HashMap::new(),
     79             multicast: Default::default(),
     80             subs: Default::default(),
     81         }
     82     }
     83 }
     84 
     85 impl OutboxPool {
     86     fn remove_completed_oneshots(&mut self, ids: HashSet<OutboxSubId>) {
     87         for id in ids {
     88             if self.all_have_eose(&id) {
     89                 self.subs.remove(&id);
     90             }
     91         }
     92     }
     93 
     94     #[profiling::function]
     95     fn ingest_session<W>(&mut self, session: OutboxSession, wakeup: &W)
     96     where
     97         W: Wakeup,
     98     {
     99         let sessions = self.collect_sessions(session);
    100         let mut pending_eose_ids = EoseIds::default();
    101 
    102         // Process relays with sessions
    103         let sessions_keys = if sessions.is_empty() {
    104             HashSet::new()
    105         } else {
    106             let sessions_keys: HashSet<NormRelayUrl> = sessions.keys().cloned().collect();
    107             let session_eose_ids = self.process_sessions(sessions, wakeup);
    108             pending_eose_ids.absorb(session_eose_ids);
    109             sessions_keys
    110         };
    111 
    112         // Also process EOSE for relays that have pending EOSE but no session
    113         // tasks. We only remove oneshots after all relay legs have reached EOSE.
    114         let mut eose_state = EoseState {
    115             relays: &mut self.relays,
    116             subs: &mut self.subs,
    117         };
    118         let extra_eose_ids =
    119             process_pending_eose_for_non_session_relays(&mut eose_state, &sessions_keys);
    120         pending_eose_ids.absorb(extra_eose_ids);
    121 
    122         optimize_since_for_fully_eosed_subs(&mut eose_state, pending_eose_ids.normal);
    123         self.remove_completed_oneshots(pending_eose_ids.oneshots);
    124     }
    125 
    /// Translates a session's queued tasks into per-relay coordination sessions.
    ///
    /// Besides building the returned map, this updates `self.subs`: new
    /// subscriptions are registered, modified ones ingest their task, and
    /// unsubscribed ones (or ones fully modified to an empty relay set) are
    /// removed. Tasks that reference an unknown subscription id are skipped.
    #[profiling::function]
    fn collect_sessions(
        &mut self,
        session: OutboxSession,
    ) -> HashMap<NormRelayUrl, CoordinationSession> {
        if session.tasks.is_empty() {
            return HashMap::new();
        }

        let mut sessions: HashMap<NormRelayUrl, CoordinationSession> = HashMap::new();
        'a: for (id, task) in session.tasks {
            match task {
                OutboxTask::Modify(modify) => 's: {
                    let Some(sub) = self.subs.get_mut(&id) else {
                        continue 'a;
                    };

                    match &modify {
                        // Filters changed: re-issue the subscription on every
                        // relay the subscription is currently routed to.
                        ModifyTask::Filters(_) => {
                            for relay in &sub.relays {
                                get_session(&mut sessions, relay)
                                    .subscribe(id, sub.relay_type == RelayType::Transparent);
                            }
                        }
                        // Relay set changed: unsubscribe relays that were
                        // dropped and subscribe only the newly added ones.
                        ModifyTask::Relays(modify_relays_task) => {
                            let relays_to_remove = sub.relays.difference(&modify_relays_task.0);
                            for relay in relays_to_remove {
                                get_session(&mut sessions, relay).unsubscribe(id);
                            }

                            let relays_to_add = modify_relays_task.0.difference(&sub.relays);
                            for relay in relays_to_add {
                                get_session(&mut sessions, relay)
                                    .subscribe(id, sub.relay_type == RelayType::Transparent);
                            }
                        }
                        // Filters and relays both changed: unsubscribe dropped
                        // relays and (re)subscribe every relay in the new set.
                        ModifyTask::Full(full_modification_task) => {
                            let prev_relays = &sub.relays;
                            let new_relays = &full_modification_task.relays;
                            let relays_to_remove = prev_relays.difference(new_relays);
                            for relay in relays_to_remove {
                                get_session(&mut sessions, relay).unsubscribe(id);
                            }

                            // An empty new relay set removes the subscription
                            // and skips the `ingest_task` call below.
                            if new_relays.is_empty() {
                                self.subs.remove(&id);
                                break 's;
                            }

                            for relay in new_relays {
                                get_session(&mut sessions, relay)
                                    .subscribe(id, sub.relay_type == RelayType::Transparent);
                            }
                        }
                    }

                    sub.ingest_task(modify);
                }
                OutboxTask::Unsubscribe => {
                    let Some(sub) = self.subs.get_mut(&id) else {
                        continue 'a;
                    };

                    for relay_id in &sub.relays {
                        get_session(&mut sessions, relay_id).unsubscribe(id);
                    }

                    self.subs.remove(&id);
                }
                // Oneshot and Subscribe differ only in the final flag passed
                // to `new_subscription` (`true` for oneshots).
                OutboxTask::Oneshot(subscribe) => {
                    for relay in &subscribe.relays.urls {
                        get_session(&mut sessions, relay)
                            .subscribe(id, subscribe.relays.use_transparent);
                    }
                    self.subs.new_subscription(id, subscribe, true);
                }
                OutboxTask::Subscribe(subscribe) => {
                    for relay in &subscribe.relays.urls {
                        get_session(&mut sessions, relay)
                            .subscribe(id, subscribe.relays.use_transparent);
                    }

                    self.subs.new_subscription(id, subscribe, false);
                }
            }
        }

        sessions
    }
    216 
    /// Ensures relay coordinators exist and feed them the coordination sessions.
    ///
    /// For a relay that is already known but has no websocket, one attempt is
    /// made to create the connection from `wakeup`; the session is ingested
    /// either way. Unknown relays get a freshly built coordinator. Returns the
    /// EOSE ids reported by all processed relays, merged together.
    #[profiling::function]
    fn process_sessions<W>(
        &mut self,
        sessions: HashMap<NormRelayUrl, CoordinationSession>,
        wakeup: &W,
    ) -> EoseIds
    where
        W: Wakeup,
    {
        let mut pending_eoses = EoseIds::default();
        for (relay_id, session) in sessions {
            // Raw-entry lookup lets the owned `relay_id` be moved into the map
            // only when the relay is actually missing.
            let relay = match self.relays.raw_entry_mut().from_key(&relay_id) {
                RawEntryMut::Occupied(e) => 's: {
                    let res = e.into_mut();

                    if res.websocket.is_some() {
                        break 's res;
                    }

                    let Ok(websocket) = WebsocketConn::from_wakeup(relay_id.into(), wakeup.clone())
                    else {
                        // still can't generate websocket
                        break 's res;
                    };

                    res.websocket = Some(WebsocketRelay::new(websocket));

                    res
                }
                RawEntryMut::Vacant(e) => {
                    let coordinator = build_relay(relay_id.clone(), wakeup.clone());
                    let (_, res) = e.insert(relay_id, coordinator);
                    res
                }
            };
            let eose_ids = relay.ingest_session(&self.subs, session);

            pending_eoses.absorb(eose_ids);
        }

        pending_eoses
    }
    260 
    261     pub fn start_session<'a, W>(&'a mut self, wakeup: W) -> OutboxSessionHandler<'a, W>
    262     where
    263         W: Wakeup,
    264     {
    265         OutboxSessionHandler {
    266             outbox: self,
    267             session: OutboxSession::default(),
    268             wakeup,
    269         }
    270     }
    271 
    272     pub fn broadcast_note<W>(&mut self, note: &Note, relays: Vec<RelayId>, wakeup: &W)
    273     where
    274         W: Wakeup,
    275     {
    276         for relay_id in relays {
    277             let Ok(msg) = EventClientMessage::try_from(note) else {
    278                 continue;
    279             };
    280             match relay_id {
    281                 RelayId::Websocket(norm_relay_url) => {
    282                     let rel = self.ensure_relay(&norm_relay_url, wakeup);
    283                     rel.send_event(msg);
    284                 }
    285                 RelayId::Multicast => {
    286                     if !self.multicast.is_setup() {
    287                         self.multicast.try_setup(wakeup);
    288                     };
    289 
    290                     self.multicast.broadcast(msg);
    291                 }
    292             }
    293         }
    294     }
    295 
    296     #[profiling::function]
    297     pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) {
    298         for relay in self.relays.values_mut() {
    299             let now = Instant::now();
    300 
    301             let Some(websocket) = &mut relay.websocket else {
    302                 continue;
    303             };
    304 
    305             match websocket.conn.status {
    306                 RelayStatus::Disconnected => {
    307                     let reconnect_at =
    308                         websocket.last_connect_attempt + websocket.retry_connect_after;
    309                     if now > reconnect_at {
    310                         websocket.last_connect_attempt = now;
    311                         websocket.reconnect_attempt = websocket.reconnect_attempt.saturating_add(1);
    312                         let jitter_seed =
    313                             reconnect_jitter_seed(&websocket.conn.url, websocket.reconnect_attempt);
    314                         let next_duration =
    315                             next_reconnect_duration(websocket.reconnect_attempt, jitter_seed);
    316                         tracing::debug!(
    317                             "reconnect attempt {}, backing off for {:?}",
    318                             websocket.reconnect_attempt,
    319                             next_duration,
    320                         );
    321                         websocket.retry_connect_after = next_duration;
    322                         if let Err(err) = websocket.conn.connect(wakeup.clone()) {
    323                             tracing::error!("error connecting to relay: {}", err);
    324                         }
    325                     }
    326                 }
    327                 RelayStatus::Connected => {
    328                     websocket.reconnect_attempt = 0;
    329                     websocket.retry_connect_after = WebsocketRelay::initial_reconnect_duration();
    330 
    331                     let should_ping = now - websocket.last_ping > KEEPALIVE_PING_RATE;
    332                     if should_ping {
    333                         tracing::trace!("pinging {}", websocket.conn.url);
    334                         websocket.conn.ping();
    335                         websocket.last_ping = Instant::now();
    336                     }
    337                 }
    338                 RelayStatus::Connecting => {}
    339             }
    340         }
    341     }
    342 
    343     fn ensure_relay<W>(&mut self, relay_id: &NormRelayUrl, wakeup: &W) -> &mut CoordinationData
    344     where
    345         W: Wakeup,
    346     {
    347         match self.relays.raw_entry_mut().from_key(relay_id) {
    348             RawEntryMut::Occupied(entry) => entry.into_mut(),
    349             RawEntryMut::Vacant(entry) => {
    350                 let (_, res) = entry.insert(
    351                     relay_id.clone(),
    352                     build_relay(relay_id.clone(), wakeup.clone()),
    353                 );
    354                 res
    355             }
    356         }
    357     }
    358 
    359     pub fn status(&self, id: &OutboxSubId) -> HashMap<&NormRelayUrl, RelayReqStatus> {
    360         let mut status = HashMap::new();
    361         for (url, relay) in &self.relays {
    362             let Some(res) = relay.req_status(id) else {
    363                 continue;
    364             };
    365             status.insert(url, res);
    366         }
    367 
    368         status
    369     }
    370 
    371     pub fn websocket_statuses(&self) -> BTreeMap<&NormRelayUrl, RelayStatus> {
    372         let mut status = BTreeMap::new();
    373 
    374         for (url, relay) in &self.relays {
    375             let relay_status = if let Some(websocket) = &relay.websocket {
    376                 websocket.conn.status
    377             } else {
    378                 RelayStatus::Disconnected
    379             };
    380             status.insert(url, relay_status);
    381         }
    382 
    383         status
    384     }
    385 
    386     pub fn has_eose(&self, id: &OutboxSubId) -> bool {
    387         for relay in self.relays.values() {
    388             if relay.req_status(id) == Some(RelayReqStatus::Eose) {
    389                 return true;
    390             }
    391         }
    392 
    393         false
    394     }
    395 
    396     pub fn all_have_eose(&self, id: &OutboxSubId) -> bool {
    397         for relay in self.relays.values() {
    398             let Some(status) = relay.req_status(id) else {
    399                 continue;
    400             };
    401             if status != RelayReqStatus::Eose {
    402                 return false;
    403             }
    404         }
    405 
    406         true
    407     }
    408 
    /// Returns a borrowed view of the filters for the given subscription ID,
    /// or `None` when `self.subs.view(id)` yields nothing. No copy is made;
    /// callers that need ownership must clone the result themselves.
    pub fn filters(&self, id: &OutboxSubId) -> Option<&Vec<Filter>> {
        self.subs.view(id).map(|v| v.filters.get_filters())
    }
    413 
    /// Pulls pending events from the relays and hands them to `process`.
    ///
    /// Relays are polled in repeated passes, one `try_recv` call per relay per
    /// pass. Polling stops when `max_notes` events flagged as nostr notes have
    /// been received, or when a full pass yields no event from any relay.
    /// Events not flagged as nostr notes do not count toward `max_notes`.
    /// The multicast relay is then polled regardless of how the loop ended.
    #[profiling::function]
    pub fn try_recv<F>(&mut self, mut max_notes: usize, mut process: F)
    where
        for<'a> F: FnMut(RawEventData<'a>),
    {
        's: while max_notes > 0 {
            let mut received_any = false;

            for relay in self.relays.values_mut() {
                let resp = relay.try_recv(&self.subs, &mut process);

                if !resp.received_event {
                    continue;
                }

                received_any = true;

                // Only nostr notes consume the budget.
                if resp.event_was_nostr_note {
                    max_notes = max_notes.saturating_sub(1);
                    if max_notes == 0 {
                        break 's;
                    }
                }
            }

            // A silent pass means every relay is drained for now.
            if !received_any {
                break;
            }
        }

        self.multicast.try_recv(process);
    }
    446 }
    447 
/// Mutable borrows of the pool's relay map and subscription table, bundled so
/// the free-standing EOSE helpers can work on both without borrowing the
/// whole `OutboxPool`.
struct EoseState<'a> {
    /// Relay coordinators keyed by normalized relay URL.
    relays: &'a mut HashMap<NormRelayUrl, CoordinationData>,
    /// Subscription table shared by all relays.
    subs: &'a mut OutboxSubscriptions,
}
    452 
    453 fn unix_now_secs() -> Option<u64> {
    454     SystemTime::now()
    455         .duration_since(UNIX_EPOCH)
    456         .ok()
    457         .map(|d| d.as_secs())
    458 }
    459 
    460 fn sub_all_relays_have_eose(state: &EoseState<'_>, id: &OutboxSubId) -> bool {
    461     let Some(sub) = state.subs.get(id) else {
    462         return false;
    463     };
    464     if sub.relays.is_empty() {
    465         return false;
    466     }
    467 
    468     for relay_id in &sub.relays {
    469         let Some(relay) = state.relays.get(relay_id) else {
    470             return false;
    471         };
    472         if relay.req_status(id) != Some(RelayReqStatus::Eose) {
    473             return false;
    474         }
    475     }
    476 
    477     true
    478 }
    479 
    480 fn optimize_since_for_fully_eosed_subs(state: &mut EoseState<'_>, ids: HashSet<OutboxSubId>) {
    481     let Some(now) = unix_now_secs() else {
    482         return;
    483     };
    484 
    485     for id in ids {
    486         // Since optimization is only safe after every relay leg for this
    487         // subscription has reached EOSE at least once.
    488         if !sub_all_relays_have_eose(state, &id) {
    489             continue;
    490         }
    491 
    492         if let Some(sub) = state.subs.get_mut(&id) {
    493             sub.see_all(now);
    494             sub.filters.since_optimize();
    495         }
    496     }
    497 }
    498 
    499 fn process_pending_eose_for_non_session_relays(
    500     state: &mut EoseState<'_>,
    501     sessions_keys: &HashSet<NormRelayUrl>,
    502 ) -> EoseIds {
    503     let mut pending_eoses = EoseIds::default();
    504 
    505     for (relay_id, relay) in state.relays.iter_mut() {
    506         if sessions_keys.contains(relay_id) {
    507             continue;
    508         }
    509 
    510         let eose_ids = relay.ingest_session(state.subs, CoordinationSession::default());
    511         pending_eoses.absorb(eose_ids);
    512     }
    513 
    514     pending_eoses
    515 }
    516 
/// Allocator for subscription ids.
struct SubRegistry {
    /// Value the next issued `OutboxSubId` will carry.
    next_request_id: u64,
}
    520 
    521 impl SubRegistry {
    522     pub fn next(&mut self) -> OutboxSubId {
    523         let i = self.next_request_id;
    524         self.next_request_id += 1;
    525         OutboxSubId(i)
    526     }
    527 }
    528 
    529 pub fn get_session<'a>(
    530     map: &'a mut HashMap<NormRelayUrl, CoordinationSession>,
    531     id: &NormRelayUrl,
    532 ) -> &'a mut CoordinationSession {
    533     match map.raw_entry_mut().from_key(id) {
    534         RawEntryMut::Occupied(e) => e.into_mut(),
    535         RawEntryMut::Vacant(e) => {
    536             let session = CoordinationSession::default();
    537             let (_, res) = e.insert(id.clone(), session);
    538             res
    539         }
    540     }
    541 }
    542 
    543 fn build_relay<W>(relay_id: NormRelayUrl, wakeup: W) -> CoordinationData
    544 where
    545     W: Wakeup,
    546 {
    547     CoordinationData::new(
    548         RelayLimitations::default(), // TODO(kernelkind): add actual limitations
    549         relay_id,
    550         wakeup,
    551     )
    552 }
    553 
    554 #[cfg(test)]
    555 mod tests {
    556     use hashbrown::HashSet;
    557     use nostrdb::Filter;
    558 
    559     use super::*;
    560     use crate::relay::{
    561         coordinator::CoordinationTask,
    562         test_utils::{filters_json, trivial_filter, MockWakeup},
    563         RelayUrlPkgs,
    564     };
    565 
    /// Three consecutive ids from one registry must be pairwise distinct.
    #[test]
    fn registry_generates_unique_ids() {
        let mut registry = SubRegistry { next_request_id: 0 };

        let first = registry.next();
        let second = registry.next();
        let third = registry.next();

        for (left, right) in [(&first, &second), (&second, &third), (&first, &third)] {
            assert_ne!(left, right);
        }
    }
    579 
    580     // ==================== OutboxPool tests ====================
    581 
    /// A default pool tracks no relays and reports no status for any id.
    #[test]
    fn outbox_pool_default_empty() {
        let pool = OutboxPool::default();
        let status = pool.status(&OutboxSubId(0));

        assert!(pool.relays.is_empty());
        // With no relays there is nothing that could report a status.
        assert!(status.is_empty());
    }
    590 
    /// With no relays in the pool, `has_eose` cannot find an EOSE and is false.
    #[test]
    fn outbox_pool_has_eose_false_when_empty() {
        let pool = OutboxPool::default();
        let eosed = pool.has_eose(&OutboxSubId(0));
        assert!(!eosed);
    }
    597 
    /// An id no relay has ever seen yields an empty status map.
    #[test]
    fn outbox_pool_status_empty_for_unknown() {
        let pool = OutboxPool::default();
        assert!(pool.status(&OutboxSubId(999)).is_empty());
    }
    605 
    /// A fresh pool has no relays, so there are no websocket statuses.
    #[test]
    fn outbox_pool_websocket_statuses_empty_initially() {
        let pool = OutboxPool::default();
        assert!(pool.websocket_statuses().is_empty());
    }
    613 
    /// Full modifications should unsubscribe old relays and resubscribe new ones using the updated filters.
    ///
    /// The subscription starts on `relay_a`; a second session then changes both
    /// its filters and its relay set (to `relay_b`). The resulting per-relay
    /// sessions must unsubscribe `relay_a` and issue a compaction subscribe to
    /// `relay_b`.
    #[test]
    fn full_modification_updates_sessions_with_new_filters() {
        let mut pool = OutboxPool::default();
        let wakeup = MockWakeup::default();
        let relay_a = NormRelayUrl::new("wss://relay-a.example.com").unwrap();
        let relay_b = NormRelayUrl::new("wss://relay-b.example.com").unwrap();

        let mut urls = HashSet::new();
        urls.insert(relay_a.clone());
        // The handler goes out of scope at the end of this block; the checks
        // below expect the subscription to be registered in the pool by then.
        let new_sub_id = {
            let mut handler = pool.start_session(wakeup.clone());
            handler.subscribe(trivial_filter(), RelayUrlPkgs::new(urls))
        };

        {
            let sub = pool
                .subs
                .get_mut(&new_sub_id)
                .expect("subscription should be registered");
            assert_eq!(sub.relays.len(), 1);
            assert!(sub.relays.contains(&relay_a));
            assert!(!sub.is_oneshot);
            assert_eq!(sub.relay_type, RelayType::Compaction);
        }

        // Export the session instead of letting the handler apply it, so the
        // per-relay tasks produced by `collect_sessions` can be inspected.
        let sessions = {
            let mut updated_relays = HashSet::new();
            updated_relays.insert(relay_b.clone());

            let mut handler = pool.start_session(wakeup);
            handler.modify_filters(
                new_sub_id,
                vec![Filter::new().kinds(vec![3]).limit(1).build()],
            );
            handler.modify_relays(new_sub_id, updated_relays);
            let session = handler.export();
            pool.collect_sessions(session)
        };

        let old_task = sessions
            .get(&relay_a)
            .and_then(|session| session.tasks.get(&new_sub_id))
            .expect("expected a task for relay relay_a");
        assert!(matches!(old_task, CoordinationTask::Unsubscribe));

        let new_task = sessions
            .get(&relay_b)
            .and_then(|session| session.tasks.get(&new_sub_id))
            .expect("expected a task for relay relay_b");
        assert!(matches!(new_task, CoordinationTask::CompactionSub));
    }
    666 
    /// Base delay starts at 5s, doubles per attempt, and sticks at the cap.
    #[test]
    fn reconnect_base_delay_doubles_with_cap() {
        let uncapped_secs = [5u64, 10, 20, 40, 80, 160, 320, 640, 1280];
        for (attempt, secs) in uncapped_secs.into_iter().enumerate() {
            assert_eq!(
                base_reconnect_delay(attempt as u32),
                Duration::from_secs(secs)
            );
        }

        assert_eq!(base_reconnect_delay(9), MAX_RECONNECT_DELAY);
        // Saturates at cap for any large attempt count.
        assert_eq!(base_reconnect_delay(100), MAX_RECONNECT_DELAY);
    }
    683 
    /// For a spread of attempts and seeds, the jittered delay stays between
    /// the base delay and `min(base * 1.25, cap)`.
    #[test]
    fn reconnect_jitter_within_bounds() {
        for attempt in [0u32, 1, 3, 8, 9, 50, 100] {
            let base = base_reconnect_delay(attempt);
            let max_with_jitter = (base + (base / 4)).min(MAX_RECONNECT_DELAY);

            let samples =
                (0u64..20).map(|sample| next_reconnect_duration(attempt, 0xBAD5EED ^ sample));
            for jittered in samples {
                assert!(
                    jittered >= base,
                    "jittered {jittered:?} < base {base:?} at attempt {attempt}"
                );
                assert!(
                    jittered <= max_with_jitter,
                    "jittered {jittered:?} exceeds max-with-jitter {max_with_jitter:?} at attempt {attempt}"
                );
            }
        }
    }
    703 
    /// A oneshot built with default relay packages yields a compaction
    /// subscribe task for its relay.
    #[test]
    fn oneshot_routes_to_compaction() {
        let relay = NormRelayUrl::new("wss://relay-oneshot.example.com").unwrap();
        let id = OutboxSubId(42);
        let filters = vec![Filter::new().kinds(vec![1]).limit(2).build()];

        let mut relays = HashSet::new();
        relays.insert(relay.clone());

        let mut session = OutboxSession::default();
        session.oneshot(id, filters.clone(), RelayUrlPkgs::new(relays));

        let mut pool = OutboxPool::default();
        let sessions = pool.collect_sessions(session);

        let relay_task = sessions
            .get(&relay)
            .and_then(|relay_session| relay_session.tasks.get(&id))
            .expect("expected task for oneshot relay");
        assert!(matches!(relay_task, CoordinationTask::CompactionSub));
    }
    725 
    /// After subscribing one id to two relays, an unsubscribe must produce an
    /// unsubscribe task on each of them.
    #[test]
    fn unsubscribe_targets_all_relays() {
        let mut pool = OutboxPool::default();
        let relay_a = NormRelayUrl::new("wss://relay-a.example.com").unwrap();
        let relay_b = NormRelayUrl::new("wss://relay-b.example.com").unwrap();
        let id = OutboxSubId(42);

        // Subscribe to both relays
        let mut urls = HashSet::new();
        for relay in [&relay_a, &relay_b] {
            urls.insert(relay.clone());
        }

        let mut subscribe_session = OutboxSession::default();
        subscribe_session.subscribe(id, trivial_filter(), RelayUrlPkgs::new(urls));
        pool.collect_sessions(subscribe_session);

        // Unsubscribe
        let mut unsubscribe_session = OutboxSession::default();
        unsubscribe_session.unsubscribe(id);
        let sessions = pool.collect_sessions(unsubscribe_session);

        // Both relays should receive unsubscribe tasks
        let task_a = sessions.get(&relay_a).and_then(|s| s.tasks.get(&id));
        assert!(matches!(task_a, Some(CoordinationTask::Unsubscribe)));

        let task_b = sessions.get(&relay_b).and_then(|s| s.tasks.get(&id));
        assert!(matches!(task_b, Some(CoordinationTask::Unsubscribe)));
    }
    755 
    /// A relay package with `use_transparent` set should produce a
    /// `TransparentSub` task for the targeted relay.
    #[test]
    fn subscribe_transparent_mode() {
        let mut pool = OutboxPool::default();
        let relay = NormRelayUrl::new("wss://relay-transparent.example.com").unwrap();
        let id = OutboxSubId(5);

        // Single-relay package, flagged for transparent routing.
        let urls: HashSet<_> = std::iter::once(relay.clone()).collect();
        let mut pkgs = RelayUrlPkgs::new(urls);
        pkgs.use_transparent = true;

        let mut session = OutboxSession::default();
        session.subscribe(id, trivial_filter(), pkgs);
        let sessions = pool.collect_sessions(session);

        let relay_session = sessions.get(&relay);
        let task = relay_session.and_then(|s| s.tasks.get(&id));
        assert!(matches!(task, Some(CoordinationTask::TransparentSub)));
    }
    775 
    /// Replacing a subscription's filters should store the new filters in the pool and
    /// queue a fresh subscribe task on the relay the subscription is already routed to.
    #[test]
    fn modify_filters_reissues_subscribe_for_existing_relays() {
        let mut pool = OutboxPool::default();
        let wakeup = MockWakeup::default();
        let relay = NormRelayUrl::new("wss://relay-modify.example.com").unwrap();

        // Initial subscription on a single relay. The handler is a temporary here, so it is
        // dropped as soon as this statement finishes.
        let initial_urls: HashSet<_> = std::iter::once(relay.clone()).collect();
        let sub_id = pool
            .start_session(wakeup.clone())
            .subscribe(trivial_filter(), RelayUrlPkgs::new(initial_urls));

        // Second session: swap in a different filter set and collect the resulting tasks.
        let updated_filters = vec![Filter::new().kinds(vec![7]).limit(2).build()];
        let expected_json = filters_json(&updated_filters);
        let mut handler = pool.start_session(wakeup);
        handler.modify_filters(sub_id, updated_filters);
        let exported = handler.export();
        let sessions = pool.collect_sessions(exported);

        // The pool's stored view must now serialize to the updated filters.
        let view = pool.subs.view(&sub_id).expect("updated subscription view");
        assert_eq!(filters_json(view.filters.get_filters()), expected_json);

        // The relay gets a subscribe task again.
        let relay_session = sessions.get(&relay);
        let task = relay_session
            .and_then(|s| s.tasks.get(&sub_id))
            .expect("expected coordination task");
        assert!(matches!(task, CoordinationTask::CompactionSub));
    }
    809 
    /// Swapping a subscription's relay set should unsubscribe the relay that was removed
    /// and subscribe the relay that was added.
    #[test]
    fn modify_relays_differs_routing_sets() {
        let mut pool = OutboxPool::default();
        let wakeup = MockWakeup::default();
        let relay_a = NormRelayUrl::new("wss://relay-diff-a.example.com").unwrap();
        let relay_b = NormRelayUrl::new("wss://relay-diff-b.example.com").unwrap();

        // Start out routed to relay_a only; the handler temporary is dropped at the end of
        // the statement.
        let old_urls: HashSet<_> = std::iter::once(relay_a.clone()).collect();
        let sub_id = pool
            .start_session(wakeup.clone())
            .subscribe(trivial_filter(), RelayUrlPkgs::new(old_urls));

        // Re-route the subscription to relay_b only.
        let new_urls: HashSet<_> = std::iter::once(relay_b.clone()).collect();
        let mut handler = pool.start_session(wakeup);
        handler.modify_relays(sub_id, new_urls);
        let exported = handler.export();
        let sessions = pool.collect_sessions(exported);

        // relay_a was removed from the set, so it must be told to unsubscribe.
        let removed_relay_session = sessions.get(&relay_a);
        let unsub_task = removed_relay_session
            .and_then(|s| s.tasks.get(&sub_id))
            .expect("missing relay_a task");
        assert!(matches!(unsub_task, CoordinationTask::Unsubscribe));

        // relay_b is new, so it must receive a subscribe.
        let added_relay_session = sessions.get(&relay_b);
        let sub_task = added_relay_session
            .and_then(|s| s.tasks.get(&sub_id))
            .expect("missing relay_b task");
        assert!(matches!(sub_task, CoordinationTask::CompactionSub));
    }
    846 
    /// When one session both rewrites the filters and clears every relay, the relay that
    /// used to serve the subscription gets an unsubscribe and the pool drops the
    /// subscription's metadata.
    #[test]
    fn modify_full_with_empty_relays_removes_subscription() {
        let mut pool = OutboxPool::default();
        let wakeup = MockWakeup::default();
        let relay = NormRelayUrl::new("wss://relay-empty.example.com").unwrap();

        // Initial subscription on a single relay; the handler temporary is dropped at the
        // end of the statement.
        let initial_urls: HashSet<_> = std::iter::once(relay.clone()).collect();
        let sub_id = pool
            .start_session(wakeup.clone())
            .subscribe(trivial_filter(), RelayUrlPkgs::new(initial_urls));

        // Modify filters and relays in the same session, leaving no relays at all.
        let replacement_filters = vec![Filter::new().kinds(vec![9]).limit(1).build()];
        let mut handler = pool.start_session(wakeup);
        handler.modify_filters(sub_id, replacement_filters);
        handler.modify_relays(sub_id, HashSet::new());
        let exported = handler.export();
        let sessions = pool.collect_sessions(exported);

        let relay_session = sessions.get(&relay);
        let task = relay_session
            .and_then(|s| s.tasks.get(&sub_id))
            .expect("expected unsubscribe for relay");
        assert!(matches!(task, CoordinationTask::Unsubscribe));

        let still_tracked = pool.subs.get_mut(&sub_id).is_some();
        assert!(!still_tracked, "subscription metadata should be removed");
    }
    879 
    880     // ==================== OutboxSessionHandler tests ====================
    881 
    /// The first subscribe issued through a handler on a fresh pool should return
    /// `OutboxSubId(0)`.
    #[test]
    fn outbox_session_handler_subscribe_returns_id() {
        let mut pool = OutboxPool::default();
        let wakeup = MockWakeup::default();

        // The handler is a temporary; it is dropped once the id has been returned.
        let no_relays = RelayUrlPkgs::new(HashSet::new());
        let id = pool
            .start_session(wakeup)
            .subscribe(trivial_filter(), no_relays);

        assert_eq!(id, OutboxSubId(0));
    }
    895 
    /// Subscription ids keep counting up across separate sessions on the same pool.
    #[test]
    fn outbox_session_handler_multiple_subscribes_unique_ids() {
        let mut pool = OutboxPool::default();
        let wakeup = MockWakeup::default();

        // Each handler is a temporary that lives only for its own statement, so the two
        // subscribes happen in two distinct sessions.
        let first = pool
            .start_session(wakeup.clone())
            .subscribe(trivial_filter(), RelayUrlPkgs::new(HashSet::new()));
        let second = pool
            .start_session(wakeup)
            .subscribe(trivial_filter(), RelayUrlPkgs::new(HashSet::new()));

        assert_ne!(first, second);
        assert_eq!(first, OutboxSubId(0));
        assert_eq!(second, OutboxSubId(1));
    }
    916 
    /// An untouched handler exports a session with no tasks, and that session can be
    /// imported back into the pool.
    #[test]
    fn outbox_session_handler_export_and_import() {
        let mut pool = OutboxPool::default();
        let wakeup = MockWakeup::default();

        // Export straight away, before anything has been queued on the handler.
        let exported = pool.start_session(wakeup.clone()).export();

        // Nothing was requested, so there is nothing to carry over.
        assert!(exported.tasks.is_empty());

        // Re-attach the exported session to the pool.
        let _handler = OutboxSessionHandler::import(&mut pool, exported, wakeup);
    }
    933 
    934     // ==================== get_session tests ====================
    935 
    /// Looking up a relay URL that has no entry yet should insert one into the map.
    #[test]
    fn get_session_creates_new_if_missing() {
        let mut map = HashMap::<NormRelayUrl, CoordinationSession>::new();
        let url = NormRelayUrl::new("wss://relay.example.com").unwrap();

        let _ = get_session(&mut map, &url);

        // The lookup itself must have created the entry.
        assert!(map.get(&url).is_some());
    }
    947 
    /// get_session returns the pre-existing coordination session for a URL instead of
    /// replacing it or adding a second entry.
    #[test]
    fn get_session_returns_existing() {
        let mut map: HashMap<NormRelayUrl, CoordinationSession> = HashMap::new();
        let url = NormRelayUrl::new("wss://relay.example.com").unwrap();
        let id = OutboxSubId(0);

        // First lookup creates the entry; record a task on it so it is recognizable later.
        get_session(&mut map, &url).subscribe(id, false);

        // Second lookup for the same URL must return that same session, task included.
        let existing = get_session(&mut map, &url);
        assert!(
            existing.tasks.get(&id).is_some(),
            "second lookup should return the session that already holds the task"
        );

        // Map should still have exactly one entry
        assert_eq!(map.len(), 1);
    }
    960 }