notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

session.rs (16397B)


      1 use hashbrown::{hash_map::Entry, HashMap, HashSet};
      2 use nostrdb::Filter;
      3 
      4 use crate::relay::{
      5     FullModificationTask, ModifyFiltersTask, ModifyRelaysTask, ModifyTask, NormRelayUrl,
      6     OutboxSubId, OutboxTask, RelayUrlPkgs, SubscribeTask,
      7 };
      8 
      9 /// OutboxSession records subscription mutations for the current frame before they
     10 /// are applied to the relay coordinators.
     11 #[derive(Default)]
     12 pub struct OutboxSession {
     13     pub tasks: HashMap<OutboxSubId, OutboxTask>,
     14 }
     15 
     16 impl OutboxSession {
     17     #[profiling::function]
     18     pub fn new_filters(&mut self, id: OutboxSubId, mut new_filters: Vec<Filter>) {
     19         filters_prune_empty(&mut new_filters);
     20         if new_filters.is_empty() {
     21             self.unsubscribe(id);
     22             return;
     23         }
     24 
     25         let entry = self.tasks.entry(id);
     26 
     27         let mut entry = match entry {
     28             Entry::Occupied(occupied_entry) => {
     29                 if matches!(occupied_entry.get(), OutboxTask::Oneshot(_)) {
     30                     // we don't modify oneshots
     31                     return;
     32                 }
     33                 occupied_entry
     34             }
     35             Entry::Vacant(vacant_entry) => {
     36                 vacant_entry.insert(OutboxTask::Modify(ModifyTask::Filters(ModifyFiltersTask(
     37                     new_filters,
     38                 ))));
     39                 return;
     40             }
     41         };
     42 
     43         match entry.get_mut() {
     44             OutboxTask::Modify(modify) => match modify {
     45                 ModifyTask::Filters(_) => {
     46                     self.tasks.insert(
     47                         id,
     48                         OutboxTask::Modify(ModifyTask::Filters(ModifyFiltersTask(new_filters))),
     49                     );
     50                 }
     51                 ModifyTask::Relays(modify_relays_task) => {
     52                     let relays = std::mem::take(&mut modify_relays_task.0);
     53                     *entry.get_mut() = OutboxTask::Modify(ModifyTask::Full(FullModificationTask {
     54                         filters: new_filters,
     55                         relays,
     56                     }));
     57                 }
     58                 ModifyTask::Full(full) => {
     59                     full.filters = new_filters;
     60                 }
     61             },
     62             OutboxTask::Unsubscribe => {
     63                 self.tasks.insert(
     64                     id,
     65                     OutboxTask::Modify(ModifyTask::Filters(ModifyFiltersTask(new_filters))),
     66                 );
     67             }
     68             OutboxTask::Oneshot(oneshot) => {
     69                 oneshot.filters = new_filters;
     70             }
     71             OutboxTask::Subscribe(subscribe_task) => {
     72                 subscribe_task.filters = new_filters;
     73             }
     74         }
     75     }
     76     #[profiling::function]
     77     pub fn new_relays(&mut self, id: OutboxSubId, new_urls: HashSet<NormRelayUrl>) {
     78         let entry = self.tasks.entry(id);
     79 
     80         let mut entry = match entry {
     81             Entry::Occupied(occupied_entry) => {
     82                 let task = occupied_entry.get();
     83 
     84                 if matches!(task, OutboxTask::Oneshot(_)) {
     85                     // we don't modify oneshots
     86                     return;
     87                 }
     88 
     89                 occupied_entry
     90             }
     91             Entry::Vacant(vacant_entry) => {
     92                 vacant_entry.insert(OutboxTask::Modify(ModifyTask::Relays(ModifyRelaysTask(
     93                     new_urls,
     94                 ))));
     95                 return;
     96             }
     97         };
     98 
     99         match entry.get_mut() {
    100             OutboxTask::Modify(modify) => {
    101                 match modify {
    102                     ModifyTask::Filters(modify_filters_task) => {
    103                         let filters = std::mem::take(&mut modify_filters_task.0); // moves out, leaves empty/default
    104                         *entry.get_mut() =
    105                             OutboxTask::Modify(ModifyTask::Full(FullModificationTask {
    106                                 filters,
    107                                 relays: new_urls,
    108                             }));
    109                     }
    110                     ModifyTask::Relays(_) => {
    111                         self.tasks.insert(
    112                             id,
    113                             OutboxTask::Modify(ModifyTask::Relays(ModifyRelaysTask(new_urls))),
    114                         );
    115                     }
    116                     ModifyTask::Full(full_modification_task) => {
    117                         full_modification_task.relays = new_urls;
    118                     }
    119                 }
    120             }
    121             OutboxTask::Unsubscribe => {
    122                 self.tasks.insert(
    123                     id,
    124                     OutboxTask::Modify(ModifyTask::Relays(ModifyRelaysTask(new_urls))),
    125                 );
    126             }
    127             OutboxTask::Oneshot(oneshot) => {
    128                 oneshot.relays.urls = new_urls;
    129             }
    130             OutboxTask::Subscribe(subscribe_task) => {
    131                 subscribe_task.relays.urls = new_urls;
    132             }
    133         }
    134     }
    135 
    136     pub fn subscribe(&mut self, id: OutboxSubId, mut filters: Vec<Filter>, urls: RelayUrlPkgs) {
    137         filters_prune_empty(&mut filters);
    138         if filters.is_empty() {
    139             return;
    140         }
    141 
    142         self.tasks.insert(
    143             id,
    144             OutboxTask::Subscribe(SubscribeTask {
    145                 filters,
    146                 relays: urls,
    147             }),
    148         );
    149     }
    150 
    151     pub fn oneshot(&mut self, id: OutboxSubId, mut filters: Vec<Filter>, urls: RelayUrlPkgs) {
    152         filters_prune_empty(&mut filters);
    153         if filters.is_empty() {
    154             return;
    155         }
    156 
    157         self.tasks.insert(
    158             id,
    159             OutboxTask::Oneshot(SubscribeTask {
    160                 filters,
    161                 relays: urls,
    162             }),
    163         );
    164     }
    165 
    166     pub fn unsubscribe(&mut self, id: OutboxSubId) {
    167         self.tasks.insert(id, OutboxTask::Unsubscribe);
    168     }
    169 }
    170 
    171 fn filters_prune_empty(filters: &mut Vec<Filter>) {
    172     filters.retain(|f| f.num_elements() != 0);
    173 }
    174 
    175 #[cfg(test)]
    176 mod tests {
    177     use crate::relay::test_utils::{expect_task, trivial_filter};
    178 
    179     use super::*;
    180 
    181     // ==================== OutboxSession tests ====================
    182 
    183     /// Verifies a freshly created session has no pending tasks.
    184     #[test]
    185     fn outbox_session_default_empty() {
    186         let session = OutboxSession::default();
    187         assert!(session.tasks.is_empty());
    188     }
    189 
    190     /// Drops subscribe/oneshot requests that lack meaningful filters/relays.
    191     #[test]
    192     fn outbox_session_subscribe_empty() {
    193         let mut session = OutboxSession::default();
    194         let urls = RelayUrlPkgs::new(HashSet::new());
    195 
    196         session.subscribe(OutboxSubId(0), vec![Filter::new().build()], urls.clone());
    197         assert!(session.tasks.is_empty());
    198 
    199         session.subscribe(OutboxSubId(0), vec![], urls.clone());
    200         assert!(session.tasks.is_empty());
    201 
    202         session.oneshot(OutboxSubId(0), vec![Filter::new().build()], urls.clone());
    203         assert!(session.tasks.is_empty());
    204 
    205         session.oneshot(OutboxSubId(0), vec![], urls);
    206         assert!(session.tasks.is_empty());
    207     }
    208 
    209     /// Stores subscribe tasks when filters and relays are provided.
    210     #[test]
    211     fn outbox_session_subscribe() {
    212         let mut session = OutboxSession::default();
    213         let urls = RelayUrlPkgs::new(HashSet::new());
    214 
    215         session.subscribe(OutboxSubId(0), trivial_filter(), urls);
    216 
    217         assert!(matches!(
    218             expect_task(&session, OutboxSubId(0)),
    219             OutboxTask::Subscribe(_)
    220         ));
    221     }
    222 
    223     /// Stores oneshot tasks when filters and relays are provided.
    224     #[test]
    225     fn outbox_session_oneshot() {
    226         let mut session = OutboxSession::default();
    227         let urls = RelayUrlPkgs::new(HashSet::new());
    228 
    229         session.oneshot(OutboxSubId(0), trivial_filter(), urls);
    230 
    231         assert!(matches!(
    232             expect_task(&session, OutboxSubId(0)),
    233             OutboxTask::Oneshot(_)
    234         ));
    235     }
    236 
    237     /// Records unsubscribe operations on demand.
    238     #[test]
    239     fn outbox_session_unsubscribe() {
    240         let mut session = OutboxSession::default();
    241 
    242         session.unsubscribe(OutboxSubId(42));
    243 
    244         assert!(matches!(
    245             expect_task(&session, OutboxSubId(42)),
    246             OutboxTask::Unsubscribe
    247         ));
    248     }
    249 
    250     /// Pushing filters first results in a Modify(Filters) task.
    251     #[test]
    252     fn outbox_session_new_filters_creates_modify_filters() {
    253         let mut session = OutboxSession::default();
    254 
    255         session.new_filters(OutboxSubId(0), trivial_filter());
    256 
    257         assert!(matches!(
    258             expect_task(&session, OutboxSubId(0)),
    259             OutboxTask::Modify(ModifyTask::Filters(_))
    260         ));
    261     }
    262 
    263     /// Pushing relays first results in a Modify(Relays) task.
    264     #[test]
    265     fn outbox_session_new_relays_creates_modify_relays() {
    266         let mut session = OutboxSession::default();
    267 
    268         session.new_relays(OutboxSubId(0), HashSet::new());
    269 
    270         assert!(matches!(
    271             expect_task(&session, OutboxSubId(0)),
    272             OutboxTask::Modify(ModifyTask::Relays(_))
    273         ));
    274     }
    275 
    276     /// Mixing filters then relays converges to a Modify(Full) task.
    277     #[test]
    278     fn outbox_session_merges_filters_and_relays_to_full_modification() {
    279         let mut session = OutboxSession::default();
    280 
    281         // First add filters
    282         session.new_filters(OutboxSubId(0), trivial_filter());
    283 
    284         // Then add relays - should merge to Full modification
    285         session.new_relays(OutboxSubId(0), HashSet::new());
    286 
    287         assert!(matches!(
    288             expect_task(&session, OutboxSubId(0)),
    289             OutboxTask::Modify(ModifyTask::Full(_))
    290         ));
    291     }
    292 
    293     /// Mixing relays then filters also converges to a Modify(Full) task.
    294     #[test]
    295     fn outbox_session_merges_relays_and_filters_to_full_modification() {
    296         let mut session = OutboxSession::default();
    297 
    298         // First add relays
    299         session.new_relays(OutboxSubId(0), HashSet::new());
    300 
    301         // Then add filters - should merge to Full modification
    302         session.new_filters(OutboxSubId(0), trivial_filter());
    303 
    304         assert!(matches!(
    305             expect_task(&session, OutboxSubId(0)),
    306             OutboxTask::Modify(ModifyTask::Full(_))
    307         ));
    308     }
    309 
    310     // this should never happen in practice though
    311     /// Subscribe commands override previously staged filter changes.
    312     #[test]
    313     fn outbox_session_subscribe_overwrites_modify_filters() {
    314         let mut session = OutboxSession::default();
    315         let urls = RelayUrlPkgs::new(HashSet::new());
    316 
    317         session.new_filters(OutboxSubId(0), trivial_filter());
    318         session.subscribe(
    319             OutboxSubId(0),
    320             vec![Filter::new().kinds(vec![3]).build()],
    321             urls,
    322         );
    323 
    324         assert!(matches!(
    325             expect_task(&session, OutboxSubId(0)),
    326             OutboxTask::Subscribe(_)
    327         ));
    328     }
    329 
    330     /// Unsubscribe issued after subscribe should take precedence.
    331     #[test]
    332     fn outbox_session_unsubscribe_after_subscribe() {
    333         let mut session = OutboxSession::default();
    334         let urls = RelayUrlPkgs::new(HashSet::new());
    335 
    336         session.subscribe(OutboxSubId(0), trivial_filter(), urls);
    337         session.unsubscribe(OutboxSubId(0));
    338 
    339         assert!(matches!(
    340             expect_task(&session, OutboxSubId(0)),
    341             OutboxTask::Unsubscribe
    342         ));
    343     }
    344 
    345     /// Adding filters after an unsubscribe restarts the task as Modify(Filters).
    346     #[test]
    347     fn outbox_session_new_filters_after_unsubscribe() {
    348         let mut session = OutboxSession::default();
    349 
    350         session.unsubscribe(OutboxSubId(0));
    351         session.new_filters(OutboxSubId(0), trivial_filter());
    352 
    353         // Filters should overwrite unsubscribe
    354         assert!(matches!(
    355             expect_task(&session, OutboxSubId(0)),
    356             OutboxTask::Modify(ModifyTask::Filters(_))
    357         ));
    358     }
    359 
    360     /// Updating filters of a Full modification replaces its filter list.
    361     #[test]
    362     fn outbox_session_update_full_modification_filters() {
    363         let mut session = OutboxSession::default();
    364 
    365         // Create full modification
    366         session.new_filters(OutboxSubId(0), trivial_filter());
    367         session.new_relays(OutboxSubId(0), HashSet::new());
    368 
    369         // Update filters on the full modification
    370         session.new_filters(
    371             OutboxSubId(0),
    372             vec![
    373                 Filter::new().kinds(vec![3]).build(),
    374                 Filter::new().kinds(vec![1]).build(),
    375             ],
    376         );
    377 
    378         match expect_task(&session, OutboxSubId(0)) {
    379             OutboxTask::Modify(ModifyTask::Full(fm)) => {
    380                 assert_eq!(fm.filters.len(), 2);
    381             }
    382             _ => panic!("Expected Modify(Full)"),
    383         }
    384     }
    385 
    386     /// Updating relays of a Full modification replaces its relay set.
    387     #[test]
    388     fn outbox_session_update_full_modification_relays() {
    389         let mut session = OutboxSession::default();
    390 
    391         // Create full modification
    392         session.new_filters(OutboxSubId(0), trivial_filter());
    393         session.new_relays(OutboxSubId(0), HashSet::new());
    394 
    395         // Update relays on the full modification
    396         let mut new_urls = HashSet::new();
    397         new_urls.insert(NormRelayUrl::new("wss://relay.example.com").unwrap());
    398         session.new_relays(OutboxSubId(0), new_urls);
    399 
    400         match expect_task(&session, OutboxSubId(0)) {
    401             OutboxTask::Modify(ModifyTask::Full(fm)) => {
    402                 assert!(!fm.relays.is_empty());
    403             }
    404             _ => panic!("Expected Modify(Full)"),
    405         }
    406     }
    407 
    408     /// Attempting to modify oneshot filters leaves them unchanged.
    409     #[test]
    410     fn outbox_session_update_oneshot_filters() {
    411         let mut session = OutboxSession::default();
    412         let urls = RelayUrlPkgs::new(HashSet::new());
    413 
    414         session.oneshot(OutboxSubId(0), trivial_filter(), urls);
    415         session.new_filters(
    416             OutboxSubId(0),
    417             vec![
    418                 Filter::new().kinds([1]).build(),
    419                 Filter::new().kinds([3]).build(),
    420             ],
    421         );
    422 
    423         match expect_task(&session, OutboxSubId(0)) {
    424             OutboxTask::Oneshot(task) => {
    425                 assert_eq!(task.filters.len(), 1);
    426             }
    427             _ => panic!("Expected Oneshot task"),
    428         }
    429     }
    430 
    431     /// Updating filters on a Subscribe task replaces the stored filters.
    432     #[test]
    433     fn outbox_session_update_subscribe_filters() {
    434         let mut session = OutboxSession::default();
    435         let urls = RelayUrlPkgs::new(HashSet::new());
    436 
    437         session.subscribe(OutboxSubId(0), trivial_filter(), urls);
    438         session.new_filters(
    439             OutboxSubId(0),
    440             vec![
    441                 Filter::new().kinds([1]).build(),
    442                 Filter::new().kinds([3]).build(),
    443             ],
    444         );
    445 
    446         match expect_task(&session, OutboxSubId(0)) {
    447             OutboxTask::Subscribe(task) => {
    448                 assert_eq!(task.filters.len(), 2);
    449             }
    450             _ => panic!("Expected Subscribe task"),
    451         }
    452     }
    453 
    454     /// Updating relays on a Subscribe task replaces the stored relays.
    455     #[test]
    456     fn outbox_session_update_subscribe_relays() {
    457         let mut session = OutboxSession::default();
    458         let urls = RelayUrlPkgs::new(HashSet::new());
    459 
    460         session.subscribe(OutboxSubId(0), trivial_filter(), urls);
    461 
    462         let mut new_urls = HashSet::new();
    463         new_urls.insert(NormRelayUrl::new("wss://relay.example.com").unwrap());
    464         session.new_relays(OutboxSubId(0), new_urls);
    465 
    466         match expect_task(&session, OutboxSubId(0)) {
    467             OutboxTask::Subscribe(task) => {
    468                 assert!(!task.relays.urls.is_empty());
    469             }
    470             _ => panic!("Expected Subscribe task"),
    471         }
    472     }
    473 
    474     /// Attempting to modify oneshot relays leaves them unchanged.
    475     #[test]
    476     fn outbox_session_update_oneshot_relays() {
    477         let mut session = OutboxSession::default();
    478         let urls = RelayUrlPkgs::new(HashSet::new());
    479 
    480         session.oneshot(OutboxSubId(0), trivial_filter(), urls);
    481 
    482         let mut new_urls = HashSet::new();
    483         new_urls.insert(NormRelayUrl::new("wss://relay.example.com").unwrap());
    484         session.new_relays(OutboxSubId(0), new_urls);
    485 
    486         match expect_task(&session, OutboxSubId(0)) {
    487             OutboxTask::Oneshot(task) => {
    488                 assert!(
    489                     task.relays.urls.is_empty(),
    490                     "cannot make modifications on oneshot"
    491                 );
    492             }
    493             _ => panic!("Expected Oneshot task"),
    494         }
    495     }
    496 }