notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

multi_subscriber.rs (17359B)


      1 use egui_nav::ReturnType;
      2 use enostr::{Filter, NoteId, RelayPool};
      3 use hashbrown::HashMap;
      4 use nostrdb::{Ndb, Subscription};
      5 use notedeck::{filter::HybridFilter, UnifiedSubscription};
      6 use uuid::Uuid;
      7 
      8 use crate::{subscriptions, timeline::ThreadSelection};
      9 
/// Id of the note at the root of a thread; used as the key of `ThreadSubs::remotes`.
type RootNoteId = NoteId;
     11 
/// Bookkeeping for thread subscriptions.
///
/// Tracks one remote (relay pool) subscription per thread root and, for each
/// `MetaId`, a list of scopes that each hold a stack of local subscriptions.
#[derive(Default)]
pub struct ThreadSubs {
    /// Remote subscriptions, keyed by the root note id of the thread.
    pub remotes: HashMap<RootNoteId, Remote>,
    /// Scopes per `MetaId`; the last scope in each list is the current one.
    scopes: HashMap<MetaId, Vec<Scope>>,
}
     17 
/// Column id; the key under which a column's scopes are stored.
type MetaId = usize;
     20 
/// A relay pool subscription shared by every local subscription on the same
/// thread root.
pub struct Remote {
    /// Filters the relay pool subscription was opened with.
    pub filter: Vec<Filter>,
    /// Subscription id handed to the relay pool.
    subid: String,
    /// Number of local subscriptions currently relying on this remote one;
    /// when it reaches zero during an unsubscribe the remote is dropped.
    dependers: usize,
}
     26 
     27 impl std::fmt::Debug for Remote {
     28     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     29         f.debug_struct("Remote")
     30             .field("subid", &self.subid)
     31             .field("dependers", &self.dependers)
     32             .finish()
     33     }
     34 }
     35 
/// A group of local subscriptions that all belong to the same thread root.
struct Scope {
    /// Root note of the thread this scope was opened for.
    pub root_id: NoteId,
    /// Local subscriptions in the order they were opened; the last entry is
    /// the most recent one.
    stack: Vec<Sub>,
}
     40 
