notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

cache.rs (8445B)


      1 use crate::{
      2     actionbar::TimelineOpenResult,
      3     multi_subscriber::MultiSubscriber,
      4     profile::Profile,
      5     thread::Thread,
      6     //subscriptions::SubRefs,
      7     timeline::{PubkeySource, Timeline},
      8 };
      9 
     10 use notedeck::{NoteCache, NoteRef, RootNoteId, RootNoteIdBuf};
     11 
     12 use enostr::{Pubkey, PubkeyRef, RelayPool};
     13 use nostrdb::{Filter, FilterBuilder, Ndb, Transaction};
     14 use std::collections::HashMap;
     15 use tracing::{debug, info, warn};
     16 
     17 #[derive(Default)]
     18 pub struct TimelineCache {
     19     pub threads: HashMap<RootNoteIdBuf, Thread>,
     20     pub profiles: HashMap<Pubkey, Profile>,
     21 }
     22 
     23 pub enum Vitality<'a, M> {
     24     Fresh(&'a mut M),
     25     Stale(&'a mut M),
     26 }
     27 
     28 impl<'a, M> Vitality<'a, M> {
     29     pub fn get_ptr(self) -> &'a mut M {
     30         match self {
     31             Self::Fresh(ptr) => ptr,
     32             Self::Stale(ptr) => ptr,
     33         }
     34     }
     35 
     36     pub fn is_stale(&self) -> bool {
     37         match self {
     38             Self::Fresh(_ptr) => false,
     39             Self::Stale(_ptr) => true,
     40         }
     41     }
     42 }
     43 
     44 #[derive(Hash, Debug, Copy, Clone)]
     45 pub enum TimelineCacheKey<'a> {
     46     Profile(PubkeyRef<'a>),
     47     Thread(RootNoteId<'a>),
     48 }
     49 
     50 impl<'a> TimelineCacheKey<'a> {
     51     pub fn profile(pubkey: PubkeyRef<'a>) -> Self {
     52         Self::Profile(pubkey)
     53     }
     54 
     55     pub fn thread(root_id: RootNoteId<'a>) -> Self {
     56         Self::Thread(root_id)
     57     }
     58 
     59     pub fn bytes(&self) -> &[u8; 32] {
     60         match self {
     61             Self::Profile(pk) => pk.bytes(),
     62             Self::Thread(root_id) => root_id.bytes(),
     63         }
     64     }
     65 
     66     /// The filters used to update our timeline cache
     67     pub fn filters_raw(&self) -> Vec<FilterBuilder> {
     68         match self {
     69             TimelineCacheKey::Thread(root_id) => Thread::filters_raw(*root_id),
     70 
     71             TimelineCacheKey::Profile(pubkey) => vec![Filter::new()
     72                 .authors([pubkey.bytes()])
     73                 .kinds([1])
     74                 .limit(notedeck::filter::default_limit())],
     75         }
     76     }
     77 
     78     pub fn filters_since(&self, since: u64) -> Vec<Filter> {
     79         self.filters_raw()
     80             .into_iter()
     81             .map(|fb| fb.since(since).build())
     82             .collect()
     83     }
     84 
     85     pub fn filters(&self) -> Vec<Filter> {
     86         self.filters_raw()
     87             .into_iter()
     88             .map(|mut fb| fb.build())
     89             .collect()
     90     }
     91 }
     92 
     93 impl TimelineCache {
     94     fn contains_key(&self, key: TimelineCacheKey<'_>) -> bool {
     95         match key {
     96             TimelineCacheKey::Profile(pubkey) => self.profiles.contains_key(pubkey.bytes()),
     97             TimelineCacheKey::Thread(root_id) => self.threads.contains_key(root_id.bytes()),
     98         }
     99     }
    100 
    101     fn get_expected_mut(&mut self, key: TimelineCacheKey<'_>) -> &mut Timeline {
    102         match key {
    103             TimelineCacheKey::Profile(pubkey) => self
    104                 .profiles
    105                 .get_mut(pubkey.bytes())
    106                 .map(|p| &mut p.timeline),
    107             TimelineCacheKey::Thread(root_id) => self
    108                 .threads
    109                 .get_mut(root_id.bytes())
    110                 .map(|t| &mut t.timeline),
    111         }
    112         .expect("expected notes in timline cache")
    113     }
    114 
    115     /// Insert a new profile or thread into the cache, based on the TimelineCacheKey
    116     #[allow(clippy::too_many_arguments)]
    117     fn insert_new(
    118         &mut self,
    119         id: TimelineCacheKey<'_>,
    120         txn: &Transaction,
    121         ndb: &Ndb,
    122         notes: &[NoteRef],
    123         note_cache: &mut NoteCache,
    124         filters: Vec<Filter>,
    125     ) {
    126         match id {
    127             TimelineCacheKey::Profile(pubkey) => {
    128                 let mut profile = Profile::new(PubkeySource::Explicit(pubkey.to_owned()), filters);
    129                 // insert initial notes into timeline
    130                 profile.timeline.insert_new(txn, ndb, note_cache, notes);
    131                 self.profiles.insert(pubkey.to_owned(), profile);
    132             }
    133 
    134             TimelineCacheKey::Thread(root_id) => {
    135                 let mut thread = Thread::new(root_id.to_owned());
    136                 thread.timeline.insert_new(txn, ndb, note_cache, notes);
    137                 self.threads.insert(root_id.to_owned(), thread);
    138             }
    139         }
    140     }
    141 
    142     /// Get and/or update the notes associated with this timeline
    143     pub fn notes<'a>(
    144         &'a mut self,
    145         ndb: &Ndb,
    146         note_cache: &mut NoteCache,
    147         txn: &Transaction,
    148         id: TimelineCacheKey<'a>,
    149     ) -> Vitality<'a, Timeline> {
    150         // we can't use the naive hashmap entry API here because lookups
    151         // require a copy, wait until we have a raw entry api. We could
    152         // also use hashbrown?
    153 
    154         if self.contains_key(id) {
    155             return Vitality::Stale(self.get_expected_mut(id));
    156         }
    157 
    158         let filters = id.filters();
    159         let notes = if let Ok(results) = ndb.query(txn, &filters, 1000) {
    160             results
    161                 .into_iter()
    162                 .map(NoteRef::from_query_result)
    163                 .collect()
    164         } else {
    165             debug!("got no results from TimelineCache lookup for {:?}", id);
    166             vec![]
    167         };
    168 
    169         if notes.is_empty() {
    170             warn!("NotesHolder query returned 0 notes? ")
    171         } else {
    172             info!("found NotesHolder with {} notes", notes.len());
    173         }
    174 
    175         self.insert_new(id, txn, ndb, &notes, note_cache, filters);
    176 
    177         Vitality::Fresh(self.get_expected_mut(id))
    178     }
    179 
    180     pub fn subscription(
    181         &mut self,
    182         id: TimelineCacheKey<'_>,
    183     ) -> Option<&mut Option<MultiSubscriber>> {
    184         match id {
    185             TimelineCacheKey::Profile(pubkey) => self
    186                 .profiles
    187                 .get_mut(pubkey.bytes())
    188                 .map(|p| &mut p.subscription),
    189             TimelineCacheKey::Thread(root_id) => self
    190                 .threads
    191                 .get_mut(root_id.bytes())
    192                 .map(|t| &mut t.subscription),
    193         }
    194     }
    195 
    196     pub fn open<'a>(
    197         &mut self,
    198         ndb: &Ndb,
    199         note_cache: &mut NoteCache,
    200         txn: &Transaction,
    201         pool: &mut RelayPool,
    202         id: TimelineCacheKey<'a>,
    203     ) -> Option<TimelineOpenResult<'a>> {
    204         let result = match self.notes(ndb, note_cache, txn, id) {
    205             Vitality::Stale(timeline) => {
    206                 // The timeline cache is stale, let's update it
    207                 let notes = find_new_notes(timeline.all_or_any_notes(), id, txn, ndb);
    208                 let cached_timeline_result = if notes.is_empty() {
    209                     None
    210                 } else {
    211                     let new_notes = notes.iter().map(|n| n.key).collect();
    212                     Some(TimelineOpenResult::new_notes(new_notes, id))
    213                 };
    214 
    215                 // we can't insert and update the VirtualList now, because we
    216                 // are already borrowing it mutably. Let's pass it as a
    217                 // result instead
    218                 //
    219                 // holder.get_view().insert(&notes); <-- no
    220                 cached_timeline_result
    221             }
    222 
    223             Vitality::Fresh(_timeline) => None,
    224         };
    225 
    226         let sub_id = if let Some(sub) = self.subscription(id) {
    227             if let Some(multi_subscriber) = sub {
    228                 multi_subscriber.subscribe(ndb, pool);
    229                 multi_subscriber.sub.as_ref().map(|s| s.local)
    230             } else {
    231                 let mut multi_sub = MultiSubscriber::new(id.filters());
    232                 multi_sub.subscribe(ndb, pool);
    233                 let sub_id = multi_sub.sub.as_ref().map(|s| s.local);
    234                 *sub = Some(multi_sub);
    235                 sub_id
    236             }
    237         } else {
    238             None
    239         };
    240 
    241         let timeline = self.get_expected_mut(id);
    242         if let Some(sub_id) = sub_id {
    243             timeline.subscription = Some(sub_id);
    244         }
    245 
    246         // TODO: We have subscription ids tracked in different places. Fix this
    247 
    248         result
    249     }
    250 }
    251 
    252 /// Look for new thread notes since our last fetch
    253 fn find_new_notes(
    254     notes: &[NoteRef],
    255     id: TimelineCacheKey<'_>,
    256     txn: &Transaction,
    257     ndb: &Ndb,
    258 ) -> Vec<NoteRef> {
    259     if notes.is_empty() {
    260         return vec![];
    261     }
    262 
    263     let last_note = notes[0];
    264     let filters = id.filters_since(last_note.created_at + 1);
    265 
    266     if let Ok(results) = ndb.query(txn, &filters, 1000) {
    267         debug!("got {} results from NotesHolder update", results.len());
    268         results
    269             .into_iter()
    270             .map(NoteRef::from_query_result)
    271             .collect()
    272     } else {
    273         debug!("got no results from NotesHolder update",);
    274         vec![]
    275     }
    276 }