notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

timeline_sub.rs (4178B)


      1 use enostr::Pubkey;
      2 use hashbrown::HashMap;
      3 use nostrdb::{Ndb, Subscription};
      4 use notedeck::filter::HybridFilter;
      5 
      6 use crate::timeline::sub::ndb_sub;
      7 
/// Per-account local timeline subscription state with ref-counting.
///
/// Remote timeline relay subscriptions are managed by scoped subs; this type
/// only tracks local NostrDB subscriptions and active dependers.
#[derive(Debug, Default)]
pub struct TimelineSub {
    /// Subscription state for each account, keyed by the account's public key.
    /// An account without an entry is read as the default (empty) state.
    by_account: HashMap<Pubkey, AccountSubState>,
}
     16 
/// Bookkeeping for a single account's timeline subscription.
///
/// The all-default value (no subscription, zero dependers, not seeded) is
/// what lookups report for an account that has no entry.
#[derive(Debug, Clone, Copy, Default)]
struct AccountSubState {
    /// Handle of the local NostrDB subscription, if one is currently held.
    local: Option<Subscription>,
    /// Number of dependers currently registered for this account.
    dependers: usize,
    /// Set by `mark_remote_seeded`; cleared by `clear_remote_seeded` and when
    /// the last depender is released.
    // NOTE(review): presumably records that the remote (relay) side has been
    // seeded for this account — confirm against the callers.
    remote_seeded: bool,
}
     23 
     24 fn should_remove_account_state(state: &AccountSubState) -> bool {
     25     state.dependers == 0 && state.local.is_none()
     26 }
     27 
     28 fn unsubscribe_local_with_rollback(ndb: &mut Ndb, local: &mut Option<Subscription>, context: &str) {
     29     let Some(local_sub) = local.take() else {
     30         return;
     31     };
     32 
     33     if let Err(e) = ndb.unsubscribe(local_sub) {
     34         tracing::error!("{context}: ndb unsubscribe failed: {e}");
     35         *local = Some(local_sub);
     36     }
     37 }
     38 
     39 impl TimelineSub {
     40     fn state_for_account(&self, account_pk: &Pubkey) -> AccountSubState {
     41         self.by_account.get(account_pk).copied().unwrap_or_default()
     42     }
     43 
     44     fn state_for_account_mut(&mut self, account_pk: Pubkey) -> &mut AccountSubState {
     45         self.by_account.entry(account_pk).or_default()
     46     }
     47 
     48     /// Reset one account's local subscription state while preserving its depender count.
     49     pub fn reset_for_account(&mut self, account_pk: Pubkey, ndb: &mut Ndb) {
     50         let mut remove_account_state = false;
     51 
     52         if let Some(state) = self.by_account.get_mut(&account_pk) {
     53             unsubscribe_local_with_rollback(
     54                 ndb,
     55                 &mut state.local,
     56                 "TimelineSub::reset_for_account",
     57             );
     58             remove_account_state = should_remove_account_state(state);
     59         }
     60 
     61         if remove_account_state {
     62             self.by_account.remove(&account_pk);
     63         }
     64     }
     65 
     66     pub fn try_add_local(&mut self, account_pk: Pubkey, ndb: &Ndb, filter: &HybridFilter) {
     67         let state = self.state_for_account_mut(account_pk);
     68         if state.local.is_some() {
     69             return;
     70         }
     71 
     72         if let Some(sub) = ndb_sub(ndb, &filter.local().combined(), "") {
     73             state.local = Some(sub);
     74         }
     75     }
     76 
     77     pub fn increment(&mut self, account_pk: Pubkey) {
     78         self.state_for_account_mut(account_pk).dependers += 1;
     79     }
     80 
     81     pub fn remote_seeded(&self, account_pk: &Pubkey) -> bool {
     82         self.state_for_account(account_pk).remote_seeded
     83     }
     84 
     85     pub fn mark_remote_seeded(&mut self, account_pk: Pubkey) {
     86         self.state_for_account_mut(account_pk).remote_seeded = true;
     87     }
     88 
     89     pub fn clear_remote_seeded(&mut self, account_pk: Pubkey) {
     90         self.state_for_account_mut(account_pk).remote_seeded = false;
     91     }
     92 
     93     pub fn get_local(&self, account_pk: &Pubkey) -> Option<Subscription> {
     94         self.state_for_account(account_pk).local
     95     }
     96 
     97     pub fn unsubscribe_or_decrement(&mut self, account_pk: Pubkey, ndb: &mut Ndb) {
     98         let mut remove_account_state = false;
     99         if let Some(state) = self.by_account.get_mut(&account_pk) {
    100             if state.dependers > 1 {
    101                 state.dependers = state.dependers.saturating_sub(1);
    102                 return;
    103             }
    104 
    105             state.dependers = state.dependers.saturating_sub(1);
    106             state.remote_seeded = false;
    107             unsubscribe_local_with_rollback(
    108                 ndb,
    109                 &mut state.local,
    110                 "TimelineSub::unsubscribe_or_decrement",
    111             );
    112             remove_account_state = should_remove_account_state(state);
    113         }
    114 
    115         if remove_account_state {
    116             self.by_account.remove(&account_pk);
    117         }
    118     }
    119 
    120     pub fn no_sub(&self, account_pk: &Pubkey) -> bool {
    121         let state = self.state_for_account(account_pk);
    122         state.dependers == 0
    123     }
    124 
    125     pub fn has_any_subs(&self) -> bool {
    126         !self.by_account.is_empty()
    127     }
    128 
    129     pub fn dependers(&self, account_pk: &Pubkey) -> usize {
    130         self.state_for_account(account_pk).dependers
    131     }
    132 }