cache.rs (11170B)
1 use crate::{ 2 actionbar::TimelineOpenResult, 3 error::Error, 4 timeline::{ 5 drop_timeline_remote_owner, ensure_remote_timeline_subscription, Timeline, TimelineKind, 6 UnknownPksOwned, 7 }, 8 }; 9 10 use notedeck::ScopedSubApi; 11 use notedeck::{filter, FilterState, NoteCache, NoteRef}; 12 13 use enostr::Pubkey; 14 use nostrdb::{Filter, Ndb, Transaction}; 15 use std::collections::HashMap; 16 use tracing::{debug, error, info, warn}; 17 18 #[derive(Default)] 19 pub struct TimelineCache { 20 timelines: HashMap<TimelineKind, Timeline>, 21 } 22 23 pub enum Vitality<'a, M> { 24 Fresh(&'a mut M), 25 Stale(&'a mut M), 26 } 27 28 impl<'a, M> Vitality<'a, M> { 29 pub fn get_ptr(self) -> &'a mut M { 30 match self { 31 Self::Fresh(ptr) => ptr, 32 Self::Stale(ptr) => ptr, 33 } 34 } 35 36 pub fn is_stale(&self) -> bool { 37 match self { 38 Self::Fresh(_ptr) => false, 39 Self::Stale(_ptr) => true, 40 } 41 } 42 } 43 44 impl<'a> IntoIterator for &'a mut TimelineCache { 45 type Item = (&'a TimelineKind, &'a mut Timeline); 46 type IntoIter = std::collections::hash_map::IterMut<'a, TimelineKind, Timeline>; 47 48 fn into_iter(self) -> Self::IntoIter { 49 self.timelines.iter_mut() 50 } 51 } 52 53 impl TimelineCache { 54 /// Pop a timeline from the timeline cache. This only removes the timeline 55 /// if it has reached 0 subscribers, meaning it was the last one to be 56 /// removed 57 pub fn pop( 58 &mut self, 59 id: &TimelineKind, 60 ndb: &mut Ndb, 61 scoped_subs: &mut ScopedSubApi<'_, '_>, 62 ) -> Result<(), Error> { 63 let timeline = if let Some(timeline) = self.timelines.get_mut(id) { 64 timeline 65 } else { 66 return Err(Error::TimelineNotFound); 67 }; 68 69 let account_pk = scoped_subs.selected_account_pubkey(); 70 timeline 71 .subscription 72 .unsubscribe_or_decrement(account_pk, ndb); 73 74 if timeline.subscription.no_sub(&account_pk) { 75 timeline.subscription.clear_remote_seeded(account_pk); 76 drop_timeline_remote_owner(timeline, account_pk, scoped_subs); 77 } 78 79 if !timeline.subscription.has_any_subs() { 80 debug!( 81 "popped last timeline {:?}, removing from timeline cache", 82 id 83 ); 84 self.timelines.remove(id); 85 } 86 87 Ok(()) 88 } 89 90 fn get_expected_mut(&mut self, key: &TimelineKind) -> &mut Timeline { 91 self.timelines 92 .get_mut(key) 93 .expect("expected notes in timline cache") 94 } 95 96 /// Insert a new timeline into the cache, based on the TimelineKind 97 #[allow(clippy::too_many_arguments)] 98 fn insert_new( 99 &mut self, 100 id: TimelineKind, 101 txn: &Transaction, 102 ndb: &Ndb, 103 notes: &[NoteRef], 104 note_cache: &mut NoteCache, 105 ) -> Option<UnknownPksOwned> { 106 let mut timeline = if let Some(timeline) = id.clone().into_timeline(txn, ndb) { 107 timeline 108 } else { 109 error!("Error creating timeline from {:?}", &id); 110 return None; 111 }; 112 113 // insert initial notes into timeline 114 let res = timeline.insert_new(txn, ndb, note_cache, notes); 115 self.timelines.insert(id, timeline); 116 117 res 118 } 119 120 pub fn insert(&mut self, id: TimelineKind, account_pk: Pubkey, mut timeline: Timeline) { 121 if let Some(cur_timeline) = self.timelines.get_mut(&id) { 122 cur_timeline.subscription.increment(account_pk); 123 return; 124 }; 125 126 timeline.subscription.increment(account_pk); 127 self.timelines.insert(id, timeline); 128 } 129 130 /// Get and/or update the notes associated with this timeline 131 #[profiling::function] 132 fn notes<'a>( 133 &'a mut self, 134 ndb: &Ndb, 135 note_cache: &mut NoteCache, 136 txn: &Transaction, 137 id: &TimelineKind, 138 ) -> GetNotesResponse<'a> { 139 // we can't use the naive hashmap entry API here because lookups 140 // require a copy, wait until we have a raw entry api. We could 141 // also use hashbrown? 142 143 if self.timelines.contains_key(id) { 144 return GetNotesResponse { 145 vitality: Vitality::Stale(self.get_expected_mut(id)), 146 unknown_pks: None, 147 }; 148 } 149 150 let notes = if let FilterState::Ready(filters) = id.filters(txn, ndb) { 151 let mut notes = Vec::new(); 152 153 for package in filters.local().packages { 154 profiling::scope!("ndb query"); 155 if let Ok(results) = ndb.query(txn, package.filters, 1000) { 156 let cur_notes: Vec<NoteRef> = results 157 .into_iter() 158 .map(NoteRef::from_query_result) 159 .collect(); 160 161 notes.extend(cur_notes); 162 } else { 163 debug!("got no results from TimelineCache lookup for {:?}", id); 164 } 165 } 166 167 notes 168 } else { 169 // filter is not ready yet 170 vec![] 171 }; 172 173 if notes.is_empty() { 174 warn!("NotesHolder query returned 0 notes? ") 175 } else { 176 info!("found NotesHolder with {} notes", notes.len()); 177 } 178 179 let unknown_pks = self.insert_new(id.to_owned(), txn, ndb, ¬es, note_cache); 180 181 GetNotesResponse { 182 vitality: Vitality::Fresh(self.get_expected_mut(id)), 183 unknown_pks, 184 } 185 } 186 187 /// Open a timeline, optionally loading local notes. 188 /// 189 /// When `load_local` is false, the timeline is created and subscribed 190 /// without running a blocking local query. Use this for startup paths 191 /// where initial notes are loaded asynchronously. 192 #[profiling::function] 193 #[allow(clippy::too_many_arguments)] 194 pub fn open( 195 &mut self, 196 ndb: &Ndb, 197 note_cache: &mut NoteCache, 198 txn: &Transaction, 199 scoped_subs: &mut ScopedSubApi<'_, '_>, 200 id: &TimelineKind, 201 account_pk: Pubkey, 202 load_local: bool, 203 ) -> Option<TimelineOpenResult> { 204 if !load_local { 205 let timeline = if let Some(timeline) = self.timelines.get_mut(id) { 206 timeline 207 } else { 208 let Some(timeline) = id.clone().into_timeline(txn, ndb) else { 209 error!("Error creating timeline from {:?}", id); 210 return None; 211 }; 212 self.timelines.insert(id.clone(), timeline); 213 self.timelines.get_mut(id).expect("timeline inserted") 214 }; 215 216 if let FilterState::Ready(filter) = &timeline.filter { 217 debug!("got open with subscription for {:?}", &timeline.kind); 218 timeline.subscription.try_add_local(account_pk, ndb, filter); 219 ensure_remote_timeline_subscription( 220 timeline, 221 account_pk, 222 filter.remote().to_vec(), 223 scoped_subs, 224 ); 225 } else { 226 debug!( 227 "open skipped subscription; filter not ready for {:?}", 228 &timeline.kind 229 ); 230 } 231 232 timeline.subscription.increment(account_pk); 233 return None; 234 } 235 236 let account_pk = scoped_subs.selected_account_pubkey(); 237 let notes_resp = self.notes(ndb, note_cache, txn, id); 238 let (mut open_result, timeline) = match notes_resp.vitality { 239 Vitality::Stale(timeline) => { 240 // The timeline cache is stale, let's update it 241 let notes = collect_stale_notes(timeline, txn, ndb); 242 243 let open_result = if notes.is_empty() { 244 None 245 } else { 246 let new_notes = notes.iter().map(|n| n.key).collect(); 247 Some(TimelineOpenResult::new_notes(new_notes, id.clone())) 248 }; 249 250 // we can't insert and update the VirtualList now, because we 251 // are already borrowing it mutably. Let's pass it as a 252 // result instead 253 // 254 // holder.get_view().insert(¬es); <-- no 255 (open_result, timeline) 256 } 257 258 Vitality::Fresh(timeline) => (None, timeline), 259 }; 260 261 if let FilterState::Ready(filter) = &timeline.filter { 262 debug!("got open with *new* subscription for {:?}", &timeline.kind); 263 timeline.subscription.try_add_local(account_pk, ndb, filter); 264 ensure_remote_timeline_subscription( 265 timeline, 266 account_pk, 267 filter.remote().to_vec(), 268 scoped_subs, 269 ); 270 } else { 271 // This should never happen reasoning, self.notes would have 272 // failed above if the filter wasn't ready 273 error!( 274 "open: filter not ready, so could not setup subscription. this should never happen" 275 ); 276 }; 277 278 timeline.subscription.increment(account_pk); 279 280 if let Some(unknowns) = notes_resp.unknown_pks { 281 match &mut open_result { 282 Some(o) => o.insert_pks(unknowns.pks), 283 None => open_result = Some(TimelineOpenResult::new_pks(unknowns.pks)), 284 } 285 } 286 287 open_result 288 } 289 290 pub fn get(&self, id: &TimelineKind) -> Option<&Timeline> { 291 self.timelines.get(id) 292 } 293 294 pub fn get_mut(&mut self, id: &TimelineKind) -> Option<&mut Timeline> { 295 self.timelines.get_mut(id) 296 } 297 298 pub fn num_timelines(&self) -> usize { 299 self.timelines.len() 300 } 301 302 pub fn set_fresh(&mut self, kind: &TimelineKind) { 303 let Some(tl) = self.get_mut(kind) else { 304 return; 305 }; 306 307 tl.seen_latest_notes = true; 308 } 309 } 310 311 fn collect_stale_notes(timeline: &Timeline, txn: &Transaction, ndb: &Ndb) -> Vec<NoteRef> { 312 let FilterState::Ready(filter) = &timeline.filter else { 313 return Vec::new(); 314 }; 315 316 let mut notes = Vec::new(); 317 for package in filter.local().packages { 318 let cur_notes = find_new_notes( 319 timeline.all_or_any_entries().latest(), 320 package.filters, 321 txn, 322 ndb, 323 ); 324 notes.extend(cur_notes); 325 } 326 notes 327 } 328 329 pub struct GetNotesResponse<'a> { 330 vitality: Vitality<'a, Timeline>, 331 unknown_pks: Option<UnknownPksOwned>, 332 } 333 334 /// Look for new thread notes since our last fetch 335 fn find_new_notes( 336 latest: Option<&NoteRef>, 337 filters: &[Filter], 338 txn: &Transaction, 339 ndb: &Ndb, 340 ) -> Vec<NoteRef> { 341 let Some(last_note) = latest else { 342 return vec![]; 343 }; 344 345 let filters = filter::make_filters_since(filters, last_note.created_at + 1); 346 347 if let Ok(results) = ndb.query(txn, &filters, 1000) { 348 debug!("got {} results from NotesHolder update", results.len()); 349 results 350 .into_iter() 351 .map(NoteRef::from_query_result) 352 .collect() 353 } else { 354 debug!("got no results from NotesHolder update",); 355 vec![] 356 } 357 }