notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

multi_subscriber.rs (15852B)


      1 use egui_nav::ReturnType;
      2 use enostr::{Filter, NoteId, RelayPool};
      3 use hashbrown::HashMap;
      4 use nostrdb::{Ndb, Subscription};
      5 use notedeck::{filter::HybridFilter, UnifiedSubscription};
      6 use uuid::Uuid;
      7 
      8 use crate::{subscriptions, timeline::ThreadSelection};
      9 
/// Id of the note at the root of a thread; used as the key of
/// `ThreadSubs::remotes`.
type RootNoteId = NoteId;
     11 
/// Bookkeeping for thread subscriptions: one relay-pool subscription per
/// thread root, plus the local nostrdb subscriptions opened by each column.
#[derive(Default)]
pub struct ThreadSubs {
    /// Relay-pool subscriptions, keyed by the root note id of the thread.
    pub remotes: HashMap<RootNoteId, Remote>,
    /// Per-column list of scopes; new scopes are pushed at the end and the
    /// last entry is the one currently being added to or removed from.
    scopes: HashMap<MetaId, Vec<Scope>>,
}
     17 
/// Column id; the key under which a column's scopes are stored in
/// `ThreadSubs::scopes`.
type MetaId = usize;
     20 
/// A relay-pool subscription shared by every local subscription on the same
/// thread root.
pub struct Remote {
    /// Filters the relay-pool subscription was opened with.
    pub filter: Vec<Filter>,
    /// Subscription id handed to `RelayPool::subscribe` / `unsubscribe`.
    subid: String,
    /// Number of local subscriptions currently relying on this remote; the
    /// remote is unsubscribed once this reaches zero.
    dependers: usize,
}
     26 
     27 impl std::fmt::Debug for Remote {
     28     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     29         f.debug_struct("Remote")
     30             .field("subid", &self.subid)
     31             .field("dependers", &self.dependers)
     32             .finish()
     33     }
     34 }
     35 
/// A stack of local subscriptions that all belong to the same thread root.
struct Scope {
    /// Root note id of the thread this scope was opened for.
    pub root_id: NoteId,
    /// Local subscriptions in the order they were opened; the last entry is
    /// the most recent one.
    stack: Vec<Sub>,
}
     40 