/// One local (nostrdb) subscription inside a scope.
pub struct Sub {
    /// The selected note (or the thread root when nothing else is selected)
    /// at the time the subscription was opened.
    pub selected_id: NoteId,
    /// Handle of the nostrdb subscription.
    pub sub: Subscription,
    /// Filters the nostrdb subscription was opened with.
    pub filter: Vec<Filter>,
}
     46 
     47 impl ThreadSubs {
     48     #[allow(clippy::too_many_arguments)]
     49     pub fn subscribe(
     50         &mut self,
     51         ndb: &mut Ndb,
     52         pool: &mut RelayPool,
     53         meta_id: usize,
     54         id: &ThreadSelection,
     55         local_sub_filter: Vec<Filter>,
     56         new_scope: bool,
     57         remote_sub_filter: impl FnOnce() -> Vec<Filter>,
     58     ) {
     59         let cur_scopes = self.scopes.entry(meta_id).or_default();
     60 
     61         let new_subs = if new_scope || cur_scopes.is_empty() {
     62             local_sub_new_scope(ndb, id, local_sub_filter, cur_scopes)
     63         } else {
     64             let cur_scope = cur_scopes.last_mut().expect("can't be empty");
     65             sub_current_scope(ndb, id, local_sub_filter, cur_scope)
     66         };
     67 
     68         let remote = match self.remotes.raw_entry_mut().from_key(&id.root_id.bytes()) {
     69             hashbrown::hash_map::RawEntryMut::Occupied(entry) => entry.into_mut(),
     70             hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
     71                 let (_, res) = entry.insert(
     72                     NoteId::new(*id.root_id.bytes()),
     73                     sub_remote(pool, remote_sub_filter, id),
     74                 );
     75 
     76                 res
     77             }
     78         };
     79 
     80         remote.dependers = remote.dependers.saturating_add_signed(new_subs);
     81         let num_dependers = remote.dependers;
     82         tracing::debug!(
     83             "Sub stats: num remotes: {}, num locals: {}, num remote dependers: {:?}",
     84             self.remotes.len(),
     85             self.scopes.len(),
     86             num_dependers,
     87         );
     88     }
     89 
     90     pub fn unsubscribe(
     91         &mut self,
     92         ndb: &mut Ndb,
     93         pool: &mut RelayPool,
     94         meta_id: usize,
     95         id: &ThreadSelection,
     96         return_type: ReturnType,
     97     ) {
     98         let Some(scopes) = self.scopes.get_mut(&meta_id) else {
     99             return;
    100         };
    101 
    102         let Some(remote) = self.remotes.get_mut(&id.root_id.bytes()) else {
    103             tracing::error!("somehow we're unsubscribing but we don't have a remote");
    104             return;
    105         };
    106 
    107         match return_type {
    108             ReturnType::Drag => {
    109                 if let Some(scope) = scopes.last_mut() {
    110                     let Some(cur_sub) = scope.stack.pop() else {
    111                         tracing::error!("expected a scope to be left");
    112                         return;
    113                     };
    114 
    115                     if scope.root_id.bytes() != id.root_id.bytes() {
    116                         tracing::error!(
    117                             "Somehow the current scope's root is not equal to the selected note's root. scope's root: {:?}, thread's root: {:?}",
    118                             scope.root_id.hex(),
    119                             id.root_id.bytes()
    120                         );
    121                     }
    122 
    123                     if ndb_unsub(ndb, cur_sub.sub, id) {
    124                         remote.dependers = remote.dependers.saturating_sub(1);
    125                     }
    126 
    127                     if scope.stack.is_empty() {
    128                         scopes.pop();
    129                     }
    130                 }
    131             }
    132             ReturnType::Click => {
    133                 let Some(scope) = scopes.pop() else {
    134                     tracing::error!("called unsubscribe but there aren't any scopes left");
    135                     return;
    136                 };
    137 
    138                 if scope.root_id.bytes() != id.root_id.bytes() {
    139                     tracing::error!(
    140                         "Somehow the current scope's root is not equal to the selected note's root. scope's root: {:?}, thread's root: {:?}",
    141                         scope.root_id.hex(),
    142                         id.root_id.bytes()
    143                     );
    144                 }
    145                 for sub in scope.stack {
    146                     if ndb_unsub(ndb, sub.sub, id) {
    147                         remote.dependers = remote.dependers.saturating_sub(1);
    148                     }
    149                 }
    150             }
    151         }
    152 
    153         if scopes.is_empty() {
    154             self.scopes.remove(&meta_id);
    155         }
    156 
    157         let num_dependers = remote.dependers;
    158 
    159         if remote.dependers == 0 {
    160             let remote = self
    161                 .remotes
    162                 .remove(&id.root_id.bytes())
    163                 .expect("code above should guarentee existence");
    164             tracing::debug!("Remotely unsubscribed: {}", remote.subid);
    165             pool.unsubscribe(remote.subid);
    166         }
    167 
    168         tracing::debug!(
    169             "unsub stats: num remotes: {}, num locals: {}, num remote dependers: {:?}",
    170             self.remotes.len(),
    171             self.scopes.len(),
    172             num_dependers,
    173         );
    174     }
    175 
    176     pub fn get_local(&self, meta_id: usize) -> Option<&Sub> {
    177         self.scopes
    178             .get(&meta_id)
    179             .as_ref()
    180             .and_then(|s| s.last())
    181             .and_then(|s| s.stack.last())
    182     }
    183 }
    184 
    185 fn sub_current_scope(
    186     ndb: &mut Ndb,
    187     selection: &ThreadSelection,
    188     local_sub_filter: Vec<Filter>,
    189     cur_scope: &mut Scope,
    190 ) -> isize {
    191     let mut new_subs = 0;
    192 
    193     if selection.root_id.bytes() != cur_scope.root_id.bytes() {
    194         tracing::error!(
    195             "Somehow the current scope's root is not equal to the selected note's root"
    196         );
    197     }
    198 
    199     if let Some(sub) = ndb_sub(ndb, &local_sub_filter, selection) {
    200         cur_scope.stack.push(Sub {
    201             selected_id: NoteId::new(*selection.selected_or_root()),
    202             sub,
    203             filter: local_sub_filter,
    204         });
    205         new_subs += 1;
    206     }
    207 
    208     new_subs
    209 }
    210 
    211 fn ndb_sub(ndb: &Ndb, filter: &[Filter], id: impl std::fmt::Debug) -> Option<Subscription> {
    212     match ndb.subscribe(filter) {
    213         Ok(s) => Some(s),
    214         Err(e) => {
    215             tracing::error!("Failed to get subscription for {:?}: {e}", id);
    216             None
    217         }
    218     }
    219 }
    220 
    221 fn ndb_unsub(ndb: &mut Ndb, sub: Subscription, id: impl std::fmt::Debug) -> bool {
    222     match ndb.unsubscribe(sub) {
    223         Ok(_) => true,
    224         Err(e) => {
    225             tracing::error!("Failed to unsub {:?}: {e}", id);
    226             false
    227         }
    228     }
    229 }
    230 
    231 fn sub_remote(
    232     pool: &mut RelayPool,
    233     remote_sub_filter: impl FnOnce() -> Vec<Filter>,
    234     id: impl std::fmt::Debug,
    235 ) -> Remote {
    236     let subid = Uuid::new_v4().to_string();
    237 
    238     let filter = remote_sub_filter();
    239 
    240     let remote = Remote {
    241         filter: filter.clone(),
    242         subid: subid.clone(),
    243         dependers: 0,
    244     };
    245 
    246     tracing::debug!("Remote subscribe for {:?}", id);
    247 
    248     pool.subscribe(subid, filter);
    249 
    250     remote
    251 }
    252 
    253 fn local_sub_new_scope(
    254     ndb: &mut Ndb,
    255     id: &ThreadSelection,
    256     local_sub_filter: Vec<Filter>,
    257     scopes: &mut Vec<Scope>,
    258 ) -> isize {
    259     let Some(sub) = ndb_sub(ndb, &local_sub_filter, id) else {
    260         return 0;
    261     };
    262 
    263     scopes.push(Scope {
    264         root_id: id.root_id.to_note_id(),
    265         stack: vec![Sub {
    266             selected_id: NoteId::new(*id.selected_or_root()),
    267             sub,
    268             filter: local_sub_filter,
    269         }],
    270     });
    271 
    272     1
    273 }
    274 
