notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

cache.rs (11170B)


      1 use crate::{
      2     actionbar::TimelineOpenResult,
      3     error::Error,
      4     timeline::{
      5         drop_timeline_remote_owner, ensure_remote_timeline_subscription, Timeline, TimelineKind,
      6         UnknownPksOwned,
      7     },
      8 };
      9 
     10 use notedeck::ScopedSubApi;
     11 use notedeck::{filter, FilterState, NoteCache, NoteRef};
     12 
     13 use enostr::Pubkey;
     14 use nostrdb::{Filter, Ndb, Transaction};
     15 use std::collections::HashMap;
     16 use tracing::{debug, error, info, warn};
     17 
     18 #[derive(Default)]
     19 pub struct TimelineCache {
     20     timelines: HashMap<TimelineKind, Timeline>,
     21 }
     22 
     23 pub enum Vitality<'a, M> {
     24     Fresh(&'a mut M),
     25     Stale(&'a mut M),
     26 }
     27 
     28 impl<'a, M> Vitality<'a, M> {
     29     pub fn get_ptr(self) -> &'a mut M {
     30         match self {
     31             Self::Fresh(ptr) => ptr,
     32             Self::Stale(ptr) => ptr,
     33         }
     34     }
     35 
     36     pub fn is_stale(&self) -> bool {
     37         match self {
     38             Self::Fresh(_ptr) => false,
     39             Self::Stale(_ptr) => true,
     40         }
     41     }
     42 }
     43 
     44 impl<'a> IntoIterator for &'a mut TimelineCache {
     45     type Item = (&'a TimelineKind, &'a mut Timeline);
     46     type IntoIter = std::collections::hash_map::IterMut<'a, TimelineKind, Timeline>;
     47 
     48     fn into_iter(self) -> Self::IntoIter {
     49         self.timelines.iter_mut()
     50     }
     51 }
     52 
     53 impl TimelineCache {
     54     /// Pop a timeline from the timeline cache. This only removes the timeline
     55     /// if it has reached 0 subscribers, meaning it was the last one to be
     56     /// removed
     57     pub fn pop(
     58         &mut self,
     59         id: &TimelineKind,
     60         ndb: &mut Ndb,
     61         scoped_subs: &mut ScopedSubApi<'_, '_>,
     62     ) -> Result<(), Error> {
     63         let timeline = if let Some(timeline) = self.timelines.get_mut(id) {
     64             timeline
     65         } else {
     66             return Err(Error::TimelineNotFound);
     67         };
     68 
     69         let account_pk = scoped_subs.selected_account_pubkey();
     70         timeline
     71             .subscription
     72             .unsubscribe_or_decrement(account_pk, ndb);
     73 
     74         if timeline.subscription.no_sub(&account_pk) {
     75             timeline.subscription.clear_remote_seeded(account_pk);
     76             drop_timeline_remote_owner(timeline, account_pk, scoped_subs);
     77         }
     78 
     79         if !timeline.subscription.has_any_subs() {
     80             debug!(
     81                 "popped last timeline {:?}, removing from timeline cache",
     82                 id
     83             );
     84             self.timelines.remove(id);
     85         }
     86 
     87         Ok(())
     88     }
     89 
     90     fn get_expected_mut(&mut self, key: &TimelineKind) -> &mut Timeline {
     91         self.timelines
     92             .get_mut(key)
     93             .expect("expected notes in timline cache")
     94     }
     95 
     96     /// Insert a new timeline into the cache, based on the TimelineKind
     97     #[allow(clippy::too_many_arguments)]
     98     fn insert_new(
     99         &mut self,
    100         id: TimelineKind,
    101         txn: &Transaction,
    102         ndb: &Ndb,
    103         notes: &[NoteRef],
    104         note_cache: &mut NoteCache,
    105     ) -> Option<UnknownPksOwned> {
    106         let mut timeline = if let Some(timeline) = id.clone().into_timeline(txn, ndb) {
    107             timeline
    108         } else {
    109             error!("Error creating timeline from {:?}", &id);
    110             return None;
    111         };
    112 
    113         // insert initial notes into timeline
    114         let res = timeline.insert_new(txn, ndb, note_cache, notes);
    115         self.timelines.insert(id, timeline);
    116 
    117         res
    118     }
    119 
    120     pub fn insert(&mut self, id: TimelineKind, account_pk: Pubkey, mut timeline: Timeline) {
    121         if let Some(cur_timeline) = self.timelines.get_mut(&id) {
    122             cur_timeline.subscription.increment(account_pk);
    123             return;
    124         };
    125 
    126         timeline.subscription.increment(account_pk);
    127         self.timelines.insert(id, timeline);
    128     }
    129 
    130     /// Get and/or update the notes associated with this timeline
    131     #[profiling::function]
    132     fn notes<'a>(
    133         &'a mut self,
    134         ndb: &Ndb,
    135         note_cache: &mut NoteCache,
    136         txn: &Transaction,
    137         id: &TimelineKind,
    138     ) -> GetNotesResponse<'a> {
    139         // we can't use the naive hashmap entry API here because lookups
    140         // require a copy, wait until we have a raw entry api. We could
    141         // also use hashbrown?
    142 
    143         if self.timelines.contains_key(id) {
    144             return GetNotesResponse {
    145                 vitality: Vitality::Stale(self.get_expected_mut(id)),
    146                 unknown_pks: None,
    147             };
    148         }
    149 
    150         let notes = if let FilterState::Ready(filters) = id.filters(txn, ndb) {
    151             let mut notes = Vec::new();
    152 
    153             for package in filters.local().packages {
    154                 profiling::scope!("ndb query");
    155                 if let Ok(results) = ndb.query(txn, package.filters, 1000) {
    156                     let cur_notes: Vec<NoteRef> = results
    157                         .into_iter()
    158                         .map(NoteRef::from_query_result)
    159                         .collect();
    160 
    161                     notes.extend(cur_notes);
    162                 } else {
    163                     debug!("got no results from TimelineCache lookup for {:?}", id);
    164                 }
    165             }
    166 
    167             notes
    168         } else {
    169             // filter is not ready yet
    170             vec![]
    171         };
    172 
    173         if notes.is_empty() {
    174             warn!("NotesHolder query returned 0 notes? ")
    175         } else {
    176             info!("found NotesHolder with {} notes", notes.len());
    177         }
    178 
    179         let unknown_pks = self.insert_new(id.to_owned(), txn, ndb, &notes, note_cache);
    180 
    181         GetNotesResponse {
    182             vitality: Vitality::Fresh(self.get_expected_mut(id)),
    183             unknown_pks,
    184         }
    185     }
    186 
    187     /// Open a timeline, optionally loading local notes.
    188     ///
    189     /// When `load_local` is false, the timeline is created and subscribed
    190     /// without running a blocking local query. Use this for startup paths
    191     /// where initial notes are loaded asynchronously.
    192     #[profiling::function]
    193     #[allow(clippy::too_many_arguments)]
    194     pub fn open(
    195         &mut self,
    196         ndb: &Ndb,
    197         note_cache: &mut NoteCache,
    198         txn: &Transaction,
    199         scoped_subs: &mut ScopedSubApi<'_, '_>,
    200         id: &TimelineKind,
    201         account_pk: Pubkey,
    202         load_local: bool,
    203     ) -> Option<TimelineOpenResult> {
    204         if !load_local {
    205             let timeline = if let Some(timeline) = self.timelines.get_mut(id) {
    206                 timeline
    207             } else {
    208                 let Some(timeline) = id.clone().into_timeline(txn, ndb) else {
    209                     error!("Error creating timeline from {:?}", id);
    210                     return None;
    211                 };
    212                 self.timelines.insert(id.clone(), timeline);
    213                 self.timelines.get_mut(id).expect("timeline inserted")
    214             };
    215 
    216             if let FilterState::Ready(filter) = &timeline.filter {
    217                 debug!("got open with subscription for {:?}", &timeline.kind);
    218                 timeline.subscription.try_add_local(account_pk, ndb, filter);
    219                 ensure_remote_timeline_subscription(
    220                     timeline,
    221                     account_pk,
    222                     filter.remote().to_vec(),
    223                     scoped_subs,
    224                 );
    225             } else {
    226                 debug!(
    227                     "open skipped subscription; filter not ready for {:?}",
    228                     &timeline.kind
    229                 );
    230             }
    231 
    232             timeline.subscription.increment(account_pk);
    233             return None;
    234         }
    235 
    236         let account_pk = scoped_subs.selected_account_pubkey();
    237         let notes_resp = self.notes(ndb, note_cache, txn, id);
    238         let (mut open_result, timeline) = match notes_resp.vitality {
    239             Vitality::Stale(timeline) => {
    240                 // The timeline cache is stale, let's update it
    241                 let notes = collect_stale_notes(timeline, txn, ndb);
    242 
    243                 let open_result = if notes.is_empty() {
    244                     None
    245                 } else {
    246                     let new_notes = notes.iter().map(|n| n.key).collect();
    247                     Some(TimelineOpenResult::new_notes(new_notes, id.clone()))
    248                 };
    249 
    250                 // we can't insert and update the VirtualList now, because we
    251                 // are already borrowing it mutably. Let's pass it as a
    252                 // result instead
    253                 //
    254                 // holder.get_view().insert(&notes); <-- no
    255                 (open_result, timeline)
    256             }
    257 
    258             Vitality::Fresh(timeline) => (None, timeline),
    259         };
    260 
    261         if let FilterState::Ready(filter) = &timeline.filter {
    262             debug!("got open with *new* subscription for {:?}", &timeline.kind);
    263             timeline.subscription.try_add_local(account_pk, ndb, filter);
    264             ensure_remote_timeline_subscription(
    265                 timeline,
    266                 account_pk,
    267                 filter.remote().to_vec(),
    268                 scoped_subs,
    269             );
    270         } else {
    271             // This should never happen reasoning, self.notes would have
    272             // failed above if the filter wasn't ready
    273             error!(
    274                 "open: filter not ready, so could not setup subscription. this should never happen"
    275             );
    276         };
    277 
    278         timeline.subscription.increment(account_pk);
    279 
    280         if let Some(unknowns) = notes_resp.unknown_pks {
    281             match &mut open_result {
    282                 Some(o) => o.insert_pks(unknowns.pks),
    283                 None => open_result = Some(TimelineOpenResult::new_pks(unknowns.pks)),
    284             }
    285         }
    286 
    287         open_result
    288     }
    289 
    290     pub fn get(&self, id: &TimelineKind) -> Option<&Timeline> {
    291         self.timelines.get(id)
    292     }
    293 
    294     pub fn get_mut(&mut self, id: &TimelineKind) -> Option<&mut Timeline> {
    295         self.timelines.get_mut(id)
    296     }
    297 
    298     pub fn num_timelines(&self) -> usize {
    299         self.timelines.len()
    300     }
    301 
    302     pub fn set_fresh(&mut self, kind: &TimelineKind) {
    303         let Some(tl) = self.get_mut(kind) else {
    304             return;
    305         };
    306 
    307         tl.seen_latest_notes = true;
    308     }
    309 }
    310 
    311 fn collect_stale_notes(timeline: &Timeline, txn: &Transaction, ndb: &Ndb) -> Vec<NoteRef> {
    312     let FilterState::Ready(filter) = &timeline.filter else {
    313         return Vec::new();
    314     };
    315 
    316     let mut notes = Vec::new();
    317     for package in filter.local().packages {
    318         let cur_notes = find_new_notes(
    319             timeline.all_or_any_entries().latest(),
    320             package.filters,
    321             txn,
    322             ndb,
    323         );
    324         notes.extend(cur_notes);
    325     }
    326     notes
    327 }
    328 
    329 pub struct GetNotesResponse<'a> {
    330     vitality: Vitality<'a, Timeline>,
    331     unknown_pks: Option<UnknownPksOwned>,
    332 }
    333 
    334 /// Look for new thread notes since our last fetch
    335 fn find_new_notes(
    336     latest: Option<&NoteRef>,
    337     filters: &[Filter],
    338     txn: &Transaction,
    339     ndb: &Ndb,
    340 ) -> Vec<NoteRef> {
    341     let Some(last_note) = latest else {
    342         return vec![];
    343     };
    344 
    345     let filters = filter::make_filters_since(filters, last_note.created_at + 1);
    346 
    347     if let Ok(results) = ndb.query(txn, &filters, 1000) {
    348         debug!("got {} results from NotesHolder update", results.len());
    349         results
    350             .into_iter()
    351             .map(NoteRef::from_query_result)
    352             .collect()
    353     } else {
    354         debug!("got no results from NotesHolder update",);
    355         vec![]
    356     }
    357 }