unknowns.rs (12231B)
1 use crate::{ 2 column::Columns, 3 note::NoteRef, 4 notecache::{CachedNote, NoteCache}, 5 timeline::ViewFilter, 6 Result, 7 }; 8 9 use enostr::{Filter, NoteId, Pubkey}; 10 use nostrdb::{BlockType, Mention, Ndb, Note, NoteKey, Transaction}; 11 use std::collections::HashSet; 12 use std::time::{Duration, Instant}; 13 use tracing::error; 14 15 #[must_use = "process_action should be used on this result"] 16 pub enum SingleUnkIdAction { 17 NoAction, 18 NeedsProcess(UnknownId), 19 } 20 21 #[must_use = "process_action should be used on this result"] 22 pub enum NoteRefsUnkIdAction { 23 NoAction, 24 NeedsProcess(Vec<NoteRef>), 25 } 26 27 impl NoteRefsUnkIdAction { 28 pub fn new(refs: Vec<NoteRef>) -> Self { 29 NoteRefsUnkIdAction::NeedsProcess(refs) 30 } 31 32 pub fn no_action() -> Self { 33 Self::NoAction 34 } 35 36 pub fn process_action( 37 &self, 38 txn: &Transaction, 39 ndb: &Ndb, 40 unk_ids: &mut UnknownIds, 41 note_cache: &mut NoteCache, 42 ) { 43 match self { 44 Self::NoAction => {} 45 Self::NeedsProcess(refs) => { 46 UnknownIds::update_from_note_refs(txn, ndb, unk_ids, note_cache, refs); 47 } 48 } 49 } 50 } 51 52 impl SingleUnkIdAction { 53 pub fn new(id: UnknownId) -> Self { 54 SingleUnkIdAction::NeedsProcess(id) 55 } 56 57 pub fn no_action() -> Self { 58 Self::NoAction 59 } 60 61 pub fn pubkey(pubkey: Pubkey) -> Self { 62 SingleUnkIdAction::new(UnknownId::Pubkey(pubkey)) 63 } 64 65 pub fn note_id(note_id: NoteId) -> Self { 66 SingleUnkIdAction::new(UnknownId::Id(note_id)) 67 } 68 69 /// Some functions may return unknown id actions that need to be processed. 70 /// For example, when we add a new account we need to make sure we have the 71 /// profile for that account. This function ensures we add this to the 72 /// unknown id tracker without adding side effects to functions. 73 pub fn process_action(&self, ids: &mut UnknownIds, ndb: &Ndb, txn: &Transaction) { 74 match self { 75 Self::NeedsProcess(id) => { 76 ids.add_unknown_id_if_missing(ndb, txn, id); 77 } 78 Self::NoAction => {} 79 } 80 } 81 } 82 83 /// Unknown Id searcher 84 #[derive(Default)] 85 pub struct UnknownIds { 86 ids: HashSet<UnknownId>, 87 first_updated: Option<Instant>, 88 last_updated: Option<Instant>, 89 } 90 91 impl UnknownIds { 92 /// Simple debouncer 93 pub fn ready_to_send(&self) -> bool { 94 if self.ids.is_empty() { 95 return false; 96 } 97 98 // we trigger on first set 99 if self.first_updated == self.last_updated { 100 return true; 101 } 102 103 let last_updated = if let Some(last) = self.last_updated { 104 last 105 } else { 106 // if we've 107 return true; 108 }; 109 110 Instant::now() - last_updated >= Duration::from_secs(2) 111 } 112 113 pub fn ids(&self) -> &HashSet<UnknownId> { 114 &self.ids 115 } 116 117 pub fn ids_mut(&mut self) -> &mut HashSet<UnknownId> { 118 &mut self.ids 119 } 120 121 pub fn clear(&mut self) { 122 self.ids = HashSet::default(); 123 } 124 125 pub fn filter(&self) -> Option<Vec<Filter>> { 126 let ids: Vec<&UnknownId> = self.ids.iter().collect(); 127 get_unknown_ids_filter(&ids) 128 } 129 130 /// We've updated some unknown ids, update the last_updated time to now 131 pub fn mark_updated(&mut self) { 132 let now = Instant::now(); 133 if self.first_updated.is_none() { 134 self.first_updated = Some(now); 135 } 136 self.last_updated = Some(now); 137 } 138 139 pub fn update_from_note_key( 140 txn: &Transaction, 141 ndb: &Ndb, 142 unknown_ids: &mut UnknownIds, 143 note_cache: &mut NoteCache, 144 key: NoteKey, 145 ) -> bool { 146 let note = if let Ok(note) = ndb.get_note_by_key(txn, key) { 147 note 148 } else { 149 return false; 150 }; 151 152 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, ¬e) 153 } 154 155 /// Should be called on freshly polled notes from subscriptions 156 pub fn update_from_note_refs( 157 txn: &Transaction, 158 ndb: &Ndb, 159 unknown_ids: &mut UnknownIds, 160 note_cache: &mut NoteCache, 161 note_refs: &[NoteRef], 162 ) { 163 for note_ref in note_refs { 164 Self::update_from_note_key(txn, ndb, unknown_ids, note_cache, note_ref.key); 165 } 166 } 167 168 pub fn update_from_note( 169 txn: &Transaction, 170 ndb: &Ndb, 171 unknown_ids: &mut UnknownIds, 172 note_cache: &mut NoteCache, 173 note: &Note, 174 ) -> bool { 175 let before = unknown_ids.ids().len(); 176 let key = note.key().expect("note key"); 177 //let cached_note = note_cache.cached_note_or_insert(key, note).clone(); 178 let cached_note = note_cache.cached_note_or_insert(key, note); 179 if let Err(e) = get_unknown_note_ids(ndb, cached_note, txn, note, unknown_ids.ids_mut()) { 180 error!("UnknownIds::update_from_note {e}"); 181 } 182 let after = unknown_ids.ids().len(); 183 184 if before != after { 185 unknown_ids.mark_updated(); 186 true 187 } else { 188 false 189 } 190 } 191 192 pub fn add_unknown_id_if_missing(&mut self, ndb: &Ndb, txn: &Transaction, unk_id: &UnknownId) { 193 match unk_id { 194 UnknownId::Pubkey(pk) => self.add_pubkey_if_missing(ndb, txn, pk), 195 UnknownId::Id(note_id) => self.add_note_id_if_missing(ndb, txn, note_id), 196 } 197 } 198 199 pub fn add_pubkey_if_missing(&mut self, ndb: &Ndb, txn: &Transaction, pubkey: &Pubkey) { 200 // we already have this profile, skip 201 if ndb.get_profile_by_pubkey(txn, pubkey).is_ok() { 202 return; 203 } 204 205 self.ids.insert(UnknownId::Pubkey(*pubkey)); 206 self.mark_updated(); 207 } 208 209 pub fn add_note_id_if_missing(&mut self, ndb: &Ndb, txn: &Transaction, note_id: &NoteId) { 210 // we already have this note, skip 211 if ndb.get_note_by_id(txn, note_id.bytes()).is_ok() { 212 return; 213 } 214 215 self.ids.insert(UnknownId::Id(*note_id)); 216 self.mark_updated(); 217 } 218 219 pub fn update( 220 txn: &Transaction, 221 unknown_ids: &mut UnknownIds, 222 columns: &Columns, 223 ndb: &Ndb, 224 note_cache: &mut NoteCache, 225 ) -> bool { 226 let before = unknown_ids.ids().len(); 227 if let Err(e) = get_unknown_ids(txn, unknown_ids, columns, ndb, note_cache) { 228 error!("UnknownIds::update {e}"); 229 } 230 let after = unknown_ids.ids().len(); 231 232 if before != after { 233 unknown_ids.mark_updated(); 234 true 235 } else { 236 false 237 } 238 } 239 } 240 241 #[derive(Hash, Clone, Copy, PartialEq, Eq)] 242 pub enum UnknownId { 243 Pubkey(Pubkey), 244 Id(NoteId), 245 } 246 247 impl UnknownId { 248 pub fn is_pubkey(&self) -> Option<&Pubkey> { 249 match self { 250 UnknownId::Pubkey(pk) => Some(pk), 251 _ => None, 252 } 253 } 254 255 pub fn is_id(&self) -> Option<&NoteId> { 256 match self { 257 UnknownId::Id(id) => Some(id), 258 _ => None, 259 } 260 } 261 } 262 263 /// Look for missing notes in various parts of notes that we see: 264 /// 265 /// - pubkeys and notes mentioned inside the note 266 /// - notes being replied to 267 /// 268 /// We return all of this in a HashSet so that we can fetch these from 269 /// remote relays. 270 /// 271 pub fn get_unknown_note_ids<'a>( 272 ndb: &Ndb, 273 cached_note: &CachedNote, 274 txn: &'a Transaction, 275 note: &Note<'a>, 276 ids: &mut HashSet<UnknownId>, 277 ) -> Result<()> { 278 #[cfg(feature = "profiling")] 279 puffin::profile_function!(); 280 281 // the author pubkey 282 if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() { 283 ids.insert(UnknownId::Pubkey(Pubkey::new(*note.pubkey()))); 284 } 285 286 // pull notes that notes are replying to 287 if cached_note.reply.root.is_some() { 288 let note_reply = cached_note.reply.borrow(note.tags()); 289 if let Some(root) = note_reply.root() { 290 if ndb.get_note_by_id(txn, root.id).is_err() { 291 ids.insert(UnknownId::Id(NoteId::new(*root.id))); 292 } 293 } 294 295 if !note_reply.is_reply_to_root() { 296 if let Some(reply) = note_reply.reply() { 297 if ndb.get_note_by_id(txn, reply.id).is_err() { 298 ids.insert(UnknownId::Id(NoteId::new(*reply.id))); 299 } 300 } 301 } 302 } 303 304 let blocks = ndb.get_blocks_by_key(txn, note.key().expect("note key"))?; 305 for block in blocks.iter(note) { 306 if block.blocktype() != BlockType::MentionBech32 { 307 continue; 308 } 309 310 match block.as_mention().unwrap() { 311 Mention::Pubkey(npub) => { 312 if ndb.get_profile_by_pubkey(txn, npub.pubkey()).is_err() { 313 ids.insert(UnknownId::Pubkey(Pubkey::new(*npub.pubkey()))); 314 } 315 } 316 Mention::Profile(nprofile) => { 317 if ndb.get_profile_by_pubkey(txn, nprofile.pubkey()).is_err() { 318 ids.insert(UnknownId::Pubkey(Pubkey::new(*nprofile.pubkey()))); 319 } 320 } 321 Mention::Event(ev) => match ndb.get_note_by_id(txn, ev.id()) { 322 Err(_) => { 323 ids.insert(UnknownId::Id(NoteId::new(*ev.id()))); 324 if let Some(pk) = ev.pubkey() { 325 if ndb.get_profile_by_pubkey(txn, pk).is_err() { 326 ids.insert(UnknownId::Pubkey(Pubkey::new(*pk))); 327 } 328 } 329 } 330 Ok(note) => { 331 if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() { 332 ids.insert(UnknownId::Pubkey(Pubkey::new(*note.pubkey()))); 333 } 334 } 335 }, 336 Mention::Note(note) => match ndb.get_note_by_id(txn, note.id()) { 337 Err(_) => { 338 ids.insert(UnknownId::Id(NoteId::new(*note.id()))); 339 } 340 Ok(note) => { 341 if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() { 342 ids.insert(UnknownId::Pubkey(Pubkey::new(*note.pubkey()))); 343 } 344 } 345 }, 346 _ => {} 347 } 348 } 349 350 Ok(()) 351 } 352 353 fn get_unknown_ids( 354 txn: &Transaction, 355 unknown_ids: &mut UnknownIds, 356 columns: &Columns, 357 ndb: &Ndb, 358 note_cache: &mut NoteCache, 359 ) -> Result<()> { 360 #[cfg(feature = "profiling")] 361 puffin::profile_function!(); 362 363 let mut new_cached_notes: Vec<(NoteKey, CachedNote)> = vec![]; 364 365 for timeline in columns.timelines() { 366 for noteref in timeline.notes(ViewFilter::NotesAndReplies) { 367 let note = ndb.get_note_by_key(txn, noteref.key)?; 368 let note_key = note.key().unwrap(); 369 let cached_note = note_cache.cached_note(noteref.key); 370 let cached_note = if let Some(cn) = cached_note { 371 cn.clone() 372 } else { 373 let new_cached_note = CachedNote::new(¬e); 374 new_cached_notes.push((note_key, new_cached_note.clone())); 375 new_cached_note 376 }; 377 378 let _ = get_unknown_note_ids(ndb, &cached_note, txn, ¬e, unknown_ids.ids_mut()); 379 } 380 } 381 382 // This is mainly done to avoid the double mutable borrow that would happen 383 // if we tried to update the note_cache mutably in the loop above 384 for (note_key, note) in new_cached_notes { 385 note_cache.cache_mut().insert(note_key, note); 386 } 387 388 Ok(()) 389 } 390 391 fn get_unknown_ids_filter(ids: &[&UnknownId]) -> Option<Vec<Filter>> { 392 if ids.is_empty() { 393 return None; 394 } 395 396 let ids = &ids[0..500.min(ids.len())]; 397 let mut filters: Vec<Filter> = vec![]; 398 399 let pks: Vec<&[u8; 32]> = ids 400 .iter() 401 .flat_map(|id| id.is_pubkey().map(|pk| pk.bytes())) 402 .collect(); 403 if !pks.is_empty() { 404 let pk_filter = Filter::new().authors(pks).kinds([0]).build(); 405 filters.push(pk_filter); 406 } 407 408 let note_ids: Vec<&[u8; 32]> = ids 409 .iter() 410 .flat_map(|id| id.is_id().map(|id| id.bytes())) 411 .collect(); 412 if !note_ids.is_empty() { 413 filters.push(Filter::new().ids(note_ids).build()); 414 } 415 416 Some(filters) 417 }