notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

unknowns.rs (12231B)


      1 use crate::{
      2     column::Columns,
      3     note::NoteRef,
      4     notecache::{CachedNote, NoteCache},
      5     timeline::ViewFilter,
      6     Result,
      7 };
      8 
      9 use enostr::{Filter, NoteId, Pubkey};
     10 use nostrdb::{BlockType, Mention, Ndb, Note, NoteKey, Transaction};
     11 use std::collections::HashSet;
     12 use std::time::{Duration, Instant};
     13 use tracing::error;
     14 
     15 #[must_use = "process_action should be used on this result"]
     16 pub enum SingleUnkIdAction {
     17     NoAction,
     18     NeedsProcess(UnknownId),
     19 }
     20 
     21 #[must_use = "process_action should be used on this result"]
     22 pub enum NoteRefsUnkIdAction {
     23     NoAction,
     24     NeedsProcess(Vec<NoteRef>),
     25 }
     26 
     27 impl NoteRefsUnkIdAction {
     28     pub fn new(refs: Vec<NoteRef>) -> Self {
     29         NoteRefsUnkIdAction::NeedsProcess(refs)
     30     }
     31 
     32     pub fn no_action() -> Self {
     33         Self::NoAction
     34     }
     35 
     36     pub fn process_action(
     37         &self,
     38         txn: &Transaction,
     39         ndb: &Ndb,
     40         unk_ids: &mut UnknownIds,
     41         note_cache: &mut NoteCache,
     42     ) {
     43         match self {
     44             Self::NoAction => {}
     45             Self::NeedsProcess(refs) => {
     46                 UnknownIds::update_from_note_refs(txn, ndb, unk_ids, note_cache, refs);
     47             }
     48         }
     49     }
     50 }
     51 
     52 impl SingleUnkIdAction {
     53     pub fn new(id: UnknownId) -> Self {
     54         SingleUnkIdAction::NeedsProcess(id)
     55     }
     56 
     57     pub fn no_action() -> Self {
     58         Self::NoAction
     59     }
     60 
     61     pub fn pubkey(pubkey: Pubkey) -> Self {
     62         SingleUnkIdAction::new(UnknownId::Pubkey(pubkey))
     63     }
     64 
     65     pub fn note_id(note_id: NoteId) -> Self {
     66         SingleUnkIdAction::new(UnknownId::Id(note_id))
     67     }
     68 
     69     /// Some functions may return unknown id actions that need to be processed.
     70     /// For example, when we add a new account we need to make sure we have the
     71     /// profile for that account. This function ensures we add this to the
     72     /// unknown id tracker without adding side effects to functions.
     73     pub fn process_action(&self, ids: &mut UnknownIds, ndb: &Ndb, txn: &Transaction) {
     74         match self {
     75             Self::NeedsProcess(id) => {
     76                 ids.add_unknown_id_if_missing(ndb, txn, id);
     77             }
     78             Self::NoAction => {}
     79         }
     80     }
     81 }
     82 
     83 /// Unknown Id searcher
     84 #[derive(Default)]
     85 pub struct UnknownIds {
     86     ids: HashSet<UnknownId>,
     87     first_updated: Option<Instant>,
     88     last_updated: Option<Instant>,
     89 }
     90 
     91 impl UnknownIds {
     92     /// Simple debouncer
     93     pub fn ready_to_send(&self) -> bool {
     94         if self.ids.is_empty() {
     95             return false;
     96         }
     97 
     98         // we trigger on first set
     99         if self.first_updated == self.last_updated {
    100             return true;
    101         }
    102 
    103         let last_updated = if let Some(last) = self.last_updated {
    104             last
    105         } else {
    106             // if we've
    107             return true;
    108         };
    109 
    110         Instant::now() - last_updated >= Duration::from_secs(2)
    111     }
    112 
    113     pub fn ids(&self) -> &HashSet<UnknownId> {
    114         &self.ids
    115     }
    116 
    117     pub fn ids_mut(&mut self) -> &mut HashSet<UnknownId> {
    118         &mut self.ids
    119     }
    120 
    121     pub fn clear(&mut self) {
    122         self.ids = HashSet::default();
    123     }
    124 
    125     pub fn filter(&self) -> Option<Vec<Filter>> {
    126         let ids: Vec<&UnknownId> = self.ids.iter().collect();
    127         get_unknown_ids_filter(&ids)
    128     }
    129 
    130     /// We've updated some unknown ids, update the last_updated time to now
    131     pub fn mark_updated(&mut self) {
    132         let now = Instant::now();
    133         if self.first_updated.is_none() {
    134             self.first_updated = Some(now);
    135         }
    136         self.last_updated = Some(now);
    137     }
    138 
    139     pub fn update_from_note_key(
    140         txn: &Transaction,
    141         ndb: &Ndb,
    142         unknown_ids: &mut UnknownIds,
    143         note_cache: &mut NoteCache,
    144         key: NoteKey,
    145     ) -> bool {
    146         let note = if let Ok(note) = ndb.get_note_by_key(txn, key) {
    147             note
    148         } else {
    149             return false;
    150         };
    151 
    152         UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, &note)
    153     }
    154 
    155     /// Should be called on freshly polled notes from subscriptions
    156     pub fn update_from_note_refs(
    157         txn: &Transaction,
    158         ndb: &Ndb,
    159         unknown_ids: &mut UnknownIds,
    160         note_cache: &mut NoteCache,
    161         note_refs: &[NoteRef],
    162     ) {
    163         for note_ref in note_refs {
    164             Self::update_from_note_key(txn, ndb, unknown_ids, note_cache, note_ref.key);
    165         }
    166     }
    167 
    168     pub fn update_from_note(
    169         txn: &Transaction,
    170         ndb: &Ndb,
    171         unknown_ids: &mut UnknownIds,
    172         note_cache: &mut NoteCache,
    173         note: &Note,
    174     ) -> bool {
    175         let before = unknown_ids.ids().len();
    176         let key = note.key().expect("note key");
    177         //let cached_note = note_cache.cached_note_or_insert(key, note).clone();
    178         let cached_note = note_cache.cached_note_or_insert(key, note);
    179         if let Err(e) = get_unknown_note_ids(ndb, cached_note, txn, note, unknown_ids.ids_mut()) {
    180             error!("UnknownIds::update_from_note {e}");
    181         }
    182         let after = unknown_ids.ids().len();
    183 
    184         if before != after {
    185             unknown_ids.mark_updated();
    186             true
    187         } else {
    188             false
    189         }
    190     }
    191 
    192     pub fn add_unknown_id_if_missing(&mut self, ndb: &Ndb, txn: &Transaction, unk_id: &UnknownId) {
    193         match unk_id {
    194             UnknownId::Pubkey(pk) => self.add_pubkey_if_missing(ndb, txn, pk),
    195             UnknownId::Id(note_id) => self.add_note_id_if_missing(ndb, txn, note_id),
    196         }
    197     }
    198 
    199     pub fn add_pubkey_if_missing(&mut self, ndb: &Ndb, txn: &Transaction, pubkey: &Pubkey) {
    200         // we already have this profile, skip
    201         if ndb.get_profile_by_pubkey(txn, pubkey).is_ok() {
    202             return;
    203         }
    204 
    205         self.ids.insert(UnknownId::Pubkey(*pubkey));
    206         self.mark_updated();
    207     }
    208 
    209     pub fn add_note_id_if_missing(&mut self, ndb: &Ndb, txn: &Transaction, note_id: &NoteId) {
    210         // we already have this note, skip
    211         if ndb.get_note_by_id(txn, note_id.bytes()).is_ok() {
    212             return;
    213         }
    214 
    215         self.ids.insert(UnknownId::Id(*note_id));
    216         self.mark_updated();
    217     }
    218 
    219     pub fn update(
    220         txn: &Transaction,
    221         unknown_ids: &mut UnknownIds,
    222         columns: &Columns,
    223         ndb: &Ndb,
    224         note_cache: &mut NoteCache,
    225     ) -> bool {
    226         let before = unknown_ids.ids().len();
    227         if let Err(e) = get_unknown_ids(txn, unknown_ids, columns, ndb, note_cache) {
    228             error!("UnknownIds::update {e}");
    229         }
    230         let after = unknown_ids.ids().len();
    231 
    232         if before != after {
    233             unknown_ids.mark_updated();
    234             true
    235         } else {
    236             false
    237         }
    238     }
    239 }
    240 
    241 #[derive(Hash, Clone, Copy, PartialEq, Eq)]
    242 pub enum UnknownId {
    243     Pubkey(Pubkey),
    244     Id(NoteId),
    245 }
    246 
    247 impl UnknownId {
    248     pub fn is_pubkey(&self) -> Option<&Pubkey> {
    249         match self {
    250             UnknownId::Pubkey(pk) => Some(pk),
    251             _ => None,
    252         }
    253     }
    254 
    255     pub fn is_id(&self) -> Option<&NoteId> {
    256         match self {
    257             UnknownId::Id(id) => Some(id),
    258             _ => None,
    259         }
    260     }
    261 }
    262 
    263 /// Look for missing notes in various parts of notes that we see:
    264 ///
    265 /// - pubkeys and notes mentioned inside the note
    266 /// - notes being replied to
    267 ///
    268 /// We return all of this in a HashSet so that we can fetch these from
    269 /// remote relays.
    270 ///
    271 pub fn get_unknown_note_ids<'a>(
    272     ndb: &Ndb,
    273     cached_note: &CachedNote,
    274     txn: &'a Transaction,
    275     note: &Note<'a>,
    276     ids: &mut HashSet<UnknownId>,
    277 ) -> Result<()> {
    278     #[cfg(feature = "profiling")]
    279     puffin::profile_function!();
    280 
    281     // the author pubkey
    282     if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() {
    283         ids.insert(UnknownId::Pubkey(Pubkey::new(*note.pubkey())));
    284     }
    285 
    286     // pull notes that notes are replying to
    287     if cached_note.reply.root.is_some() {
    288         let note_reply = cached_note.reply.borrow(note.tags());
    289         if let Some(root) = note_reply.root() {
    290             if ndb.get_note_by_id(txn, root.id).is_err() {
    291                 ids.insert(UnknownId::Id(NoteId::new(*root.id)));
    292             }
    293         }
    294 
    295         if !note_reply.is_reply_to_root() {
    296             if let Some(reply) = note_reply.reply() {
    297                 if ndb.get_note_by_id(txn, reply.id).is_err() {
    298                     ids.insert(UnknownId::Id(NoteId::new(*reply.id)));
    299                 }
    300             }
    301         }
    302     }
    303 
    304     let blocks = ndb.get_blocks_by_key(txn, note.key().expect("note key"))?;
    305     for block in blocks.iter(note) {
    306         if block.blocktype() != BlockType::MentionBech32 {
    307             continue;
    308         }
    309 
    310         match block.as_mention().unwrap() {
    311             Mention::Pubkey(npub) => {
    312                 if ndb.get_profile_by_pubkey(txn, npub.pubkey()).is_err() {
    313                     ids.insert(UnknownId::Pubkey(Pubkey::new(*npub.pubkey())));
    314                 }
    315             }
    316             Mention::Profile(nprofile) => {
    317                 if ndb.get_profile_by_pubkey(txn, nprofile.pubkey()).is_err() {
    318                     ids.insert(UnknownId::Pubkey(Pubkey::new(*nprofile.pubkey())));
    319                 }
    320             }
    321             Mention::Event(ev) => match ndb.get_note_by_id(txn, ev.id()) {
    322                 Err(_) => {
    323                     ids.insert(UnknownId::Id(NoteId::new(*ev.id())));
    324                     if let Some(pk) = ev.pubkey() {
    325                         if ndb.get_profile_by_pubkey(txn, pk).is_err() {
    326                             ids.insert(UnknownId::Pubkey(Pubkey::new(*pk)));
    327                         }
    328                     }
    329                 }
    330                 Ok(note) => {
    331                     if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() {
    332                         ids.insert(UnknownId::Pubkey(Pubkey::new(*note.pubkey())));
    333                     }
    334                 }
    335             },
    336             Mention::Note(note) => match ndb.get_note_by_id(txn, note.id()) {
    337                 Err(_) => {
    338                     ids.insert(UnknownId::Id(NoteId::new(*note.id())));
    339                 }
    340                 Ok(note) => {
    341                     if ndb.get_profile_by_pubkey(txn, note.pubkey()).is_err() {
    342                         ids.insert(UnknownId::Pubkey(Pubkey::new(*note.pubkey())));
    343                     }
    344                 }
    345             },
    346             _ => {}
    347         }
    348     }
    349 
    350     Ok(())
    351 }
    352 
    353 fn get_unknown_ids(
    354     txn: &Transaction,
    355     unknown_ids: &mut UnknownIds,
    356     columns: &Columns,
    357     ndb: &Ndb,
    358     note_cache: &mut NoteCache,
    359 ) -> Result<()> {
    360     #[cfg(feature = "profiling")]
    361     puffin::profile_function!();
    362 
    363     let mut new_cached_notes: Vec<(NoteKey, CachedNote)> = vec![];
    364 
    365     for timeline in columns.timelines() {
    366         for noteref in timeline.notes(ViewFilter::NotesAndReplies) {
    367             let note = ndb.get_note_by_key(txn, noteref.key)?;
    368             let note_key = note.key().unwrap();
    369             let cached_note = note_cache.cached_note(noteref.key);
    370             let cached_note = if let Some(cn) = cached_note {
    371                 cn.clone()
    372             } else {
    373                 let new_cached_note = CachedNote::new(&note);
    374                 new_cached_notes.push((note_key, new_cached_note.clone()));
    375                 new_cached_note
    376             };
    377 
    378             let _ = get_unknown_note_ids(ndb, &cached_note, txn, &note, unknown_ids.ids_mut());
    379         }
    380     }
    381 
    382     // This is mainly done to avoid the double mutable borrow that would happen
    383     // if we tried to update the note_cache mutably in the loop above
    384     for (note_key, note) in new_cached_notes {
    385         note_cache.cache_mut().insert(note_key, note);
    386     }
    387 
    388     Ok(())
    389 }
    390 
    391 fn get_unknown_ids_filter(ids: &[&UnknownId]) -> Option<Vec<Filter>> {
    392     if ids.is_empty() {
    393         return None;
    394     }
    395 
    396     let ids = &ids[0..500.min(ids.len())];
    397     let mut filters: Vec<Filter> = vec![];
    398 
    399     let pks: Vec<&[u8; 32]> = ids
    400         .iter()
    401         .flat_map(|id| id.is_pubkey().map(|pk| pk.bytes()))
    402         .collect();
    403     if !pks.is_empty() {
    404         let pk_filter = Filter::new().authors(pks).kinds([0]).build();
    405         filters.push(pk_filter);
    406     }
    407 
    408     let note_ids: Vec<&[u8; 32]> = ids
    409         .iter()
    410         .flat_map(|id| id.is_id().map(|id| id.bytes()))
    411         .collect();
    412     if !note_ids.is_empty() {
    413         filters.push(Filter::new().ids(note_ids).build());
    414     }
    415 
    416     Some(filters)
    417 }