cache.rs (8445B)
1 use crate::{ 2 actionbar::TimelineOpenResult, 3 multi_subscriber::MultiSubscriber, 4 profile::Profile, 5 thread::Thread, 6 //subscriptions::SubRefs, 7 timeline::{PubkeySource, Timeline}, 8 }; 9 10 use notedeck::{NoteCache, NoteRef, RootNoteId, RootNoteIdBuf}; 11 12 use enostr::{Pubkey, PubkeyRef, RelayPool}; 13 use nostrdb::{Filter, FilterBuilder, Ndb, Transaction}; 14 use std::collections::HashMap; 15 use tracing::{debug, info, warn}; 16 17 #[derive(Default)] 18 pub struct TimelineCache { 19 pub threads: HashMap<RootNoteIdBuf, Thread>, 20 pub profiles: HashMap<Pubkey, Profile>, 21 } 22 23 pub enum Vitality<'a, M> { 24 Fresh(&'a mut M), 25 Stale(&'a mut M), 26 } 27 28 impl<'a, M> Vitality<'a, M> { 29 pub fn get_ptr(self) -> &'a mut M { 30 match self { 31 Self::Fresh(ptr) => ptr, 32 Self::Stale(ptr) => ptr, 33 } 34 } 35 36 pub fn is_stale(&self) -> bool { 37 match self { 38 Self::Fresh(_ptr) => false, 39 Self::Stale(_ptr) => true, 40 } 41 } 42 } 43 44 #[derive(Hash, Debug, Copy, Clone)] 45 pub enum TimelineCacheKey<'a> { 46 Profile(PubkeyRef<'a>), 47 Thread(RootNoteId<'a>), 48 } 49 50 impl<'a> TimelineCacheKey<'a> { 51 pub fn profile(pubkey: PubkeyRef<'a>) -> Self { 52 Self::Profile(pubkey) 53 } 54 55 pub fn thread(root_id: RootNoteId<'a>) -> Self { 56 Self::Thread(root_id) 57 } 58 59 pub fn bytes(&self) -> &[u8; 32] { 60 match self { 61 Self::Profile(pk) => pk.bytes(), 62 Self::Thread(root_id) => root_id.bytes(), 63 } 64 } 65 66 /// The filters used to update our timeline cache 67 pub fn filters_raw(&self) -> Vec<FilterBuilder> { 68 match self { 69 TimelineCacheKey::Thread(root_id) => Thread::filters_raw(*root_id), 70 71 TimelineCacheKey::Profile(pubkey) => vec![Filter::new() 72 .authors([pubkey.bytes()]) 73 .kinds([1]) 74 .limit(notedeck::filter::default_limit())], 75 } 76 } 77 78 pub fn filters_since(&self, since: u64) -> Vec<Filter> { 79 self.filters_raw() 80 .into_iter() 81 .map(|fb| fb.since(since).build()) 82 .collect() 83 } 84 85 pub fn filters(&self) -> Vec<Filter> { 86 self.filters_raw() 87 .into_iter() 88 .map(|mut fb| fb.build()) 89 .collect() 90 } 91 } 92 93 impl TimelineCache { 94 fn contains_key(&self, key: TimelineCacheKey<'_>) -> bool { 95 match key { 96 TimelineCacheKey::Profile(pubkey) => self.profiles.contains_key(pubkey.bytes()), 97 TimelineCacheKey::Thread(root_id) => self.threads.contains_key(root_id.bytes()), 98 } 99 } 100 101 fn get_expected_mut(&mut self, key: TimelineCacheKey<'_>) -> &mut Timeline { 102 match key { 103 TimelineCacheKey::Profile(pubkey) => self 104 .profiles 105 .get_mut(pubkey.bytes()) 106 .map(|p| &mut p.timeline), 107 TimelineCacheKey::Thread(root_id) => self 108 .threads 109 .get_mut(root_id.bytes()) 110 .map(|t| &mut t.timeline), 111 } 112 .expect("expected notes in timline cache") 113 } 114 115 /// Insert a new profile or thread into the cache, based on the TimelineCacheKey 116 #[allow(clippy::too_many_arguments)] 117 fn insert_new( 118 &mut self, 119 id: TimelineCacheKey<'_>, 120 txn: &Transaction, 121 ndb: &Ndb, 122 notes: &[NoteRef], 123 note_cache: &mut NoteCache, 124 filters: Vec<Filter>, 125 ) { 126 match id { 127 TimelineCacheKey::Profile(pubkey) => { 128 let mut profile = Profile::new(PubkeySource::Explicit(pubkey.to_owned()), filters); 129 // insert initial notes into timeline 130 profile.timeline.insert_new(txn, ndb, note_cache, notes); 131 self.profiles.insert(pubkey.to_owned(), profile); 132 } 133 134 TimelineCacheKey::Thread(root_id) => { 135 let mut thread = Thread::new(root_id.to_owned()); 136 thread.timeline.insert_new(txn, ndb, note_cache, notes); 137 self.threads.insert(root_id.to_owned(), thread); 138 } 139 } 140 } 141 142 /// Get and/or update the notes associated with this timeline 143 pub fn notes<'a>( 144 &'a mut self, 145 ndb: &Ndb, 146 note_cache: &mut NoteCache, 147 txn: &Transaction, 148 id: TimelineCacheKey<'a>, 149 ) -> Vitality<'a, Timeline> { 150 // we can't use the naive hashmap entry API here because lookups 151 // require a copy, wait until we have a raw entry api. We could 152 // also use hashbrown? 153 154 if self.contains_key(id) { 155 return Vitality::Stale(self.get_expected_mut(id)); 156 } 157 158 let filters = id.filters(); 159 let notes = if let Ok(results) = ndb.query(txn, &filters, 1000) { 160 results 161 .into_iter() 162 .map(NoteRef::from_query_result) 163 .collect() 164 } else { 165 debug!("got no results from TimelineCache lookup for {:?}", id); 166 vec![] 167 }; 168 169 if notes.is_empty() { 170 warn!("NotesHolder query returned 0 notes? ") 171 } else { 172 info!("found NotesHolder with {} notes", notes.len()); 173 } 174 175 self.insert_new(id, txn, ndb, ¬es, note_cache, filters); 176 177 Vitality::Fresh(self.get_expected_mut(id)) 178 } 179 180 pub fn subscription( 181 &mut self, 182 id: TimelineCacheKey<'_>, 183 ) -> Option<&mut Option<MultiSubscriber>> { 184 match id { 185 TimelineCacheKey::Profile(pubkey) => self 186 .profiles 187 .get_mut(pubkey.bytes()) 188 .map(|p| &mut p.subscription), 189 TimelineCacheKey::Thread(root_id) => self 190 .threads 191 .get_mut(root_id.bytes()) 192 .map(|t| &mut t.subscription), 193 } 194 } 195 196 pub fn open<'a>( 197 &mut self, 198 ndb: &Ndb, 199 note_cache: &mut NoteCache, 200 txn: &Transaction, 201 pool: &mut RelayPool, 202 id: TimelineCacheKey<'a>, 203 ) -> Option<TimelineOpenResult<'a>> { 204 let result = match self.notes(ndb, note_cache, txn, id) { 205 Vitality::Stale(timeline) => { 206 // The timeline cache is stale, let's update it 207 let notes = find_new_notes(timeline.all_or_any_notes(), id, txn, ndb); 208 let cached_timeline_result = if notes.is_empty() { 209 None 210 } else { 211 let new_notes = notes.iter().map(|n| n.key).collect(); 212 Some(TimelineOpenResult::new_notes(new_notes, id)) 213 }; 214 215 // we can't insert and update the VirtualList now, because we 216 // are already borrowing it mutably. Let's pass it as a 217 // result instead 218 // 219 // holder.get_view().insert(¬es); <-- no 220 cached_timeline_result 221 } 222 223 Vitality::Fresh(_timeline) => None, 224 }; 225 226 let sub_id = if let Some(sub) = self.subscription(id) { 227 if let Some(multi_subscriber) = sub { 228 multi_subscriber.subscribe(ndb, pool); 229 multi_subscriber.sub.as_ref().map(|s| s.local) 230 } else { 231 let mut multi_sub = MultiSubscriber::new(id.filters()); 232 multi_sub.subscribe(ndb, pool); 233 let sub_id = multi_sub.sub.as_ref().map(|s| s.local); 234 *sub = Some(multi_sub); 235 sub_id 236 } 237 } else { 238 None 239 }; 240 241 let timeline = self.get_expected_mut(id); 242 if let Some(sub_id) = sub_id { 243 timeline.subscription = Some(sub_id); 244 } 245 246 // TODO: We have subscription ids tracked in different places. Fix this 247 248 result 249 } 250 } 251 252 /// Look for new thread notes since our last fetch 253 fn find_new_notes( 254 notes: &[NoteRef], 255 id: TimelineCacheKey<'_>, 256 txn: &Transaction, 257 ndb: &Ndb, 258 ) -> Vec<NoteRef> { 259 if notes.is_empty() { 260 return vec![]; 261 } 262 263 let last_note = notes[0]; 264 let filters = id.filters_since(last_note.created_at + 1); 265 266 if let Ok(results) = ndb.query(txn, &filters, 1000) { 267 debug!("got {} results from NotesHolder update", results.len()); 268 results 269 .into_iter() 270 .map(NoteRef::from_query_result) 271 .collect() 272 } else { 273 debug!("got no results from NotesHolder update",); 274 vec![] 275 } 276 }