notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

transparent.rs (18863B)


      1 use hashbrown::HashMap;
      2 use uuid::Uuid;
      3 
      4 use crate::{
      5     relay::{
      6         subscription::SubscriptionView, MetadataFilters, OutboxSubId, OutboxSubscriptions,
      7         QueuedTasks, RelayReqId, RelayReqStatus, RelayTask, SubPass, SubPassGuardian,
      8         SubPassRevocation, WebsocketRelay,
      9     },
     10     ClientMessage,
     11 };
     12 
     13 /// TransparentData tracks the outstanding transparent REQs and their metadata.
     14 #[derive(Default)]
     15 pub struct TransparentData {
     16     request_to_sid: HashMap<OutboxSubId, RelayReqId>,
     17     sid_status: HashMap<RelayReqId, SubData>,
     18     queue: QueuedTasks,
     19 }
     20 
     21 impl TransparentData {
     22     #[allow(dead_code)]
     23     pub fn num_subs(&self) -> usize {
     24         self.sid_status.len()
     25     }
     26 
     27     #[allow(dead_code)]
     28     pub fn contains(&self, id: &OutboxSubId) -> bool {
     29         self.request_to_sid.contains_key(id)
     30     }
     31 
     32     pub fn set_req_status(&mut self, sid: &str, status: RelayReqStatus) {
     33         let Some(entry) = self.sid_status.get_mut(sid) else {
     34             return;
     35         };
     36         entry.status = status;
     37     }
     38 
     39     pub fn req_status(&self, req_id: &OutboxSubId) -> Option<RelayReqStatus> {
     40         let sid = self.request_to_sid.get(req_id)?;
     41         Some(self.sid_status.get(sid)?.status)
     42     }
     43 
     44     /// Returns the OutboxSubId associated with the given relay subscription ID.
     45     pub fn id(&self, sid: &RelayReqId) -> Option<OutboxSubId> {
     46         self.sid_status.get(sid).map(|d| d.sub_req_id)
     47     }
     48 }
     49 
     50 pub struct TransparentRelay<'a> {
     51     relay: Option<&'a mut WebsocketRelay>,
     52     data: &'a mut TransparentData,
     53     sub_guardian: &'a mut SubPassGuardian,
     54 }
     55 
     56 /// TransparentRelay manages per-subscription REQs for outbox subscriptions which
     57 /// need to get EOSE ASAP (or some other need)
     58 impl<'a> TransparentRelay<'a> {
     59     pub fn new(
     60         relay: Option<&'a mut WebsocketRelay>,
     61         data: &'a mut TransparentData,
     62         sub_guardian: &'a mut SubPassGuardian,
     63     ) -> Self {
     64         Self {
     65             relay,
     66             data,
     67             sub_guardian,
     68         }
     69     }
     70 
     71     pub fn try_flush_queue(&mut self, subs: &OutboxSubscriptions) {
     72         while self.sub_guardian.available_passes() > 0 && !self.data.queue.is_empty() {
     73             let Some(next) = self.data.queue.pop() else {
     74                 return;
     75             };
     76 
     77             let Some(view) = subs.view(&next) else {
     78                 continue;
     79             };
     80 
     81             self.subscribe(view);
     82         }
     83     }
     84 
     85     pub fn subscribe(&mut self, view: SubscriptionView) {
     86         let req_id = view.id;
     87         let Some(existing_sid) = self.data.request_to_sid.get(&req_id) else {
     88             let Some(new_pass) = self.sub_guardian.take_pass() else {
     89                 self.data.queue.add(req_id, RelayTask::Subscribe);
     90                 return;
     91             };
     92             tracing::debug!("Transparent took pass for {req_id:?}");
     93             let sid: RelayReqId = Uuid::new_v4().into();
     94             self.data.request_to_sid.insert(req_id, sid.clone());
     95             send_req(&mut self.relay, &sid, view.filters);
     96             self.data.sid_status.insert(
     97                 sid,
     98                 SubData {
     99                     status: RelayReqStatus::InitialQuery,
    100                     sub_pass: new_pass,
    101                     sub_req_id: req_id,
    102                 },
    103             );
    104             return;
    105         };
    106 
    107         let Some(sub_data) = self.data.sid_status.get_mut(existing_sid) else {
    108             return;
    109         };
    110 
    111         // we're replacing the existing sub with new filters
    112         sub_data.status = RelayReqStatus::InitialQuery;
    113 
    114         send_req(&mut self.relay, existing_sid, view.filters);
    115     }
    116 
    117     pub fn unsubscribe(&mut self, req_id: OutboxSubId) {
    118         let Some(sid) = self.data.request_to_sid.remove(&req_id) else {
    119             self.data.queue.add(req_id, RelayTask::Unsubscribe);
    120             return;
    121         };
    122 
    123         let Some(removed) = self.data.sid_status.remove(&sid) else {
    124             return;
    125         };
    126 
    127         self.sub_guardian.return_pass(removed.sub_pass);
    128 
    129         let Some(relay) = &mut self.relay else {
    130             return;
    131         };
    132 
    133         if relay.is_connected() {
    134             relay.conn.send(&ClientMessage::close(sid.to_string()));
    135         }
    136     }
    137 
    138     #[profiling::function]
    139     pub fn handle_relay_open(&mut self, subs: &OutboxSubscriptions) {
    140         let Some(relay) = &mut self.relay else {
    141             return;
    142         };
    143 
    144         if !relay.is_connected() {
    145             return;
    146         }
    147 
    148         for (sid, data) in &self.data.sid_status {
    149             let Some(view) = subs.view(&data.sub_req_id) else {
    150                 continue;
    151             };
    152 
    153             relay.conn.send(&ClientMessage::req(
    154                 sid.to_string(),
    155                 view.filters.get_filters().clone(),
    156             ));
    157         }
    158     }
    159 }
    160 
    161 fn send_req(relay: &mut Option<&mut WebsocketRelay>, sid: &RelayReqId, filters: &MetadataFilters) {
    162     let Some(relay) = relay.as_mut() else {
    163         return;
    164     };
    165 
    166     if !relay.is_connected() {
    167         return;
    168     }
    169 
    170     relay.conn.send(&ClientMessage::req(
    171         sid.to_string(),
    172         filters.get_filters().clone(),
    173     ));
    174 }
    175 
    176 #[allow(dead_code)]
    177 pub fn revocate_transparent_subs(
    178     mut relay: Option<&mut WebsocketRelay>,
    179     data: &mut TransparentData,
    180     revocations: Vec<SubPassRevocation>,
    181 ) {
    182     // Snapshot the pairs we intend to process (can't mutate while iterating).
    183     let pairs: Vec<(OutboxSubId, RelayReqId)> = data
    184         .request_to_sid
    185         .iter()
    186         .take(revocations.len())
    187         .map(|(id, sid)| (*id, sid.clone()))
    188         .collect();
    189 
    190     for (mut revocation, (id, sid)) in revocations.into_iter().zip(pairs) {
    191         // If we fail to remove the mapping, skip without consuming other state.
    192         if data.request_to_sid.remove(&id).is_none() {
    193             continue;
    194         }
    195 
    196         let Some(status) = data.sid_status.remove(&sid) else {
    197             continue;
    198         };
    199 
    200         revocation.revocate(status.sub_pass);
    201         data.queue.add(id, RelayTask::Subscribe);
    202 
    203         let Some(relay) = &mut relay else {
    204             continue;
    205         };
    206 
    207         if relay.is_connected() {
    208             relay.conn.send(&ClientMessage::close(sid.to_string()));
    209         }
    210     }
    211 }
    212 
    213 struct SubData {
    214     pub status: RelayReqStatus,
    215     pub sub_pass: SubPass,
    216     pub sub_req_id: OutboxSubId,
    217 }
    218 
    219 #[cfg(test)]
    220 mod tests {
    221     use super::*;
    222     use crate::relay::{RelayUrlPkgs, SubscribeTask};
    223     use hashbrown::HashSet;
    224     use nostrdb::Filter;
    225 
    226     // ==================== TransparentData tests ====================
    227 
    228     fn trivial_filter() -> Vec<Filter> {
    229         vec![Filter::new().kinds([0]).build()]
    230     }
    231 
    232     fn create_subs_with_filter(id: OutboxSubId, filters: Vec<Filter>) -> OutboxSubscriptions {
    233         let mut subs = OutboxSubscriptions::default();
    234         insert_sub(&mut subs, id, filters, false);
    235         subs
    236     }
    237 
    238     fn insert_sub(
    239         subs: &mut OutboxSubscriptions,
    240         id: OutboxSubId,
    241         filters: Vec<Filter>,
    242         is_oneshot: bool,
    243     ) {
    244         subs.new_subscription(
    245             id,
    246             SubscribeTask {
    247                 filters,
    248                 relays: RelayUrlPkgs::new(HashSet::new()),
    249             },
    250             is_oneshot,
    251         );
    252     }
    253 
    254     #[test]
    255     fn transparent_data_manual_insert_and_query() {
    256         let mut data = TransparentData::default();
    257         let mut guardian = SubPassGuardian::new(1);
    258         let pass = guardian.take_pass().unwrap();
    259 
    260         let req_id = OutboxSubId(42);
    261         let sid = RelayReqId::default();
    262 
    263         data.request_to_sid.insert(req_id, sid.clone());
    264         data.sid_status.insert(
    265             sid.clone(),
    266             SubData {
    267                 status: RelayReqStatus::InitialQuery,
    268                 sub_pass: pass,
    269                 sub_req_id: req_id,
    270             },
    271         );
    272 
    273         assert!(data.contains(&req_id));
    274         assert_eq!(data.num_subs(), 1);
    275         assert_eq!(data.req_status(&req_id), Some(RelayReqStatus::InitialQuery));
    276 
    277         // Update status
    278         data.set_req_status(&sid.to_string(), RelayReqStatus::Eose);
    279         assert_eq!(data.req_status(&req_id), Some(RelayReqStatus::Eose));
    280     }
    281 
    282     // ==================== TransparentRelay tests ====================
    283 
    284     #[test]
    285     fn transparent_relay_subscribe_creates_mapping() {
    286         let mut data = TransparentData::default();
    287         let mut guardian = SubPassGuardian::new(5);
    288         let subs = create_subs_with_filter(OutboxSubId(0), trivial_filter());
    289 
    290         {
    291             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    292             relay.subscribe(subs.view(&OutboxSubId(0)).unwrap());
    293         }
    294 
    295         assert!(data.contains(&OutboxSubId(0)));
    296         assert_eq!(data.num_subs(), 1);
    297         assert_eq!(guardian.available_passes(), 4); // One pass consumed
    298     }
    299 
    300     #[test]
    301     fn transparent_relay_subscribe_queues_when_no_passes() {
    302         let mut data = TransparentData::default();
    303         let mut guardian = SubPassGuardian::new(0); // No passes available
    304         let subs = create_subs_with_filter(OutboxSubId(0), trivial_filter());
    305 
    306         {
    307             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    308             relay.subscribe(subs.view(&OutboxSubId(0)).unwrap());
    309         }
    310 
    311         // Should be queued, not active
    312         assert!(!data.contains(&OutboxSubId(0)));
    313         assert_eq!(data.num_subs(), 0);
    314         assert_eq!(data.queue.len(), 1);
    315     }
    316 
    317     #[test]
    318     fn transparent_relay_unsubscribe_returns_pass() {
    319         let mut data = TransparentData::default();
    320         let mut guardian = SubPassGuardian::new(1);
    321         let subs = create_subs_with_filter(OutboxSubId(0), trivial_filter());
    322 
    323         {
    324             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    325             relay.subscribe(subs.view(&OutboxSubId(0)).unwrap());
    326         }
    327 
    328         assert_eq!(guardian.available_passes(), 0);
    329         assert!(data.queue.is_empty());
    330 
    331         {
    332             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    333             relay.unsubscribe(OutboxSubId(0));
    334         }
    335 
    336         assert_eq!(guardian.available_passes(), 1);
    337         assert!(!data.contains(&OutboxSubId(0)));
    338         assert_eq!(data.num_subs(), 0);
    339         assert!(data.queue.is_empty());
    340     }
    341 
    342     #[test]
    343     fn transparent_relay_sub_unsub_no_passes() {
    344         let mut data = TransparentData::default();
    345 
    346         // no passes available
    347         let mut guardian = SubPassGuardian::new(0);
    348         let subs = create_subs_with_filter(OutboxSubId(0), trivial_filter());
    349 
    350         {
    351             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    352             relay.subscribe(subs.view(&OutboxSubId(0)).unwrap());
    353         }
    354 
    355         assert!(!data.queue.is_empty());
    356 
    357         {
    358             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    359             relay.unsubscribe(OutboxSubId(0));
    360         }
    361 
    362         assert!(data.queue.is_empty());
    363     }
    364 
    365     #[test]
    366     fn transparent_relay_unsubscribe_unknown_no_op() {
    367         let mut data = TransparentData::default();
    368         let mut guardian = SubPassGuardian::new(5);
    369 
    370         {
    371             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    372             relay.unsubscribe(OutboxSubId(999)); // Unknown ID
    373         }
    374 
    375         // Should not panic, passes unchanged
    376         assert_eq!(guardian.available_passes(), 5);
    377     }
    378 
    379     #[test]
    380     fn transparent_relay_subscribe_replaces_existing() {
    381         let mut data = TransparentData::default();
    382         let mut guardian = SubPassGuardian::new(5);
    383 
    384         let filters1 = vec![Filter::new().kinds(vec![1]).build()];
    385         let filters2 = vec![Filter::new().kinds(vec![4]).build()];
    386 
    387         let subs1 = create_subs_with_filter(OutboxSubId(0), filters1);
    388 
    389         {
    390             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    391             relay.subscribe(subs1.view(&OutboxSubId(0)).unwrap());
    392         }
    393 
    394         assert_eq!(guardian.available_passes(), 4);
    395 
    396         let subs2 = create_subs_with_filter(OutboxSubId(0), filters2);
    397 
    398         {
    399             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    400             relay.subscribe(subs2.view(&OutboxSubId(0)).unwrap());
    401         }
    402 
    403         // Should still have same number of passes (replaced, not added)
    404         assert_eq!(guardian.available_passes(), 4);
    405         assert_eq!(data.num_subs(), 1);
    406 
    407         // Verify replacement happened - status should be reset to InitialQuery
    408         let sid = data.request_to_sid.get(&OutboxSubId(0)).unwrap();
    409         let sub_data = data.sid_status.get(sid).unwrap();
    410         assert_eq!(sub_data.status, RelayReqStatus::InitialQuery);
    411     }
    412 
    413     #[test]
    414     fn transparent_relay_try_flush_queue_processes_when_passes_available() {
    415         let mut data = TransparentData::default();
    416         let mut guardian = SubPassGuardian::new(0); // Start with no passes
    417         let subs = create_subs_with_filter(OutboxSubId(0), trivial_filter());
    418 
    419         // Queue a subscription
    420         {
    421             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    422             relay.subscribe(subs.view(&OutboxSubId(0)).unwrap());
    423         }
    424 
    425         assert_eq!(data.queue.len(), 1);
    426         assert!(!data.contains(&OutboxSubId(0)));
    427 
    428         // Return a pass
    429         guardian.spawn_passes(1);
    430 
    431         // Flush queue
    432         {
    433             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    434             relay.try_flush_queue(&subs);
    435         }
    436 
    437         // Should now be active
    438         assert!(data.queue.is_empty());
    439         assert!(data.contains(&OutboxSubId(0)));
    440     }
    441 
    442     #[test]
    443     fn transparent_relay_multiple_subscriptions() {
    444         let mut data = TransparentData::default();
    445         let mut guardian = SubPassGuardian::new(3);
    446         let mut subs = OutboxSubscriptions::default();
    447         insert_sub(&mut subs, OutboxSubId(0), trivial_filter(), false);
    448         insert_sub(&mut subs, OutboxSubId(1), trivial_filter(), false);
    449         insert_sub(&mut subs, OutboxSubId(2), trivial_filter(), false);
    450 
    451         {
    452             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    453             relay.subscribe(subs.view(&OutboxSubId(0)).unwrap());
    454             relay.subscribe(subs.view(&OutboxSubId(1)).unwrap());
    455             relay.subscribe(subs.view(&OutboxSubId(2)).unwrap());
    456         }
    457 
    458         assert_eq!(data.num_subs(), 3);
    459         assert_eq!(guardian.available_passes(), 0);
    460 
    461         // All should be tracked
    462         assert!(data.contains(&OutboxSubId(0)));
    463         assert!(data.contains(&OutboxSubId(1)));
    464         assert!(data.contains(&OutboxSubId(2)));
    465     }
    466 
    467     #[test]
    468     fn transparent_data_id_returns_outbox_sub_id() {
    469         let mut data = TransparentData::default();
    470         let mut guardian = SubPassGuardian::new(2);
    471         let mut subs = OutboxSubscriptions::default();
    472         insert_sub(&mut subs, OutboxSubId(0), trivial_filter(), true);
    473         insert_sub(&mut subs, OutboxSubId(1), trivial_filter(), false);
    474 
    475         {
    476             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    477             relay.subscribe(subs.view(&OutboxSubId(0)).unwrap());
    478             relay.subscribe(subs.view(&OutboxSubId(1)).unwrap());
    479         }
    480 
    481         let sid = data.request_to_sid.get(&OutboxSubId(0)).unwrap().clone();
    482 
    483         // id() should return the OutboxSubId for the relay subscription
    484         let outbox_id = data.id(&sid);
    485         assert_eq!(outbox_id, Some(OutboxSubId(0)));
    486 
    487         // Unknown sid should return None
    488         let unknown_sid = RelayReqId::from("unknown");
    489         assert!(data.id(&unknown_sid).is_none());
    490     }
    491 
    492     // ==================== revocate_transparent_subs tests ====================
    493 
    494     #[test]
    495     fn revocate_transparent_subs_removes_subscriptions() {
    496         let mut data = TransparentData::default();
    497         let mut guardian = SubPassGuardian::new(3);
    498         let mut subs = OutboxSubscriptions::default();
    499         insert_sub(&mut subs, OutboxSubId(0), trivial_filter(), false);
    500         insert_sub(&mut subs, OutboxSubId(1), trivial_filter(), false);
    501         insert_sub(&mut subs, OutboxSubId(2), trivial_filter(), false);
    502 
    503         // Set up some subscriptions
    504         {
    505             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    506             relay.subscribe(subs.view(&OutboxSubId(0)).unwrap());
    507             relay.subscribe(subs.view(&OutboxSubId(1)).unwrap());
    508             relay.subscribe(subs.view(&OutboxSubId(2)).unwrap());
    509         }
    510 
    511         assert_eq!(data.num_subs(), 3);
    512 
    513         // Create revocations for 2 subs
    514         let revocations = vec![SubPassRevocation::new(), SubPassRevocation::new()];
    515 
    516         revocate_transparent_subs(None, &mut data, revocations);
    517 
    518         // Should have removed 2 subscriptions
    519         assert_eq!(data.num_subs(), 1);
    520         assert_eq!(data.queue.len(), 2);
    521     }
    522 
    523     #[test]
    524     fn revocate_transparent_subs_empty_revocations() {
    525         let mut data = TransparentData::default();
    526         let mut guardian = SubPassGuardian::new(2);
    527         let subs = create_subs_with_filter(OutboxSubId(0), trivial_filter());
    528 
    529         {
    530             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    531             relay.subscribe(subs.view(&OutboxSubId(0)).unwrap());
    532         }
    533 
    534         // No revocations
    535         let revocations: Vec<SubPassRevocation> = vec![];
    536         revocate_transparent_subs(None, &mut data, revocations);
    537 
    538         // Nothing should change
    539         assert_eq!(data.num_subs(), 1);
    540     }
    541 
    542     #[test]
    543     fn revocate_transparent_subs_exactly_matching() {
    544         // Test with exactly matching number of revocations and subscriptions
    545         let mut data = TransparentData::default();
    546         let mut guardian = SubPassGuardian::new(3);
    547         let mut subs = OutboxSubscriptions::default();
    548         insert_sub(&mut subs, OutboxSubId(0), trivial_filter(), false);
    549         insert_sub(&mut subs, OutboxSubId(1), trivial_filter(), false);
    550         insert_sub(&mut subs, OutboxSubId(2), trivial_filter(), false);
    551 
    552         // Create 3 subscriptions
    553         {
    554             let mut relay = TransparentRelay::new(None, &mut data, &mut guardian);
    555             relay.subscribe(subs.view(&OutboxSubId(0)).unwrap());
    556             relay.subscribe(subs.view(&OutboxSubId(1)).unwrap());
    557             relay.subscribe(subs.view(&OutboxSubId(2)).unwrap());
    558         }
    559 
    560         assert_eq!(data.num_subs(), 3);
    561         assert_eq!(guardian.available_passes(), 0);
    562 
    563         // Create exactly 3 revocations
    564         let revocations = vec![
    565             SubPassRevocation::new(),
    566             SubPassRevocation::new(),
    567             SubPassRevocation::new(),
    568         ];
    569 
    570         // This should revoke all subscriptions
    571         revocate_transparent_subs(None, &mut data, revocations);
    572 
    573         assert_eq!(data.num_subs(), 0);
    574         assert_eq!(data.queue.len(), 3);
    575     }
    576 }