notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

coordinator.rs (19027B)


      1 use ewebsock::{WsEvent, WsMessage};
      2 use hashbrown::{HashMap, HashSet};
      3 
      4 use crate::{
      5     relay::{
      6         compaction::{CompactionData, CompactionRelay, CompactionSession},
      7         transparent::{revocate_transparent_subs, TransparentData, TransparentRelay},
      8         BroadcastCache, BroadcastRelay, NormRelayUrl, OutboxSubId, OutboxSubscriptions,
      9         RawEventData, RelayCoordinatorLimits, RelayImplType, RelayLimitations, RelayReqId,
     10         RelayReqStatus, RelayType, SubPassGuardian, SubPassRevocation, WebsocketRelay,
     11     },
     12     EventClientMessage, RelayMessage, RelayStatus, Wakeup, WebsocketConn,
     13 };
     14 
     15 /// RelayCoordinator routes each Outbox subscription to either the compaction or
     16 /// transparent relay engine and tracks their status.
     17 pub struct CoordinationData {
     18     limits: RelayCoordinatorLimits,
     19     pub(crate) websocket: Option<WebsocketRelay>,
     20     coordination: HashMap<OutboxSubId, RelayType>,
     21     compaction_data: CompactionData,
     22     transparent_data: TransparentData, // for outbox subs that prefer to be transparent
     23     broadcast_cache: BroadcastCache,
     24     eose_queue: Vec<RelayReqId>,
     25 }
     26 
     27 impl CoordinationData {
     28     pub fn new<W>(limits: RelayLimitations, norm_url: NormRelayUrl, wakeup: W) -> Self
     29     where
     30         W: Wakeup,
     31     {
     32         let websocket = match WebsocketConn::from_wakeup(norm_url.clone().into(), wakeup) {
     33             Ok(w) => Some(WebsocketRelay::new(w)),
     34             Err(e) => {
     35                 tracing::error!("could not open websocket to {norm_url:?}: {e}");
     36                 None
     37             }
     38         };
     39         let limits = RelayCoordinatorLimits::new(limits);
     40         let compaction_data = CompactionData::default();
     41         Self {
     42             limits,
     43             websocket,
     44             compaction_data,
     45             transparent_data: TransparentData::default(),
     46             coordination: Default::default(),
     47             broadcast_cache: Default::default(),
     48             eose_queue: Vec::new(),
     49         }
     50     }
     51 
     52     /// Change if we found a new NIP-11 `max_subscriptions`
     53     #[allow(dead_code)]
     54     pub fn set_max_size(&mut self, subs: &OutboxSubscriptions, max_size: usize) {
     55         let Some(revocations) = self.limits.new_total(max_size) else {
     56             return;
     57         };
     58 
     59         let mut trans_left = self.transparent_data.num_subs();
     60         let mut compact_left = self.compaction_data.num_subs();
     61 
     62         let (trans_revocations, compacts_revocations): (
     63             Vec<SubPassRevocation>,
     64             Vec<SubPassRevocation>,
     65         ) = revocations.into_iter().partition(|_| {
     66             let take_trans = (trans_left > compact_left && trans_left > 0) || (compact_left == 0);
     67 
     68             if take_trans {
     69                 trans_left -= 1;
     70             } else {
     71                 compact_left -= 1;
     72             }
     73             take_trans
     74         });
     75 
     76         if !trans_revocations.is_empty() {
     77             revocate_transparent_subs(
     78                 self.websocket.as_mut(),
     79                 &mut self.transparent_data,
     80                 trans_revocations,
     81             );
     82         }
     83 
     84         if !compacts_revocations.is_empty() {
     85             CompactionRelay::new(
     86                 self.websocket.as_mut(),
     87                 &mut self.compaction_data,
     88                 self.limits.max_json_bytes,
     89                 &mut self.limits.sub_guardian,
     90                 subs,
     91             )
     92             .revocate_all(compacts_revocations);
     93         }
     94     }
     95 
     96     #[profiling::function]
     97     pub fn ingest_session(
     98         &mut self,
     99         subs: &OutboxSubscriptions,
    100         session: CoordinationSession,
    101     ) -> EoseIds {
    102         let mut trans_unsubs: HashSet<OutboxSubId> = HashSet::new();
    103         let mut trans = HashSet::new();
    104         let mut compaction_session = CompactionSession::default();
    105         let mut eose_ids = EoseIds::default();
    106 
    107         for (id, task) in session.tasks {
    108             match task {
    109                 CoordinationTask::TransparentSub => {
    110                     if let Some(RelayType::Compaction) = self.coordination.get(&id) {
    111                         compaction_session.unsub(id);
    112                     }
    113                     self.coordination.insert(id, RelayType::Transparent);
    114                     trans.insert(id);
    115                 }
    116                 CoordinationTask::CompactionSub => {
    117                     if let Some(RelayType::Transparent) = self.coordination.get(&id) {
    118                         trans_unsubs.insert(id);
    119                     }
    120                     self.coordination.insert(id, RelayType::Compaction);
    121                     compaction_session.sub(id);
    122                 }
    123                 CoordinationTask::Unsubscribe => {
    124                     let Some(rtype) = self.coordination.remove(&id) else {
    125                         continue;
    126                     };
    127 
    128                     match rtype {
    129                         RelayType::Compaction => {
    130                             compaction_session.unsub(id);
    131                         }
    132                         RelayType::Transparent => {
    133                             trans_unsubs.insert(id);
    134                         }
    135                     }
    136                 }
    137             }
    138         }
    139 
    140         // Drain EOSE queue and collect IDs
    141         for sid in self.eose_queue.drain(..) {
    142             // Try compaction first
    143             let Some(compaction_ids) = self.compaction_data.ids(&sid) else {
    144                 let Some(transparent_id) = self.transparent_data.id(&sid) else {
    145                     continue;
    146                 };
    147 
    148                 if subs.is_oneshot(&transparent_id) {
    149                     trans_unsubs.insert(transparent_id);
    150                     eose_ids.oneshots.insert(transparent_id);
    151                 } else {
    152                     eose_ids.normal.insert(transparent_id);
    153                 }
    154                 continue;
    155             };
    156 
    157             let oneshots = subs.subset_oneshot(compaction_ids);
    158 
    159             for id in compaction_ids {
    160                 if oneshots.contains(id) {
    161                     compaction_session.unsub(*id);
    162                     eose_ids.oneshots.insert(*id);
    163                 } else {
    164                     eose_ids.normal.insert(*id);
    165                 }
    166             }
    167         }
    168 
    169         if !trans_unsubs.is_empty() {
    170             let mut transparent = TransparentRelay::new(
    171                 self.websocket.as_mut(),
    172                 &mut self.transparent_data,
    173                 &mut self.limits.sub_guardian,
    174             );
    175             for unsub in trans_unsubs {
    176                 transparent.unsubscribe(unsub);
    177             }
    178         }
    179 
    180         if !trans.is_empty() {
    181             compaction_session.request_free_subs(trans.len());
    182         }
    183 
    184         if !compaction_session.is_empty() {
    185             CompactionRelay::new(
    186                 self.websocket.as_mut(),
    187                 &mut self.compaction_data,
    188                 self.limits.max_json_bytes,
    189                 &mut self.limits.sub_guardian,
    190                 subs,
    191             )
    192             .ingest_session(compaction_session);
    193         }
    194 
    195         let mut transparent = TransparentRelay::new(
    196             self.websocket.as_mut(),
    197             &mut self.transparent_data,
    198             &mut self.limits.sub_guardian,
    199         );
    200         for id in trans {
    201             let Some(view) = subs.view(&id) else {
    202                 continue;
    203             };
    204             transparent.subscribe(view);
    205         }
    206 
    207         transparent.try_flush_queue(subs);
    208         tracing::trace!(
    209             "Using {} of {} subs",
    210             self.limits.sub_guardian.total_passes() - self.limits.sub_guardian.available_passes(),
    211             self.limits.sub_guardian.total_passes()
    212         );
    213 
    214         eose_ids
    215     }
    216 
    217     pub fn send_event(&mut self, msg: EventClientMessage) {
    218         BroadcastRelay::websocket(self.websocket.as_mut(), &mut self.broadcast_cache)
    219             .broadcast(msg);
    220     }
    221 
    222     #[allow(dead_code)]
    223     pub fn set_req_status(&mut self, sid: &str, status: RelayReqStatus) {
    224         // the compaction & transparent data only act on sids that they already know, so whichever
    225         // this sid belongs to, it'll make it to its rightful home
    226         self.compaction_data.set_req_status(sid, status);
    227         self.transparent_data.set_req_status(sid, status);
    228     }
    229 
    230     pub fn req_status(&self, id: &OutboxSubId) -> Option<RelayReqStatus> {
    231         match self.coordination.get(id)? {
    232             RelayType::Compaction => self.compaction_data.req_status(id),
    233             RelayType::Transparent => self.transparent_data.req_status(id),
    234         }
    235     }
    236 
    237     #[allow(dead_code)]
    238     pub fn has_req_status(&self, id: &OutboxSubId, status: RelayReqStatus) -> bool {
    239         self.req_status(id) == Some(status)
    240     }
    241 
    242     fn url(&self) -> &str {
    243         let Some(websocket) = &self.websocket else {
    244             return "";
    245         };
    246         websocket.conn.url.as_str()
    247     }
    248 
    249     // whether we received
    250     #[profiling::function]
    251     pub(crate) fn try_recv<F>(&mut self, subs: &OutboxSubscriptions, act: &mut F) -> RecvResponse
    252     where
    253         for<'a> F: FnMut(RawEventData<'a>),
    254     {
    255         let Some(websocket) = self.websocket.as_mut() else {
    256             return RecvResponse::default();
    257         };
    258 
    259         let event = {
    260             profiling::scope!("webscket try_recv");
    261 
    262             let Some(event) = websocket.conn.receiver.try_recv() else {
    263                 return RecvResponse::default();
    264             };
    265             event
    266         };
    267 
    268         let msg = match &event {
    269             WsEvent::Opened => {
    270                 websocket.conn.set_status(RelayStatus::Connected);
    271                 websocket.reconnect_attempt = 0;
    272                 websocket.retry_connect_after = WebsocketRelay::initial_reconnect_duration();
    273                 handle_relay_open(
    274                     websocket,
    275                     &mut self.broadcast_cache,
    276                     &mut self.compaction_data,
    277                     &mut self.transparent_data,
    278                     self.limits.max_json_bytes,
    279                     &mut self.limits.sub_guardian,
    280                     subs,
    281                 );
    282                 None
    283             }
    284             WsEvent::Closed => {
    285                 websocket.conn.set_status(RelayStatus::Disconnected);
    286                 None
    287             }
    288             WsEvent::Error(err) => {
    289                 tracing::error!("relay {} error: {:?}", websocket.conn.url, err);
    290                 websocket.conn.set_status(RelayStatus::Disconnected);
    291                 None
    292             }
    293             WsEvent::Message(ws_message) => match ws_message {
    294                 #[cfg(not(target_arch = "wasm32"))]
    295                 WsMessage::Ping(bs) => {
    296                     websocket.conn.sender.send(WsMessage::Pong(bs.clone()));
    297                     None
    298                 }
    299                 WsMessage::Text(text) => {
    300                     tracing::trace!("relay {} received text: {}", websocket.conn.url, text);
    301                     match RelayMessage::from_json(text) {
    302                         Ok(msg) => Some(msg),
    303                         Err(err) => {
    304                             tracing::error!(
    305                                 "relay {} message decode error: {:?}",
    306                                 websocket.conn.url,
    307                                 err
    308                             );
    309                             None
    310                         }
    311                     }
    312                 }
    313                 _ => None,
    314             },
    315         };
    316 
    317         let mut resp = RecvResponse::received();
    318         let Some(msg) = msg else {
    319             return resp;
    320         };
    321 
    322         match msg {
    323             RelayMessage::OK(cr) => tracing::info!("OK {:?}", cr),
    324             RelayMessage::Eose(sid) => {
    325                 tracing::debug!("Relay {} received EOSE for subscription: {sid}", self.url());
    326                 self.compaction_data
    327                     .set_req_status(sid, RelayReqStatus::Eose);
    328                 self.transparent_data
    329                     .set_req_status(sid, RelayReqStatus::Eose);
    330                 self.eose_queue.push(RelayReqId(sid.to_string()));
    331             }
    332             RelayMessage::Event(_, ev) => {
    333                 profiling::scope!("ingest event");
    334                 resp.event_was_nostr_note = true;
    335                 act(RawEventData {
    336                     url: websocket.conn.url.as_str(),
    337                     event_json: ev,
    338                     relay_type: RelayImplType::Websocket,
    339                 });
    340             }
    341             RelayMessage::Notice(msg) => {
    342                 tracing::warn!("Notice from {}: {}", self.url(), msg)
    343             }
    344             RelayMessage::Closed(sid, _) => {
    345                 tracing::trace!("Relay {} received CLOSED: {sid}", self.url());
    346                 self.compaction_data
    347                     .set_req_status(sid, RelayReqStatus::Closed);
    348                 self.transparent_data
    349                     .set_req_status(sid, RelayReqStatus::Closed);
    350             }
    351         }
    352 
    353         resp
    354     }
    355 }
    356 
    357 #[derive(Default)]
    358 pub struct RecvResponse {
    359     pub received_event: bool,
    360     pub event_was_nostr_note: bool,
    361 }
    362 
    363 impl RecvResponse {
    364     pub fn received() -> Self {
    365         RecvResponse {
    366             received_event: true,
    367             event_was_nostr_note: false,
    368         }
    369     }
    370 }
    371 
    372 #[derive(Default)]
    373 pub struct EoseIds {
    374     pub oneshots: HashSet<OutboxSubId>,
    375     pub normal: HashSet<OutboxSubId>,
    376 }
    377 
    378 impl EoseIds {
    379     /// Merges IDs from `other` into `self`, preserving set uniqueness.
    380     pub fn absorb(&mut self, other: EoseIds) {
    381         self.oneshots.extend(other.oneshots);
    382         self.normal.extend(other.normal);
    383     }
    384 }
    385 
    386 fn handle_relay_open(
    387     websocket: &mut WebsocketRelay,
    388     broadcast_cache: &mut BroadcastCache,
    389     compaction: &mut CompactionData,
    390     transparent: &mut TransparentData,
    391     max_json: usize,
    392     guardian: &mut SubPassGuardian,
    393     subs: &OutboxSubscriptions,
    394 ) {
    395     BroadcastRelay::websocket(Some(websocket), broadcast_cache).try_flush_queue();
    396     let mut transparent = TransparentRelay::new(Some(websocket), transparent, guardian);
    397     transparent.handle_relay_open(subs);
    398     let mut compaction =
    399         CompactionRelay::new(Some(websocket), compaction, max_json, guardian, subs);
    400     compaction.handle_relay_open();
    401 }
    402 
    403 #[derive(Default)]
    404 pub struct CoordinationSession {
    405     pub tasks: HashMap<OutboxSubId, CoordinationTask>,
    406 }
    407 
    408 pub enum CoordinationTask {
    409     TransparentSub,
    410     CompactionSub,
    411     Unsubscribe,
    412 }
    413 
    414 impl CoordinationSession {
    415     pub fn subscribe(&mut self, id: OutboxSubId, use_transparent: bool) {
    416         self.tasks.insert(
    417             id,
    418             if use_transparent {
    419                 CoordinationTask::TransparentSub
    420             } else {
    421                 CoordinationTask::CompactionSub
    422             },
    423         );
    424     }
    425 
    426     pub fn unsubscribe(&mut self, id: OutboxSubId) {
    427         self.tasks.insert(id, CoordinationTask::Unsubscribe);
    428     }
    429 }
    430 
    431 #[cfg(test)]
    432 mod tests {
    433     use super::*;
    434 
    435     /// Returns the task held for `id`, panicking when no matching task exists.
    436     #[track_caller]
    437     fn expect_task<'a>(session: &'a CoordinationSession, id: OutboxSubId) -> &'a CoordinationTask {
    438         session
    439             .tasks
    440             .get(&id)
    441             .unwrap_or_else(|| panic!("Expected task for {:?}", id))
    442     }
    443 
    444     // ==================== CoordinationSession tests ====================
    445 
    446     /// Newly created coordination sessions hold no tasks.
    447     #[test]
    448     fn coordination_session_default_empty() {
    449         let session = CoordinationSession::default();
    450         assert!(session.tasks.is_empty());
    451     }
    452 
    453     /// Transparent subscriptions should be recorded as TransparentSub tasks.
    454     #[test]
    455     fn coordination_session_subscribe_transparent() {
    456         let mut session = CoordinationSession::default();
    457 
    458         session.subscribe(OutboxSubId(0), true); // use_transparent = true
    459 
    460         assert!(matches!(
    461             expect_task(&session, OutboxSubId(0)),
    462             CoordinationTask::TransparentSub
    463         ));
    464     }
    465 
    466     /// Compaction mode subscriptions should be recorded as CompactionSub tasks.
    467     #[test]
    468     fn coordination_session_subscribe_compaction() {
    469         let mut session = CoordinationSession::default();
    470 
    471         session.subscribe(OutboxSubId(0), false); // use_transparent = false means compaction
    472 
    473         assert!(matches!(
    474             expect_task(&session, OutboxSubId(0)),
    475             CoordinationTask::CompactionSub
    476         ));
    477     }
    478 
    479     /// Unsubscribe should record an Unsubscribe task.
    480     #[test]
    481     fn coordination_session_unsubscribe() {
    482         let mut session = CoordinationSession::default();
    483 
    484         session.unsubscribe(OutboxSubId(42));
    485 
    486         assert!(matches!(
    487             expect_task(&session, OutboxSubId(42)),
    488             CoordinationTask::Unsubscribe
    489         ));
    490     }
    491 
    492     /// Subsequent subscribe calls should overwrite previous modes.
    493     #[test]
    494     fn coordination_session_subscribe_overwrites_previous() {
    495         let mut session = CoordinationSession::default();
    496 
    497         // First subscribe as transparent
    498         session.subscribe(OutboxSubId(0), true);
    499 
    500         assert!(matches!(
    501             expect_task(&session, OutboxSubId(0)),
    502             CoordinationTask::TransparentSub
    503         ));
    504 
    505         // Then as compaction
    506         session.subscribe(OutboxSubId(0), false);
    507 
    508         // Should be compaction now
    509         assert!(matches!(
    510             expect_task(&session, OutboxSubId(0)),
    511             CoordinationTask::CompactionSub
    512         ));
    513     }
    514 
    515     /// Unsubscribe should override any prior subscribe entries.
    516     #[test]
    517     fn coordination_session_unsubscribe_overwrites_subscribe() {
    518         let mut session = CoordinationSession::default();
    519 
    520         session.subscribe(OutboxSubId(0), true);
    521         assert!(matches!(
    522             expect_task(&session, OutboxSubId(0)),
    523             CoordinationTask::TransparentSub
    524         ));
    525         session.unsubscribe(OutboxSubId(0));
    526 
    527         assert!(matches!(
    528             expect_task(&session, OutboxSubId(0)),
    529             CoordinationTask::Unsubscribe
    530         ));
    531     }
    532 
    533     /// Multiple tasks can be recorded in a single session.
    534     #[test]
    535     fn coordination_session_multiple_tasks() {
    536         let mut session = CoordinationSession::default();
    537 
    538         session.subscribe(OutboxSubId(0), true);
    539         session.subscribe(OutboxSubId(1), false);
    540         session.unsubscribe(OutboxSubId(2));
    541 
    542         assert_eq!(session.tasks.len(), 3);
    543     }
    544 
    545     // ==================== EoseIds tests ====================
    546 
    547     #[test]
    548     fn eose_ids_default_empty() {
    549         let eose_ids = EoseIds::default();
    550         assert!(eose_ids.oneshots.is_empty());
    551         assert!(eose_ids.normal.is_empty());
    552     }
    553 
    554     /// absorb merges oneshot and normal ID sets into the target accumulator.
    555     #[test]
    556     fn eose_ids_absorb_merges_both_sets() {
    557         let mut acc = EoseIds::default();
    558         let mut incoming = EoseIds::default();
    559 
    560         acc.oneshots.insert(OutboxSubId(1));
    561         incoming.oneshots.insert(OutboxSubId(2));
    562         incoming.normal.insert(OutboxSubId(3));
    563 
    564         acc.absorb(incoming);
    565 
    566         assert!(acc.oneshots.contains(&OutboxSubId(1)));
    567         assert!(acc.oneshots.contains(&OutboxSubId(2)));
    568         assert!(acc.normal.contains(&OutboxSubId(3)));
    569     }
    570 }