// unknowns.rs (8355B)
1 use crate::notecache::CachedNote; 2 use crate::timeline::ViewFilter; 3 use crate::{Damus, Result}; 4 use enostr::{Filter, NoteId, Pubkey}; 5 use nostrdb::{BlockType, Mention, Ndb, Note, NoteKey, Transaction}; 6 use std::collections::HashSet; 7 use std::time::{Duration, Instant}; 8 use tracing::error; 9 10 /// Unknown Id searcher 11 #[derive(Default)] 12 pub struct UnknownIds { 13 ids: HashSet<UnknownId>, 14 first_updated: Option<Instant>, 15 last_updated: Option<Instant>, 16 } 17 18 impl UnknownIds { 19 /// Simple debouncer 20 pub fn ready_to_send(&self) -> bool { 21 if self.ids.is_empty() { 22 return false; 23 } 24 25 // we trigger on first set 26 if self.first_updated == self.last_updated { 27 return true; 28 } 29 30 let last_updated = if let Some(last) = self.last_updated { 31 last 32 } else { 33 // if we've 34 return true; 35 }; 36 37 Instant::now() - last_updated >= Duration::from_secs(2) 38 } 39 40 pub fn ids(&self) -> &HashSet<UnknownId> { 41 &self.ids 42 } 43 44 pub fn ids_mut(&mut self) -> &mut HashSet<UnknownId> { 45 &mut self.ids 46 } 47 48 pub fn clear(&mut self) { 49 self.ids = HashSet::default(); 50 } 51 52 pub fn filter(&self) -> Option<Vec<Filter>> { 53 let ids: Vec<&UnknownId> = self.ids.iter().collect(); 54 get_unknown_ids_filter(&ids) 55 } 56 57 /// We've updated some unknown ids, update the last_updated time to now 58 pub fn mark_updated(&mut self) { 59 let now = Instant::now(); 60 if self.first_updated.is_none() { 61 self.first_updated = Some(now); 62 } 63 self.last_updated = Some(now); 64 } 65 66 pub fn update_from_note(txn: &Transaction, app: &mut Damus, note: &Note) -> bool { 67 let before = app.unknown_ids.ids().len(); 68 let key = note.key().expect("note key"); 69 let cached_note = app 70 .note_cache_mut() 71 .cached_note_or_insert(key, note) 72 .clone(); 73 if let Err(e) = 74 get_unknown_note_ids(&app.ndb, &cached_note, txn, note, app.unknown_ids.ids_mut()) 75 { 76 error!("UnknownIds::update_from_note {e}"); 77 } 78 let after = 
app.unknown_ids.ids().len(); 79 80 if before != after { 81 app.unknown_ids.mark_updated(); 82 true 83 } else { 84 false 85 } 86 } 87 88 pub fn update(txn: &Transaction, app: &mut Damus) -> bool { 89 let before = app.unknown_ids.ids().len(); 90 if let Err(e) = get_unknown_ids(txn, app) { 91 error!("UnknownIds::update {e}"); 92 } 93 let after = app.unknown_ids.ids().len(); 94 95 if before != after { 96 app.unknown_ids.mark_updated(); 97 true 98 } else { 99 false 100 } 101 } 102 } 103 104 #[derive(Hash, Clone, Copy, PartialEq, Eq)] 105 pub enum UnknownId { 106 Pubkey(Pubkey), 107 Id(NoteId), 108 } 109 110 impl UnknownId { 111 pub fn is_pubkey(&self) -> Option<&Pubkey> { 112 match self { 113 UnknownId::Pubkey(pk) => Some(pk), 114 _ => None, 115 } 116 } 117 118 pub fn is_id(&self) -> Option<&NoteId> { 119 match self { 120 UnknownId::Id(id) => Some(id), 121 _ => None, 122 } 123 } 124 } 125 126 /// Look for missing notes in various parts of notes that we see: 127 /// 128 /// - pubkeys and notes mentioned inside the note 129 /// - notes being replied to 130 /// 131 /// We return all of this in a HashSet so that we can fetch these from 132 /// remote relays. 
133 /// 134 pub fn get_unknown_note_ids<'a>( 135 ndb: &Ndb, 136 cached_note: &CachedNote, 137 txn: &'a Transaction, 138 note: &Note<'a>, 139 ids: &mut HashSet<UnknownId>, 140 ) -> Result<()> { 141 // the author pubkey 142 143 if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() { 144 ids.insert(UnknownId::Pubkey(Pubkey::new(*note.pubkey()))); 145 } 146 147 // pull notes that notes are replying to 148 if cached_note.reply.root.is_some() { 149 let note_reply = cached_note.reply.borrow(note.tags()); 150 if let Some(root) = note_reply.root() { 151 if ndb.get_note_by_id(txn, root.id).is_err() { 152 ids.insert(UnknownId::Id(NoteId::new(*root.id))); 153 } 154 } 155 156 if !note_reply.is_reply_to_root() { 157 if let Some(reply) = note_reply.reply() { 158 if ndb.get_note_by_id(txn, reply.id).is_err() { 159 ids.insert(UnknownId::Id(NoteId::new(*reply.id))); 160 } 161 } 162 } 163 } 164 165 let blocks = ndb.get_blocks_by_key(txn, note.key().expect("note key"))?; 166 for block in blocks.iter(note) { 167 if block.blocktype() != BlockType::MentionBech32 { 168 continue; 169 } 170 171 match block.as_mention().unwrap() { 172 Mention::Pubkey(npub) => { 173 if ndb.get_profile_by_pubkey(txn, npub.pubkey()).is_err() { 174 ids.insert(UnknownId::Pubkey(Pubkey::new(*npub.pubkey()))); 175 } 176 } 177 Mention::Profile(nprofile) => { 178 if ndb.get_profile_by_pubkey(txn, nprofile.pubkey()).is_err() { 179 ids.insert(UnknownId::Pubkey(Pubkey::new(*nprofile.pubkey()))); 180 } 181 } 182 Mention::Event(ev) => match ndb.get_note_by_id(txn, ev.id()) { 183 Err(_) => { 184 ids.insert(UnknownId::Id(NoteId::new(*ev.id()))); 185 if let Some(pk) = ev.pubkey() { 186 if ndb.get_profile_by_pubkey(txn, pk).is_err() { 187 ids.insert(UnknownId::Pubkey(Pubkey::new(*pk))); 188 } 189 } 190 } 191 Ok(note) => { 192 if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() { 193 ids.insert(UnknownId::Pubkey(Pubkey::new(*note.pubkey()))); 194 } 195 } 196 }, 197 Mention::Note(note) => match ndb.get_note_by_id(txn, 
note.id()) { 198 Err(_) => { 199 ids.insert(UnknownId::Id(NoteId::new(*note.id()))); 200 } 201 Ok(note) => { 202 if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() { 203 ids.insert(UnknownId::Pubkey(Pubkey::new(*note.pubkey()))); 204 } 205 } 206 }, 207 _ => {} 208 } 209 } 210 211 Ok(()) 212 } 213 214 fn get_unknown_ids(txn: &Transaction, damus: &mut Damus) -> Result<()> { 215 #[cfg(feature = "profiling")] 216 puffin::profile_function!(); 217 218 let mut new_cached_notes: Vec<(NoteKey, CachedNote)> = vec![]; 219 220 for timeline in &damus.timelines { 221 for noteref in timeline.notes(ViewFilter::NotesAndReplies) { 222 let note = damus.ndb.get_note_by_key(txn, noteref.key)?; 223 let note_key = note.key().unwrap(); 224 let cached_note = damus.note_cache().cached_note(noteref.key); 225 let cached_note = if let Some(cn) = cached_note { 226 cn.clone() 227 } else { 228 let new_cached_note = CachedNote::new(¬e); 229 new_cached_notes.push((note_key, new_cached_note.clone())); 230 new_cached_note 231 }; 232 233 let _ = get_unknown_note_ids( 234 &damus.ndb, 235 &cached_note, 236 txn, 237 ¬e, 238 damus.unknown_ids.ids_mut(), 239 ); 240 } 241 } 242 243 // This is mainly done to avoid the double mutable borrow that would happen 244 // if we tried to update the note_cache mutably in the loop above 245 for (note_key, note) in new_cached_notes { 246 damus.note_cache_mut().cache_mut().insert(note_key, note); 247 } 248 249 Ok(()) 250 } 251 252 fn get_unknown_ids_filter(ids: &[&UnknownId]) -> Option<Vec<Filter>> { 253 if ids.is_empty() { 254 return None; 255 } 256 257 let ids = &ids[0..500.min(ids.len())]; 258 let mut filters: Vec<Filter> = vec![]; 259 260 let pks: Vec<&[u8; 32]> = ids 261 .iter() 262 .flat_map(|id| id.is_pubkey().map(|pk| pk.bytes())) 263 .collect(); 264 if !pks.is_empty() { 265 let pk_filter = Filter::new().authors(pks).kinds([0]).build(); 266 filters.push(pk_filter); 267 } 268 269 let note_ids: Vec<&[u8; 32]> = ids 270 .iter() 271 .flat_map(|id| 
id.is_id().map(|id| id.bytes())) 272 .collect(); 273 if !note_ids.is_empty() { 274 filters.push(Filter::new().ids(note_ids).build()); 275 } 276 277 Some(filters) 278 }