/// Subscription state of a timeline together with the filter it was opened
/// with.
#[derive(Debug)]
pub struct TimelineSub {
    /// Filter recorded when a subscription is added; cleared by `reset`.
    filter: Option<HybridFilter>,
    /// Which subscriptions (none, local, remote or both) currently exist,
    /// along with the depender count.
    state: SubState,
}
    280 
/// Which of the local (nostrdb) and remote (relay pool) subscriptions exist.
///
/// Every variant carries `dependers`, the reference count of users sharing
/// this subscription.
#[derive(Debug, Clone)]
enum SubState {
    /// Neither a local nor a remote subscription exists.
    NoSub {
        dependers: usize,
    },
    /// Only the nostrdb subscription exists.
    LocalOnly {
        local: Subscription,
        dependers: usize,
    },
    /// Only the relay pool subscription exists; `remote` is its subscription id.
    RemoteOnly {
        remote: String,
        dependers: usize,
    },
    /// Both the nostrdb and the relay pool subscription exist.
    Unified {
        unified: UnifiedSubscription,
        dependers: usize,
    },
}
    299 
    300 impl Default for TimelineSub {
    301     fn default() -> Self {
    302         Self {
    303             state: SubState::NoSub { dependers: 0 },
    304             filter: None,
    305         }
    306     }
    307 }
    308 
    309 impl TimelineSub {
    310     /// Reset the subscription state, properly unsubscribing from ndb and
    311     /// relay pool before clearing.
    312     ///
    313     /// Used when the contact list changes and we need to rebuild the
    314     /// timeline with a new filter. Preserves the depender count so that
    315     /// shared subscription reference counting remains correct.
    316     pub fn reset(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) {
    317         let before = self.state.clone();
    318 
    319         let dependers = match &self.state {
    320             SubState::NoSub { dependers } => *dependers,
    321 
    322             SubState::LocalOnly { local, dependers } => {
    323                 if let Err(e) = ndb.unsubscribe(*local) {
    324                     tracing::error!("TimelineSub::reset: failed to unsubscribe from ndb: {e}");
    325                 }
    326                 *dependers
    327             }
    328 
    329             SubState::RemoteOnly { remote, dependers } => {
    330                 pool.unsubscribe(remote.to_owned());
    331                 *dependers
    332             }
    333 
    334             SubState::Unified { unified, dependers } => {
    335                 pool.unsubscribe(unified.remote.to_owned());
    336                 if let Err(e) = ndb.unsubscribe(unified.local) {
    337                     tracing::error!("TimelineSub::reset: failed to unsubscribe from ndb: {e}");
    338                 }
    339                 *dependers
    340             }
    341         };
    342 
    343         self.state = SubState::NoSub { dependers };
    344         self.filter = None;
    345 
    346         tracing::debug!("TimelineSub::reset: {:?} => {:?}", before, self.state);
    347     }
    348 
    349     pub fn try_add_local(&mut self, ndb: &Ndb, filter: &HybridFilter) {
    350         let before = self.state.clone();
    351         match &mut self.state {
    352             SubState::NoSub { dependers } => {
    353                 let Some(sub) = ndb_sub(ndb, &filter.local().combined(), "") else {
    354                     return;
    355                 };
    356 
    357                 self.filter = Some(filter.to_owned());
    358                 self.state = SubState::LocalOnly {
    359                     local: sub,
    360                     dependers: *dependers,
    361                 }
    362             }
    363             SubState::LocalOnly {
    364                 local: _,
    365                 dependers: _,
    366             } => {}
    367             SubState::RemoteOnly { remote, dependers } => {
    368                 let Some(local) = ndb_sub(ndb, &filter.local().combined(), "") else {
    369                     return;
    370                 };
    371                 self.state = SubState::Unified {
    372                     unified: UnifiedSubscription {
    373                         local,
    374                         remote: remote.to_owned(),
    375                     },
    376                     dependers: *dependers,
    377                 };
    378             }
    379             SubState::Unified {
    380                 unified: _,
    381                 dependers: _,
    382             } => {}
    383         }
    384         tracing::debug!(
    385             "TimelineSub::try_add_local: {:?} => {:?}",
    386             before,
    387             self.state
    388         );
    389     }
    390 
    391     pub fn force_add_remote(&mut self, subid: String) {
    392         let before = self.state.clone();
    393         match &mut self.state {
    394             SubState::NoSub { dependers } => {
    395                 self.state = SubState::RemoteOnly {
    396                     remote: subid,
    397                     dependers: *dependers,
    398                 }
    399             }
    400             SubState::LocalOnly { local, dependers } => {
    401                 self.state = SubState::Unified {
    402                     unified: UnifiedSubscription {
    403                         local: *local,
    404                         remote: subid,
    405                     },
    406                     dependers: *dependers,
    407                 }
    408             }
    409             SubState::RemoteOnly {
    410                 remote: _,
    411                 dependers: _,
    412             } => {}
    413             SubState::Unified {
    414                 unified: _,
    415                 dependers: _,
    416             } => {}
    417         }
    418         tracing::debug!(
    419             "TimelineSub::force_add_remote: {:?} => {:?}",
    420             before,
    421             self.state
    422         );
    423     }
    424 
    425     pub fn try_add_remote(&mut self, pool: &mut RelayPool, filter: &HybridFilter) {
    426         let before = self.state.clone();
    427         match &mut self.state {
    428             SubState::NoSub { dependers } => {
    429                 let subid = subscriptions::new_sub_id();
    430                 pool.subscribe(subid.clone(), filter.remote().to_vec());
    431                 self.filter = Some(filter.to_owned());
    432                 self.state = SubState::RemoteOnly {
    433                     remote: subid,
    434                     dependers: *dependers,
    435                 };
    436             }
    437             SubState::LocalOnly { local, dependers } => {
    438                 let subid = subscriptions::new_sub_id();
    439                 pool.subscribe(subid.clone(), filter.remote().to_vec());
    440                 self.filter = Some(filter.to_owned());
    441                 self.state = SubState::Unified {
    442                     unified: UnifiedSubscription {
    443                         local: *local,
    444                         remote: subid,
    445                     },
    446                     dependers: *dependers,
    447                 }
    448             }
    449             SubState::RemoteOnly {
    450                 remote: _,
    451                 dependers: _,
    452             } => {}
    453             SubState::Unified {
    454                 unified: _,
    455                 dependers: _,
    456             } => {}
    457         }
    458         tracing::debug!(
    459             "TimelineSub::try_add_remote: {:?} => {:?}",
    460             before,
    461             self.state
    462         );
    463     }
    464 
    465     pub fn increment(&mut self) {
    466         let before = self.state.clone();
    467         match &mut self.state {
    468             SubState::NoSub { dependers } => {
    469                 *dependers += 1;
    470             }
    471             SubState::LocalOnly {
    472                 local: _,
    473                 dependers,
    474             } => {
    475                 *dependers += 1;
    476             }
    477             SubState::RemoteOnly {
    478                 remote: _,
    479                 dependers,
    480             } => {
    481                 *dependers += 1;
    482             }
    483             SubState::Unified {
    484                 unified: _,
    485                 dependers,
    486             } => {
    487                 *dependers += 1;
    488             }
    489         }
    490 
    491         tracing::debug!("TimelineSub::increment: {:?} => {:?}", before, self.state);
    492     }
    493 
    494     pub fn get_local(&self) -> Option<Subscription> {
    495         match &self.state {
    496             SubState::NoSub { dependers: _ } => None,
    497             SubState::LocalOnly {
    498                 local,
    499                 dependers: _,
    500             } => Some(*local),
    501             SubState::RemoteOnly {
    502                 remote: _,
    503                 dependers: _,
    504             } => None,
    505             SubState::Unified {
    506                 unified,
    507                 dependers: _,
    508             } => Some(unified.local),
    509         }
    510     }
    511 
    512     pub fn unsubscribe_or_decrement(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) {
    513         let before = self.state.clone();
    514         's: {
    515             match &mut self.state {
    516                 SubState::NoSub { dependers } => *dependers = dependers.saturating_sub(1),
    517                 SubState::LocalOnly { local, dependers } => {
    518                     if *dependers > 1 {
    519                         *dependers = dependers.saturating_sub(1);
    520                         break 's;
    521                     }
    522 
    523                     if let Err(e) = ndb.unsubscribe(*local) {
    524                         tracing::error!("Could not unsub ndb: {e}");
    525                         break 's;
    526                     }
    527 
    528                     self.state = SubState::NoSub { dependers: 0 };
    529                 }
    530                 SubState::RemoteOnly { remote, dependers } => {
    531                     if *dependers > 1 {
    532                         *dependers = dependers.saturating_sub(1);
    533                         break 's;
    534                     }
    535 
    536                     pool.unsubscribe(remote.to_owned());
    537 
    538                     self.state = SubState::NoSub { dependers: 0 };
    539                 }
    540                 SubState::Unified { unified, dependers } => {
    541                     if *dependers > 1 {
    542                         *dependers = dependers.saturating_sub(1);
    543                         break 's;
    544                     }
    545 
    546                     pool.unsubscribe(unified.remote.to_owned());
    547 
    548                     if let Err(e) = ndb.unsubscribe(unified.local) {
    549                         tracing::error!("could not unsub ndb: {e}");
    550                         self.state = SubState::LocalOnly {
    551                             local: unified.local,
    552                             dependers: *dependers,
    553                         }
    554                     } else {
    555                         self.state = SubState::NoSub { dependers: 0 };
    556                     }
    557                 }
    558             }
    559         }
    560         tracing::debug!(
    561             "TimelineSub::unsubscribe_or_decrement: {:?} => {:?}",
    562             before,
    563             self.state
    564         );
    565     }
    566 
    567     pub fn get_filter(&self) -> Option<&HybridFilter> {
    568         self.filter.as_ref()
    569     }
    570 
    571     pub fn no_sub(&self) -> bool {
    572         matches!(self.state, SubState::NoSub { dependers: _ })
    573     }
    574 }