cache.rs (8961B)
1 use crate::{ 2 actionbar::TimelineOpenResult, 3 error::Error, 4 timeline::{Timeline, TimelineKind, UnknownPksOwned}, 5 }; 6 7 use notedeck::{filter, FilterState, NoteCache, NoteRef}; 8 9 use enostr::RelayPool; 10 use nostrdb::{Filter, Ndb, Transaction}; 11 use std::collections::HashMap; 12 use tracing::{debug, error, info, warn}; 13 14 #[derive(Default)] 15 pub struct TimelineCache { 16 timelines: HashMap<TimelineKind, Timeline>, 17 } 18 19 pub enum Vitality<'a, M> { 20 Fresh(&'a mut M), 21 Stale(&'a mut M), 22 } 23 24 impl<'a, M> Vitality<'a, M> { 25 pub fn get_ptr(self) -> &'a mut M { 26 match self { 27 Self::Fresh(ptr) => ptr, 28 Self::Stale(ptr) => ptr, 29 } 30 } 31 32 pub fn is_stale(&self) -> bool { 33 match self { 34 Self::Fresh(_ptr) => false, 35 Self::Stale(_ptr) => true, 36 } 37 } 38 } 39 40 impl<'a> IntoIterator for &'a mut TimelineCache { 41 type Item = (&'a TimelineKind, &'a mut Timeline); 42 type IntoIter = std::collections::hash_map::IterMut<'a, TimelineKind, Timeline>; 43 44 fn into_iter(self) -> Self::IntoIter { 45 self.timelines.iter_mut() 46 } 47 } 48 49 impl TimelineCache { 50 /// Pop a timeline from the timeline cache. This only removes the timeline 51 /// if it has reached 0 subscribers, meaning it was the last one to be 52 /// removed 53 pub fn pop( 54 &mut self, 55 id: &TimelineKind, 56 ndb: &mut Ndb, 57 pool: &mut RelayPool, 58 ) -> Result<(), Error> { 59 let timeline = if let Some(timeline) = self.timelines.get_mut(id) { 60 timeline 61 } else { 62 return Err(Error::TimelineNotFound); 63 }; 64 65 timeline.subscription.unsubscribe_or_decrement(ndb, pool); 66 67 if timeline.subscription.no_sub() { 68 debug!( 69 "popped last timeline {:?}, removing from timeline cache", 70 id 71 ); 72 self.timelines.remove(id); 73 } 74 75 Ok(()) 76 } 77 78 fn get_expected_mut(&mut self, key: &TimelineKind) -> &mut Timeline { 79 self.timelines 80 .get_mut(key) 81 .expect("expected notes in timline cache") 82 } 83 84 /// Insert a new timeline into the cache, based on the TimelineKind 85 #[allow(clippy::too_many_arguments)] 86 fn insert_new( 87 &mut self, 88 id: TimelineKind, 89 txn: &Transaction, 90 ndb: &Ndb, 91 notes: &[NoteRef], 92 note_cache: &mut NoteCache, 93 ) -> Option<UnknownPksOwned> { 94 let mut timeline = if let Some(timeline) = id.clone().into_timeline(txn, ndb) { 95 timeline 96 } else { 97 error!("Error creating timeline from {:?}", &id); 98 return None; 99 }; 100 101 // insert initial notes into timeline 102 let res = timeline.insert_new(txn, ndb, note_cache, notes); 103 self.timelines.insert(id, timeline); 104 105 res 106 } 107 108 pub fn insert(&mut self, id: TimelineKind, timeline: Timeline) { 109 if let Some(cur_timeline) = self.timelines.get_mut(&id) { 110 cur_timeline.subscription.increment(); 111 return; 112 }; 113 114 self.timelines.insert(id, timeline); 115 } 116 117 /// Get and/or update the notes associated with this timeline 118 fn notes<'a>( 119 &'a mut self, 120 ndb: &Ndb, 121 note_cache: &mut NoteCache, 122 txn: &Transaction, 123 id: &TimelineKind, 124 ) -> GetNotesResponse<'a> { 125 // we can't use the naive hashmap entry API here because lookups 126 // require a copy, wait until we have a raw entry api. We could 127 // also use hashbrown? 128 129 if self.timelines.contains_key(id) { 130 return GetNotesResponse { 131 vitality: Vitality::Stale(self.get_expected_mut(id)), 132 unknown_pks: None, 133 }; 134 } 135 136 let notes = if let FilterState::Ready(filters) = id.filters(txn, ndb) { 137 let mut notes = Vec::new(); 138 139 for package in filters.local().packages { 140 if let Ok(results) = ndb.query(txn, package.filters, 1000) { 141 let cur_notes: Vec<NoteRef> = results 142 .into_iter() 143 .map(NoteRef::from_query_result) 144 .collect(); 145 146 notes.extend(cur_notes); 147 } else { 148 debug!("got no results from TimelineCache lookup for {:?}", id); 149 } 150 } 151 152 notes 153 } else { 154 // filter is not ready yet 155 vec![] 156 }; 157 158 if notes.is_empty() { 159 warn!("NotesHolder query returned 0 notes? ") 160 } else { 161 info!("found NotesHolder with {} notes", notes.len()); 162 } 163 164 let unknown_pks = self.insert_new(id.to_owned(), txn, ndb, ¬es, note_cache); 165 166 GetNotesResponse { 167 vitality: Vitality::Fresh(self.get_expected_mut(id)), 168 unknown_pks, 169 } 170 } 171 172 /// Open a timeline, this is another way of saying insert a timeline 173 /// into the timeline cache. If there exists a timeline already, we 174 /// bump its subscription reference count. If it's new we start a new 175 /// subscription 176 pub fn open( 177 &mut self, 178 ndb: &Ndb, 179 note_cache: &mut NoteCache, 180 txn: &Transaction, 181 pool: &mut RelayPool, 182 id: &TimelineKind, 183 ) -> Option<TimelineOpenResult> { 184 let notes_resp = self.notes(ndb, note_cache, txn, id); 185 let (mut open_result, timeline) = match notes_resp.vitality { 186 Vitality::Stale(timeline) => { 187 // The timeline cache is stale, let's update it 188 let notes = { 189 let mut notes = Vec::new(); 190 for package in timeline.subscription.get_filter()?.local().packages { 191 let cur_notes = find_new_notes( 192 timeline.all_or_any_entries().latest(), 193 package.filters, 194 txn, 195 ndb, 196 ); 197 notes.extend(cur_notes); 198 } 199 notes 200 }; 201 202 let open_result = if notes.is_empty() { 203 None 204 } else { 205 let new_notes = notes.iter().map(|n| n.key).collect(); 206 Some(TimelineOpenResult::new_notes(new_notes, id.clone())) 207 }; 208 209 // we can't insert and update the VirtualList now, because we 210 // are already borrowing it mutably. Let's pass it as a 211 // result instead 212 // 213 // holder.get_view().insert(¬es); <-- no 214 (open_result, timeline) 215 } 216 217 Vitality::Fresh(timeline) => (None, timeline), 218 }; 219 220 if let Some(filter) = timeline.filter.get_any_ready() { 221 debug!("got open with *new* subscription for {:?}", &timeline.kind); 222 timeline.subscription.try_add_local(ndb, filter); 223 timeline.subscription.try_add_remote(pool, filter); 224 } else { 225 // This should never happen reasoning, self.notes would have 226 // failed above if the filter wasn't ready 227 error!( 228 "open: filter not ready, so could not setup subscription. this should never happen" 229 ); 230 }; 231 232 timeline.subscription.increment(); 233 234 if let Some(unknowns) = notes_resp.unknown_pks { 235 match &mut open_result { 236 Some(o) => o.insert_pks(unknowns.pks), 237 None => open_result = Some(TimelineOpenResult::new_pks(unknowns.pks)), 238 } 239 } 240 241 open_result 242 } 243 244 pub fn get(&self, id: &TimelineKind) -> Option<&Timeline> { 245 self.timelines.get(id) 246 } 247 248 pub fn get_mut(&mut self, id: &TimelineKind) -> Option<&mut Timeline> { 249 self.timelines.get_mut(id) 250 } 251 252 pub fn num_timelines(&self) -> usize { 253 self.timelines.len() 254 } 255 256 pub fn set_fresh(&mut self, kind: &TimelineKind) { 257 let Some(tl) = self.get_mut(kind) else { 258 return; 259 }; 260 261 tl.seen_latest_notes = true; 262 } 263 } 264 265 pub struct GetNotesResponse<'a> { 266 vitality: Vitality<'a, Timeline>, 267 unknown_pks: Option<UnknownPksOwned>, 268 } 269 270 /// Look for new thread notes since our last fetch 271 fn find_new_notes( 272 latest: Option<&NoteRef>, 273 filters: &[Filter], 274 txn: &Transaction, 275 ndb: &Ndb, 276 ) -> Vec<NoteRef> { 277 let Some(last_note) = latest else { 278 return vec![]; 279 }; 280 281 let filters = filter::make_filters_since(filters, last_note.created_at + 1); 282 283 if let Ok(results) = ndb.query(txn, &filters, 1000) { 284 debug!("got {} results from NotesHolder update", results.len()); 285 results 286 .into_iter() 287 .map(NoteRef::from_query_result) 288 .collect() 289 } else { 290 debug!("got no results from NotesHolder update",); 291 vec![] 292 } 293 }