notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

subscription.rs (13884B)


      1 use hashbrown::{HashMap, HashSet};
      2 use nostrdb::Filter;
      3 
      4 use crate::relay::{MetadataFilters, NormRelayUrl, OutboxSubId, RelayType, RelayUrlPkgs};
      5 
      6 pub struct OutboxSubscription {
      7     pub relays: HashSet<NormRelayUrl>,
      8     pub filters: MetadataFilters,
      9     json_size: usize,
     10     pub is_oneshot: bool,
     11     pub relay_type: RelayType,
     12 }
     13 
     14 impl OutboxSubscription {
     15     pub fn see_all(&mut self, at: u64) {
     16         for (_, meta) in self.filters.iter_mut() {
     17             meta.last_seen = Some(at);
     18         }
     19     }
     20 
     21     pub fn ingest_task(&mut self, task: ModifyTask) {
     22         match task {
     23             ModifyTask::Filters(modify_filters_task) => {
     24                 self.filters = MetadataFilters::new(modify_filters_task.0);
     25                 self.json_size = self.filters.json_size_sum();
     26             }
     27             ModifyTask::Relays(modify_relays_task) => {
     28                 self.relays = modify_relays_task.0;
     29             }
     30             ModifyTask::Full(full_modification_task) => {
     31                 self.filters = MetadataFilters::new(full_modification_task.filters);
     32                 self.json_size = self.filters.json_size_sum();
     33                 self.relays = full_modification_task.relays;
     34             }
     35         }
     36     }
     37 }
     38 
     39 #[derive(Default)]
     40 pub struct OutboxSubscriptions {
     41     subs: HashMap<OutboxSubId, OutboxSubscription>,
     42 }
     43 
     44 impl OutboxSubscriptions {
     45     pub fn view(&self, id: &OutboxSubId) -> Option<SubscriptionView<'_>> {
     46         let sub = self.subs.get(id)?;
     47 
     48         Some(SubscriptionView {
     49             id: *id,
     50             filters: &sub.filters,
     51             json_size: sub.json_size,
     52             is_oneshot: sub.is_oneshot,
     53         })
     54     }
     55 
     56     pub fn json_size(&self, id: &OutboxSubId) -> Option<usize> {
     57         self.subs.get(id).map(|s| s.json_size)
     58     }
     59 
     60     pub fn subset_oneshot(&self, ids: &HashSet<OutboxSubId>) -> HashSet<OutboxSubId> {
     61         ids.iter()
     62             .filter(|id| self.subs.get(*id).is_some_and(|s| s.is_oneshot))
     63             .copied()
     64             .collect()
     65     }
     66 
     67     pub fn is_oneshot(&self, id: &OutboxSubId) -> bool {
     68         self.subs.get(id).is_some_and(|s| s.is_oneshot)
     69     }
     70 
     71     pub fn json_size_sum(&self, ids: &HashSet<OutboxSubId>) -> usize {
     72         ids.iter()
     73             .map(|id| self.subs.get(id).map_or(0, |s| s.json_size))
     74             .sum()
     75     }
     76 
     77     pub fn filters_all(&self, ids: &HashSet<OutboxSubId>) -> Vec<Filter> {
     78         ids.iter()
     79             .filter_map(|id| self.subs.get(id))
     80             .flat_map(|sub| sub.filters.filters.iter().cloned())
     81             .collect()
     82     }
     83 
     84     pub fn get_mut(&mut self, id: &OutboxSubId) -> Option<&mut OutboxSubscription> {
     85         self.subs.get_mut(id)
     86     }
     87 
     88     pub fn get(&self, id: &OutboxSubId) -> Option<&OutboxSubscription> {
     89         self.subs.get(id)
     90     }
     91 
     92     pub fn remove(&mut self, id: &OutboxSubId) {
     93         self.subs.remove(id);
     94     }
     95 
     96     pub fn new_subscription(&mut self, id: OutboxSubId, task: SubscribeTask, is_oneshot: bool) {
     97         let filters = MetadataFilters::new(task.filters);
     98         let json_size = filters.json_size_sum();
     99         self.subs.insert(
    100             id,
    101             OutboxSubscription {
    102                 relays: task.relays.urls,
    103                 filters,
    104                 json_size,
    105                 is_oneshot,
    106                 relay_type: if task.relays.use_transparent {
    107                     RelayType::Transparent
    108                 } else {
    109                     RelayType::Compaction
    110                 },
    111             },
    112         );
    113     }
    114 }
    115 
    116 pub struct SubscriptionView<'a> {
    117     pub id: OutboxSubId,
    118     pub filters: &'a MetadataFilters,
    119     #[allow(dead_code)]
    120     pub json_size: usize,
    121     #[allow(dead_code)]
    122     pub is_oneshot: bool,
    123 }
    124 
    125 pub enum OutboxTask {
    126     Modify(ModifyTask),
    127     Subscribe(SubscribeTask),
    128     Unsubscribe,
    129     Oneshot(SubscribeTask),
    130 }
    131 
    132 pub enum ModifyTask {
    133     Filters(ModifyFiltersTask),
    134     Relays(ModifyRelaysTask),
    135     Full(FullModificationTask),
    136 }
    137 
    138 #[derive(Default)]
    139 pub struct ModifyFiltersTask(pub Vec<Filter>);
    140 
    141 pub struct ModifyRelaysTask(pub HashSet<NormRelayUrl>);
    142 
    143 pub struct FullModificationTask {
    144     pub filters: Vec<Filter>,
    145     pub relays: HashSet<NormRelayUrl>,
    146 }
    147 
    148 pub struct SubscribeTask {
    149     pub filters: Vec<Filter>,
    150     pub relays: RelayUrlPkgs,
    151 }
    152 
    153 #[cfg(test)]
    154 mod tests {
    155     use super::*;
    156     use crate::relay::RelayUrlPkgs;
    157     use crate::relay::{FullModificationTask, ModifyFiltersTask};
    158 
    159     fn subscribe_task(filters: Vec<Filter>, urls: RelayUrlPkgs) -> SubscribeTask {
    160         SubscribeTask {
    161             filters,
    162             relays: urls,
    163         }
    164     }
    165 
    166     fn relay_urls(url: &str) -> HashSet<NormRelayUrl> {
    167         let mut urls = HashSet::new();
    168         let relay = NormRelayUrl::new(url).unwrap();
    169         urls.insert(relay);
    170         urls
    171     }
    172 
    173     /// new_subscription should persist relay metadata and expose it via view().
    174     #[test]
    175     fn new_subscription_records_metadata() {
    176         let mut subs = OutboxSubscriptions::default();
    177         let mut pkgs = RelayUrlPkgs::new(relay_urls("wss://relay-meta.example.com"));
    178         pkgs.use_transparent = true;
    179         let filters = vec![Filter::new().kinds(vec![1]).limit(4).build()];
    180         let id = OutboxSubId(7);
    181 
    182         subs.new_subscription(id, subscribe_task(filters.clone(), pkgs), true);
    183 
    184         let view = subs.view(&id).expect("subscription view");
    185         assert_eq!(view.id, id);
    186         assert!(view.is_oneshot);
    187         assert_eq!(view.filters.get_filters().len(), filters.len());
    188         assert!(view.json_size > 0);
    189 
    190         let sub = subs.get_mut(&id).expect("subscription metadata");
    191         assert_eq!(sub.relays.len(), 1);
    192         assert_eq!(sub.relay_type, RelayType::Transparent);
    193     }
    194 
    195     /// subset_oneshot should only return IDs corresponding to oneshot subscriptions.
    196     #[test]
    197     fn subset_oneshot_filters_ids() {
    198         let mut subs = OutboxSubscriptions::default();
    199         let filters = vec![Filter::new().kinds(vec![1]).build()];
    200         let id_a = OutboxSubId(1);
    201         let id_b = OutboxSubId(2);
    202         subs.new_subscription(
    203             id_a,
    204             subscribe_task(
    205                 filters.clone(),
    206                 RelayUrlPkgs::new(relay_urls("wss://relay-a.example")),
    207             ),
    208             false,
    209         );
    210         subs.new_subscription(
    211             id_b,
    212             subscribe_task(
    213                 filters,
    214                 RelayUrlPkgs::new(relay_urls("wss://relay-b.example")),
    215             ),
    216             true,
    217         );
    218 
    219         let mut ids = HashSet::new();
    220         ids.insert(id_a);
    221         ids.insert(id_b);
    222 
    223         let oneshots = subs.subset_oneshot(&ids);
    224         let expected = {
    225             let mut s = HashSet::new();
    226             s.insert(id_b);
    227             s
    228         };
    229         assert_eq!(oneshots, expected);
    230     }
    231 
    232     /// json_size_sum aggregates the JSON payload size for the requested subscriptions.
    233     #[test]
    234     fn json_size_sum_accumulates_sizes() {
    235         let mut subs = OutboxSubscriptions::default();
    236         let filters = vec![Filter::new().kinds(vec![1]).build()];
    237         let id_a = OutboxSubId(1);
    238         let id_b = OutboxSubId(2);
    239         subs.new_subscription(
    240             id_a,
    241             subscribe_task(
    242                 filters.clone(),
    243                 RelayUrlPkgs::new(relay_urls("wss://relay-json-a.example")),
    244             ),
    245             false,
    246         );
    247         subs.new_subscription(
    248             id_b,
    249             subscribe_task(
    250                 filters,
    251                 RelayUrlPkgs::new(relay_urls("wss://relay-json-b.example")),
    252             ),
    253             false,
    254         );
    255 
    256         let mut ids = HashSet::new();
    257         ids.insert(id_a);
    258         ids.insert(id_b);
    259 
    260         let sum = subs.json_size_sum(&ids);
    261         let expected = subs.json_size(&id_a).unwrap() + subs.json_size(&id_b).unwrap();
    262         assert_eq!(sum, expected);
    263     }
    264 
    265     /// see_all should mark every filter as seen at the provided timestamp.
    266     #[test]
    267     fn see_all_marks_filters() {
    268         let mut subs = OutboxSubscriptions::default();
    269         let id = OutboxSubId(8);
    270         subs.new_subscription(
    271             id,
    272             subscribe_task(
    273                 vec![
    274                     Filter::new().kinds(vec![1]).limit(2).build(),
    275                     Filter::new().kinds(vec![4]).limit(1).build(),
    276                 ],
    277                 RelayUrlPkgs::new(relay_urls("wss://relay-see.example")),
    278             ),
    279             false,
    280         );
    281 
    282         let timestamp = 12345;
    283         let sub = subs.get_mut(&id).expect("subscription metadata");
    284         sub.see_all(timestamp);
    285 
    286         assert!(sub
    287             .filters
    288             .iter()
    289             .all(|(_, meta)| meta.last_seen == Some(timestamp)));
    290     }
    291 
    292     /// ingest_task should update json_size when filters are modified.
    293     #[test]
    294     fn ingest_task_updates_json_size_on_filter_change() {
    295         let mut subs = OutboxSubscriptions::default();
    296         let id = OutboxSubId(9);
    297         let small_filters = vec![Filter::new().kinds(vec![1]).build()];
    298         subs.new_subscription(
    299             id,
    300             subscribe_task(
    301                 small_filters,
    302                 RelayUrlPkgs::new(relay_urls("wss://relay-ingest.example")),
    303             ),
    304             false,
    305         );
    306 
    307         let original_size = subs.json_size(&id).unwrap();
    308 
    309         // Modify with larger filters
    310         let large_filters = vec![
    311             Filter::new().kinds(vec![1, 2, 3, 4, 5]).limit(100).build(),
    312             Filter::new().kinds(vec![6, 7, 8]).limit(50).build(),
    313         ];
    314         let sub = subs.get_mut(&id).unwrap();
    315         sub.ingest_task(ModifyTask::Filters(ModifyFiltersTask(large_filters)));
    316 
    317         let new_size = subs.json_size(&id).unwrap();
    318         assert_ne!(
    319             original_size, new_size,
    320             "json_size should change after filter modification"
    321         );
    322         assert!(
    323             new_size > original_size,
    324             "larger filters should have larger json_size"
    325         );
    326     }
    327 
    328     /// ingest_task with Full modification should update json_size.
    329     #[test]
    330     fn ingest_task_updates_json_size_on_full_change() {
    331         let mut subs = OutboxSubscriptions::default();
    332         let id = OutboxSubId(10);
    333         let small_filters = vec![Filter::new().kinds(vec![1]).build()];
    334         subs.new_subscription(
    335             id,
    336             subscribe_task(
    337                 small_filters,
    338                 RelayUrlPkgs::new(relay_urls("wss://relay-full.example")),
    339             ),
    340             false,
    341         );
    342 
    343         let original_size = subs.json_size(&id).unwrap();
    344 
    345         // Full modification with larger filters
    346         let large_filters = vec![
    347             Filter::new().kinds(vec![1, 2, 3, 4, 5]).limit(100).build(),
    348             Filter::new().kinds(vec![6, 7, 8]).limit(50).build(),
    349         ];
    350         let sub = subs.get_mut(&id).unwrap();
    351         sub.ingest_task(ModifyTask::Full(FullModificationTask {
    352             filters: large_filters,
    353             relays: relay_urls("wss://new-relay.example"),
    354         }));
    355 
    356         let new_size = subs.json_size(&id).unwrap();
    357         assert_ne!(
    358             original_size, new_size,
    359             "json_size should change after full modification"
    360         );
    361         assert!(
    362             new_size > original_size,
    363             "larger filters should have larger json_size"
    364         );
    365     }
    366 
    367     fn filter_has_since(filter: &Filter, expected: u64) -> bool {
    368         let json = filter.json().expect("filter json");
    369         json.contains(&format!("\"since\":{}", expected))
    370     }
    371 
    372     /// Full flow: see_all sets last_seen, then since_optimize applies it to filters.
    373     #[test]
    374     fn see_all_then_since_optimize_applies_since_to_filters() {
    375         let mut subs = OutboxSubscriptions::default();
    376         let id = OutboxSubId(11);
    377         let filters = vec![
    378             Filter::new().kinds(vec![1]).build(),
    379             Filter::new().kinds(vec![2]).build(),
    380         ];
    381         subs.new_subscription(
    382             id,
    383             subscribe_task(
    384                 filters,
    385                 RelayUrlPkgs::new(relay_urls("wss://relay-since.example")),
    386             ),
    387             false,
    388         );
    389 
    390         // Verify filters don't have since initially
    391         let view = subs.view(&id).unwrap();
    392         for filter in view.filters.get_filters() {
    393             let json = filter.json().expect("filter json");
    394             assert!(
    395                 !json.contains("\"since\""),
    396                 "filter should not have since initially"
    397             );
    398         }
    399 
    400         let timestamp = 1700000000u64;
    401         let sub = subs.get_mut(&id).unwrap();
    402         sub.see_all(timestamp);
    403         sub.filters.since_optimize();
    404 
    405         // Verify filters now have since
    406         let view = subs.view(&id).unwrap();
    407         for filter in view.filters.get_filters() {
    408             assert!(
    409                 filter_has_since(filter, timestamp),
    410                 "filter should have since after see_all + since_optimize"
    411             );
    412         }
    413     }
    414 
    415     /// Filters accessed via view() should have since after optimization.
    416     #[test]
    417     fn view_returns_optimized_filters() {
    418         let mut subs = OutboxSubscriptions::default();
    419         let id = OutboxSubId(12);
    420         let filters = vec![Filter::new().kinds(vec![1]).build()];
    421         subs.new_subscription(
    422             id,
    423             subscribe_task(
    424                 filters,
    425                 RelayUrlPkgs::new(relay_urls("wss://relay-view.example")),
    426             ),
    427             false,
    428         );
    429 
    430         let timestamp = 1234567890u64;
    431         {
    432             let sub = subs.get_mut(&id).unwrap();
    433             sub.see_all(timestamp);
    434             sub.filters.since_optimize();
    435         }
    436 
    437         // Access via view - should see the optimized filters
    438         let view = subs.view(&id).unwrap();
    439         let filter = &view.filters.get_filters()[0];
    440         assert!(
    441             filter_has_since(filter, timestamp),
    442             "view should return filters with since applied"
    443         );
    444     }
    445 }