/// A single local (nostrdb) subscription together with what it was opened for.
pub struct Sub {
    /// The selected note of the thread selection, or its root when no other
    /// note is selected (taken from `ThreadSelection::selected_or_root`).
    pub selected_id: NoteId,
    /// Handle of the nostrdb subscription.
    pub sub: Subscription,
    /// Filters the nostrdb subscription was created with.
    pub filter: Vec<Filter>,
}
     46 
     47 impl ThreadSubs {
     48     #[allow(clippy::too_many_arguments)]
     49     pub fn subscribe(
     50         &mut self,
     51         ndb: &mut Ndb,
     52         pool: &mut RelayPool,
     53         meta_id: usize,
     54         id: &ThreadSelection,
     55         local_sub_filter: Vec<Filter>,
     56         new_scope: bool,
     57         remote_sub_filter: impl FnOnce() -> Vec<Filter>,
     58     ) {
     59         let cur_scopes = self.scopes.entry(meta_id).or_default();
     60 
     61         let new_subs = if new_scope || cur_scopes.is_empty() {
     62             local_sub_new_scope(ndb, id, local_sub_filter, cur_scopes)
     63         } else {
     64             let cur_scope = cur_scopes.last_mut().expect("can't be empty");
     65             sub_current_scope(ndb, id, local_sub_filter, cur_scope)
     66         };
     67 
     68         let remote = match self.remotes.raw_entry_mut().from_key(&id.root_id.bytes()) {
     69             hashbrown::hash_map::RawEntryMut::Occupied(entry) => entry.into_mut(),
     70             hashbrown::hash_map::RawEntryMut::Vacant(entry) => {
     71                 let (_, res) = entry.insert(
     72                     NoteId::new(*id.root_id.bytes()),
     73                     sub_remote(pool, remote_sub_filter, id),
     74                 );
     75 
     76                 res
     77             }
     78         };
     79 
     80         remote.dependers = remote.dependers.saturating_add_signed(new_subs);
     81         let num_dependers = remote.dependers;
     82         tracing::debug!(
     83             "Sub stats: num remotes: {}, num locals: {}, num remote dependers: {:?}",
     84             self.remotes.len(),
     85             self.scopes.len(),
     86             num_dependers,
     87         );
     88     }
     89 
     90     pub fn unsubscribe(
     91         &mut self,
     92         ndb: &mut Ndb,
     93         pool: &mut RelayPool,
     94         meta_id: usize,
     95         id: &ThreadSelection,
     96         return_type: ReturnType,
     97     ) {
     98         let Some(scopes) = self.scopes.get_mut(&meta_id) else {
     99             return;
    100         };
    101 
    102         let Some(remote) = self.remotes.get_mut(&id.root_id.bytes()) else {
    103             tracing::error!("somehow we're unsubscribing but we don't have a remote");
    104             return;
    105         };
    106 
    107         match return_type {
    108             ReturnType::Drag => {
    109                 if let Some(scope) = scopes.last_mut() {
    110                     let Some(cur_sub) = scope.stack.pop() else {
    111                         tracing::error!("expected a scope to be left");
    112                         return;
    113                     };
    114 
    115                     if scope.root_id.bytes() != id.root_id.bytes() {
    116                         tracing::error!(
    117                             "Somehow the current scope's root is not equal to the selected note's root. scope's root: {:?}, thread's root: {:?}",
    118                             scope.root_id.hex(),
    119                             id.root_id.bytes()
    120                         );
    121                     }
    122 
    123                     if ndb_unsub(ndb, cur_sub.sub, id) {
    124                         remote.dependers = remote.dependers.saturating_sub(1);
    125                     }
    126 
    127                     if scope.stack.is_empty() {
    128                         scopes.pop();
    129                     }
    130                 }
    131             }
    132             ReturnType::Click => {
    133                 let Some(scope) = scopes.pop() else {
    134                     tracing::error!("called unsubscribe but there aren't any scopes left");
    135                     return;
    136                 };
    137 
    138                 if scope.root_id.bytes() != id.root_id.bytes() {
    139                     tracing::error!(
    140                         "Somehow the current scope's root is not equal to the selected note's root. scope's root: {:?}, thread's root: {:?}",
    141                         scope.root_id.hex(),
    142                         id.root_id.bytes()
    143                     );
    144                 }
    145                 for sub in scope.stack {
    146                     if ndb_unsub(ndb, sub.sub, id) {
    147                         remote.dependers = remote.dependers.saturating_sub(1);
    148                     }
    149                 }
    150             }
    151         }
    152 
    153         if scopes.is_empty() {
    154             self.scopes.remove(&meta_id);
    155         }
    156 
    157         let num_dependers = remote.dependers;
    158 
    159         if remote.dependers == 0 {
    160             let remote = self
    161                 .remotes
    162                 .remove(&id.root_id.bytes())
    163                 .expect("code above should guarentee existence");
    164             tracing::debug!("Remotely unsubscribed: {}", remote.subid);
    165             pool.unsubscribe(remote.subid);
    166         }
    167 
    168         tracing::debug!(
    169             "unsub stats: num remotes: {}, num locals: {}, num remote dependers: {:?}",
    170             self.remotes.len(),
    171             self.scopes.len(),
    172             num_dependers,
    173         );
    174     }
    175 
    176     pub fn get_local(&self, meta_id: usize) -> Option<&Sub> {
    177         self.scopes
    178             .get(&meta_id)
    179             .as_ref()
    180             .and_then(|s| s.last())
    181             .and_then(|s| s.stack.last())
    182     }
    183 }
    184 
    185 fn sub_current_scope(
    186     ndb: &mut Ndb,
    187     selection: &ThreadSelection,
    188     local_sub_filter: Vec<Filter>,
    189     cur_scope: &mut Scope,
    190 ) -> isize {
    191     let mut new_subs = 0;
    192 
    193     if selection.root_id.bytes() != cur_scope.root_id.bytes() {
    194         tracing::error!(
    195             "Somehow the current scope's root is not equal to the selected note's root"
    196         );
    197     }
    198 
    199     if let Some(sub) = ndb_sub(ndb, &local_sub_filter, selection) {
    200         cur_scope.stack.push(Sub {
    201             selected_id: NoteId::new(*selection.selected_or_root()),
    202             sub,
    203             filter: local_sub_filter,
    204         });
    205         new_subs += 1;
    206     }
    207 
    208     new_subs
    209 }
    210 
    211 fn ndb_sub(ndb: &Ndb, filter: &[Filter], id: impl std::fmt::Debug) -> Option<Subscription> {
    212     match ndb.subscribe(filter) {
    213         Ok(s) => Some(s),
    214         Err(e) => {
    215             tracing::error!("Failed to get subscription for {:?}: {e}", id);
    216             None
    217         }
    218     }
    219 }
    220 
    221 fn ndb_unsub(ndb: &mut Ndb, sub: Subscription, id: impl std::fmt::Debug) -> bool {
    222     match ndb.unsubscribe(sub) {
    223         Ok(_) => true,
    224         Err(e) => {
    225             tracing::error!("Failed to unsub {:?}: {e}", id);
    226             false
    227         }
    228     }
    229 }
    230 
    231 fn sub_remote(
    232     pool: &mut RelayPool,
    233     remote_sub_filter: impl FnOnce() -> Vec<Filter>,
    234     id: impl std::fmt::Debug,
    235 ) -> Remote {
    236     let subid = Uuid::new_v4().to_string();
    237 
    238     let filter = remote_sub_filter();
    239 
    240     let remote = Remote {
    241         filter: filter.clone(),
    242         subid: subid.clone(),
    243         dependers: 0,
    244     };
    245 
    246     tracing::debug!("Remote subscribe for {:?}", id);
    247 
    248     pool.subscribe(subid, filter);
    249 
    250     remote
    251 }
    252 
    253 fn local_sub_new_scope(
    254     ndb: &mut Ndb,
    255     id: &ThreadSelection,
    256     local_sub_filter: Vec<Filter>,
    257     scopes: &mut Vec<Scope>,
    258 ) -> isize {
    259     let Some(sub) = ndb_sub(ndb, &local_sub_filter, id) else {
    260         return 0;
    261     };
    262 
    263     scopes.push(Scope {
    264         root_id: id.root_id.to_note_id(),
    265         stack: vec![Sub {
    266             selected_id: NoteId::new(*id.selected_or_root()),
    267             sub,
    268             filter: local_sub_filter,
    269         }],
    270     });
    271 
    272     1
    273 }
    274 
/// Subscription state of a timeline: which of the local (nostrdb) and remote
/// (relay pool) subscriptions are open, and how many users depend on them.
#[derive(Debug)]
pub struct TimelineSub {
    /// Filter recorded by `try_add_local` (when starting from no subscription)
    /// and by `try_add_remote`; `None` until one of those has stored it.
    filter: Option<HybridFilter>,
    /// Which subscriptions are currently open, plus the depender count.
    state: SubState,
}
    280 
/// The combination of subscriptions a [`TimelineSub`] currently holds.
///
/// Every variant carries `dependers`, the counter maintained by
/// `TimelineSub::increment` and `TimelineSub::unsubscribe_or_decrement`.
#[derive(Debug, Clone)]
enum SubState {
    /// Neither a local nor a remote subscription is open.
    NoSub {
        dependers: usize,
    },
    /// Only the nostrdb subscription is open.
    LocalOnly {
        local: Subscription,
        dependers: usize,
    },
    /// Only the relay-pool subscription is open; `remote` is its id.
    RemoteOnly {
        remote: String,
        dependers: usize,
    },
    /// Both the nostrdb and the relay-pool subscription are open.
    Unified {
        unified: UnifiedSubscription,
        dependers: usize,
    },
}
    299 
    300 impl Default for TimelineSub {
    301     fn default() -> Self {
    302         Self {
    303             state: SubState::NoSub { dependers: 0 },
    304             filter: None,
    305         }
    306     }
    307 }
    308 
    309 impl TimelineSub {
    310     pub fn try_add_local(&mut self, ndb: &Ndb, filter: &HybridFilter) {
    311         let before = self.state.clone();
    312         match &mut self.state {
    313             SubState::NoSub { dependers } => {
    314                 let Some(sub) = ndb_sub(ndb, filter.local(), "") else {
    315                     return;
    316                 };
    317 
    318                 self.filter = Some(filter.to_owned());
    319                 self.state = SubState::LocalOnly {
    320                     local: sub,
    321                     dependers: *dependers,
    322                 }
    323             }
    324             SubState::LocalOnly {
    325                 local: _,
    326                 dependers: _,
    327             } => {}
    328             SubState::RemoteOnly { remote, dependers } => {
    329                 let Some(local) = ndb_sub(ndb, filter.local(), "") else {
    330                     return;
    331                 };
    332                 self.state = SubState::Unified {
    333                     unified: UnifiedSubscription {
    334                         local,
    335                         remote: remote.to_owned(),
    336                     },
    337                     dependers: *dependers,
    338                 };
    339             }
    340             SubState::Unified {
    341                 unified: _,
    342                 dependers: _,
    343             } => {}
    344         }
    345         tracing::debug!(
    346             "TimelineSub::try_add_local: {:?} => {:?}",
    347             before,
    348             self.state
    349         );
    350     }
    351 
    352     pub fn force_add_remote(&mut self, subid: String) {
    353         let before = self.state.clone();
    354         match &mut self.state {
    355             SubState::NoSub { dependers } => {
    356                 self.state = SubState::RemoteOnly {
    357                     remote: subid,
    358                     dependers: *dependers,
    359                 }
    360             }
    361             SubState::LocalOnly { local, dependers } => {
    362                 self.state = SubState::Unified {
    363                     unified: UnifiedSubscription {
    364                         local: *local,
    365                         remote: subid,
    366                     },
    367                     dependers: *dependers,
    368                 }
    369             }
    370             SubState::RemoteOnly {
    371                 remote: _,
    372                 dependers: _,
    373             } => {}
    374             SubState::Unified {
    375                 unified: _,
    376                 dependers: _,
    377             } => {}
    378         }
    379         tracing::debug!(
    380             "TimelineSub::force_add_remote: {:?} => {:?}",
    381             before,
    382             self.state
    383         );
    384     }
    385 
    386     pub fn try_add_remote(&mut self, pool: &mut RelayPool, filter: &HybridFilter) {
    387         let before = self.state.clone();
    388         match &mut self.state {
    389             SubState::NoSub { dependers } => {
    390                 let subid = subscriptions::new_sub_id();
    391                 pool.subscribe(subid.clone(), filter.remote().to_vec());
    392                 self.filter = Some(filter.to_owned());
    393                 self.state = SubState::RemoteOnly {
    394                     remote: subid,
    395                     dependers: *dependers,
    396                 };
    397             }
    398             SubState::LocalOnly { local, dependers } => {
    399                 let subid = subscriptions::new_sub_id();
    400                 pool.subscribe(subid.clone(), filter.remote().to_vec());
    401                 self.filter = Some(filter.to_owned());
    402                 self.state = SubState::Unified {
    403                     unified: UnifiedSubscription {
    404                         local: *local,
    405                         remote: subid,
    406                     },
    407                     dependers: *dependers,
    408                 }
    409             }
    410             SubState::RemoteOnly {
    411                 remote: _,
    412                 dependers: _,
    413             } => {}
    414             SubState::Unified {
    415                 unified: _,
    416                 dependers: _,
    417             } => {}
    418         }
    419         tracing::debug!(
    420             "TimelineSub::try_add_remote: {:?} => {:?}",
    421             before,
    422             self.state
    423         );
    424     }
    425 
    426     pub fn increment(&mut self) {
    427         let before = self.state.clone();
    428         match &mut self.state {
    429             SubState::NoSub { dependers } => {
    430                 *dependers += 1;
    431             }
    432             SubState::LocalOnly {
    433                 local: _,
    434                 dependers,
    435             } => {
    436                 *dependers += 1;
    437             }
    438             SubState::RemoteOnly {
    439                 remote: _,
    440                 dependers,
    441             } => {
    442                 *dependers += 1;
    443             }
    444             SubState::Unified {
    445                 unified: _,
    446                 dependers,
    447             } => {
    448                 *dependers += 1;
    449             }
    450         }
    451 
    452         tracing::debug!("TimelineSub::increment: {:?} => {:?}", before, self.state);
    453     }
    454 
    455     pub fn get_local(&self) -> Option<Subscription> {
    456         match &self.state {
    457             SubState::NoSub { dependers: _ } => None,
    458             SubState::LocalOnly {
    459                 local,
    460                 dependers: _,
    461             } => Some(*local),
    462             SubState::RemoteOnly {
    463                 remote: _,
    464                 dependers: _,
    465             } => None,
    466             SubState::Unified {
    467                 unified,
    468                 dependers: _,
    469             } => Some(unified.local),
    470         }
    471     }
    472 
    473     pub fn unsubscribe_or_decrement(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) {
    474         let before = self.state.clone();
    475         's: {
    476             match &mut self.state {
    477                 SubState::NoSub { dependers } => *dependers = dependers.saturating_sub(1),
    478                 SubState::LocalOnly { local, dependers } => {
    479                     if *dependers > 1 {
    480                         *dependers = dependers.saturating_sub(1);
    481                         break 's;
    482                     }
    483 
    484                     if let Err(e) = ndb.unsubscribe(*local) {
    485                         tracing::error!("Could not unsub ndb: {e}");
    486                         break 's;
    487                     }
    488 
    489                     self.state = SubState::NoSub { dependers: 0 };
    490                 }
    491                 SubState::RemoteOnly { remote, dependers } => {
    492                     if *dependers > 1 {
    493                         *dependers = dependers.saturating_sub(1);
    494                         break 's;
    495                     }
    496 
    497                     pool.unsubscribe(remote.to_owned());
    498 
    499                     self.state = SubState::NoSub { dependers: 0 };
    500                 }
    501                 SubState::Unified { unified, dependers } => {
    502                     if *dependers > 1 {
    503                         *dependers = dependers.saturating_sub(1);
    504                         break 's;
    505                     }
    506 
    507                     pool.unsubscribe(unified.remote.to_owned());
    508 
    509                     if let Err(e) = ndb.unsubscribe(unified.local) {
    510                         tracing::error!("could not unsub ndb: {e}");
    511                         self.state = SubState::LocalOnly {
    512                             local: unified.local,
    513                             dependers: *dependers,
    514                         }
    515                     } else {
    516                         self.state = SubState::NoSub { dependers: 0 };
    517                     }
    518                 }
    519             }
    520         }
    521         tracing::debug!(
    522             "TimelineSub::unsubscribe_or_decrement: {:?} => {:?}",
    523             before,
    524             self.state
    525         );
    526     }
    527 
    528     pub fn get_filter(&self) -> Option<&HybridFilter> {
    529         self.filter.as_ref()
    530     }
    531 
    532     pub fn no_sub(&self) -> bool {
    533         matches!(self.state, SubState::NoSub { dependers: _ })
    534     }
    535 }