notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

thread.rs (14453B)


      1 use std::{
      2     collections::{BTreeSet, HashSet},
      3     hash::Hash,
      4 };
      5 
      6 use egui_nav::ReturnType;
      7 use egui_virtual_list::VirtualList;
      8 use enostr::{NoteId, RelayPool};
      9 use hashbrown::{hash_map::RawEntryMut, HashMap};
     10 use nostrdb::{Filter, Ndb, Note, NoteKey, NoteReplyBuf, Transaction};
     11 use notedeck::{NoteCache, NoteRef, UnknownIds};
     12 
     13 use crate::{
     14     actionbar::{process_thread_notes, NewThreadNotes},
     15     multi_subscriber::ThreadSubs,
     16     timeline::MergeKind,
     17 };
     18 
     19 use super::ThreadSelection;
     20 
/// A single note's entry in the thread map.
pub struct ThreadNode {
    /// Replies collected for this note, deduplicated and kept sorted.
    pub replies: HybridSet<NoteRef>,
    /// What is currently known about this note's parent.
    pub prev: ParentState,
    /// Set once the reply chain above this note has been resolved all the
    /// way up to a note that has no parent.
    pub have_all_ancestors: bool,
    /// Virtual list state for this node
    /// (NOTE(review): presumably drives rendering of `replies` — confirm at the call site).
    pub list: VirtualList,
}
     27 
/// What is known about a note's parent in the reply chain.
#[derive(Clone)]
pub enum ParentState {
    /// The parent has not been determined yet.
    Unknown,
    /// The note has no parent (it carries no reply reference).
    None,
    /// The note replies to the note with this id.
    Parent(NoteId),
}
     34 
/// A set that stores every element twice: once in a hash set and once in an
/// ordered set.
///
/// Affords:
/// - O(1) contains
/// - O(log n) sorted insertion
pub struct HybridSet<T> {
    /// When `true`, iteration walks the ordered set from its greatest element
    /// down to its least.
    reversed: bool,
    lookup: HashSet<T>,   // fast deduplication
    ordered: BTreeSet<T>, // sorted iteration
}
     43 
     44 impl<T> Default for HybridSet<T> {
     45     fn default() -> Self {
     46         Self {
     47             reversed: Default::default(),
     48             lookup: Default::default(),
     49             ordered: Default::default(),
     50         }
     51     }
     52 }
     53 
/// Outcome of inserting a value into a [`HybridSet`].
pub enum InsertionResponse {
    /// The value was already present; the set was left untouched.
    AlreadyExists,
    /// The value was added; the [`MergeKind`] describes how it was merged
    /// (`FrontInsert` or `Spliced`).
    Merged(MergeKind),
}
     58 
     59 impl<T: Copy + Ord + Eq + Hash> HybridSet<T> {
     60     pub fn insert(&mut self, val: T) -> InsertionResponse {
     61         if !self.lookup.insert(val) {
     62             return InsertionResponse::AlreadyExists;
     63         }
     64 
     65         let front_insertion = match self.ordered.iter().next() {
     66             Some(first) => (val >= *first) == self.reversed,
     67             None => true,
     68         };
     69 
     70         self.ordered.insert(val); // O(log n)
     71 
     72         InsertionResponse::Merged(if front_insertion {
     73             MergeKind::FrontInsert
     74         } else {
     75             MergeKind::Spliced
     76         })
     77     }
     78 }
     79 
     80 impl<T: Eq + Hash> HybridSet<T> {
     81     pub fn contains(&self, val: &T) -> bool {
     82         self.lookup.contains(val) // O(1)
     83     }
     84 }
     85 
     86 impl<T> HybridSet<T> {
     87     pub fn iter(&self) -> HybridIter<'_, T> {
     88         HybridIter {
     89             inner: self.ordered.iter(),
     90             reversed: self.reversed,
     91         }
     92     }
     93 
     94     pub fn new(reversed: bool) -> Self {
     95         Self {
     96             reversed,
     97             ..Default::default()
     98         }
     99     }
    100 }
    101 
    102 impl<'a, T> IntoIterator for &'a HybridSet<T> {
    103     type Item = &'a T;
    104     type IntoIter = HybridIter<'a, T>;
    105 
    106     fn into_iter(self) -> Self::IntoIter {
    107         self.iter()
    108     }
    109 }
    110 
/// Borrowing iterator over the elements of a [`HybridSet`].
pub struct HybridIter<'a, T> {
    /// Iterator over the underlying ordered set.
    inner: std::collections::btree_set::Iter<'a, T>,
    /// When `true`, items are taken from the back of `inner` instead of the front.
    reversed: bool,
}
    115 
    116 impl<'a, T> Iterator for HybridIter<'a, T> {
    117     type Item = &'a T;
    118 
    119     fn next(&mut self) -> Option<Self::Item> {
    120         if self.reversed {
    121             self.inner.next_back()
    122         } else {
    123             self.inner.next()
    124         }
    125     }
    126 }
    127 
    128 impl ThreadNode {
    129     pub fn new(parent: ParentState) -> Self {
    130         Self {
    131             replies: HybridSet::new(true),
    132             prev: parent,
    133             have_all_ancestors: false,
    134             list: VirtualList::new(),
    135         }
    136     }
    137 }
    138 
/// State for every thread being tracked.
#[derive(Default)]
pub struct Threads {
    /// Thread nodes keyed by note id.
    pub threads: HashMap<NoteId, ThreadNode>,
    /// Subscriptions backing the opened threads.
    pub subs: ThreadSubs,

    /// Per-note indicators of unread replies.
    pub seen_flags: NoteSeenFlags,
}
    146 
    147 impl Threads {
    148     /// Opening a thread.
    149     /// Similar to [[super::cache::TimelineCache::open]]
    150     pub fn open(
    151         &mut self,
    152         ndb: &mut Ndb,
    153         txn: &Transaction,
    154         pool: &mut RelayPool,
    155         thread: &ThreadSelection,
    156         new_scope: bool,
    157         col: usize,
    158     ) -> Option<NewThreadNotes> {
    159         tracing::info!("Opening thread: {:?}", thread);
    160         let local_sub_filter = if let Some(selected) = &thread.selected_note {
    161             vec![direct_replies_filter_non_root(
    162                 selected.bytes(),
    163                 thread.root_id.bytes(),
    164             )]
    165         } else {
    166             vec![direct_replies_filter_root(thread.root_id.bytes())]
    167         };
    168 
    169         let selected_note_id = thread.selected_or_root();
    170         self.seen_flags.mark_seen(selected_note_id);
    171 
    172         let filter = match self.threads.raw_entry_mut().from_key(&selected_note_id) {
    173             RawEntryMut::Occupied(_entry) => {
    174                 // TODO(kernelkind): reenable this once the panic is fixed
    175                 //
    176                 // let node = entry.into_mut();
    177                 // if let Some(first) = node.replies.first() {
    178                 //     &filter::make_filters_since(&local_sub_filter, first.created_at + 1)
    179                 // } else {
    180                 //     &local_sub_filter
    181                 // }
    182                 &local_sub_filter
    183             }
    184             RawEntryMut::Vacant(entry) => {
    185                 let id = NoteId::new(*selected_note_id);
    186 
    187                 let node = ThreadNode::new(ParentState::Unknown);
    188                 entry.insert(id, node);
    189 
    190                 &local_sub_filter
    191             }
    192         };
    193 
    194         let new_notes = ndb.query(txn, filter, 500).ok().map(|r| {
    195             r.into_iter()
    196                 .map(NoteRef::from_query_result)
    197                 .collect::<Vec<_>>()
    198         });
    199 
    200         self.subs
    201             .subscribe(ndb, pool, col, thread, local_sub_filter, new_scope, || {
    202                 replies_filter_remote(thread)
    203             });
    204 
    205         new_notes.map(|notes| NewThreadNotes {
    206             selected_note_id: NoteId::new(*selected_note_id),
    207             notes: notes.into_iter().map(|f| f.key).collect(),
    208         })
    209     }
    210 
    211     pub fn close(
    212         &mut self,
    213         ndb: &mut Ndb,
    214         pool: &mut RelayPool,
    215         thread: &ThreadSelection,
    216         return_type: ReturnType,
    217         id: usize,
    218     ) {
    219         tracing::info!("Closing thread: {:?}", thread);
    220         self.subs.unsubscribe(ndb, pool, id, thread, return_type);
    221     }
    222 
    223     /// Responsible for making sure the chain and the direct replies are up to date
    224     pub fn update(
    225         &mut self,
    226         selected: &Note<'_>,
    227         note_cache: &mut NoteCache,
    228         ndb: &Ndb,
    229         txn: &Transaction,
    230         unknown_ids: &mut UnknownIds,
    231         col: usize,
    232     ) {
    233         let Some(selected_key) = selected.key() else {
    234             tracing::error!("Selected note did not have a key");
    235             return;
    236         };
    237 
    238         let reply = note_cache
    239             .cached_note_or_insert_mut(selected_key, selected)
    240             .reply;
    241 
    242         self.fill_reply_chain_recursive(selected, &reply, note_cache, ndb, txn, unknown_ids);
    243         let node = self
    244             .threads
    245             .get_mut(&selected.id())
    246             .expect("should be guarenteed to exist from `Self::fill_reply_chain_recursive`");
    247 
    248         let Some(sub) = self.subs.get_local(col) else {
    249             tracing::error!("Was expecting to find local sub");
    250             return;
    251         };
    252 
    253         let keys = ndb.poll_for_notes(sub.sub, 10);
    254 
    255         if keys.is_empty() {
    256             return;
    257         }
    258 
    259         tracing::info!("Got {} new notes", keys.len());
    260 
    261         process_thread_notes(
    262             &keys,
    263             node,
    264             &mut self.seen_flags,
    265             ndb,
    266             txn,
    267             unknown_ids,
    268             note_cache,
    269         );
    270     }
    271 
    272     fn fill_reply_chain_recursive(
    273         &mut self,
    274         cur_note: &Note<'_>,
    275         cur_reply: &NoteReplyBuf,
    276         note_cache: &mut NoteCache,
    277         ndb: &Ndb,
    278         txn: &Transaction,
    279         unknown_ids: &mut UnknownIds,
    280     ) -> bool {
    281         let (unknown_parent_state, mut have_all_ancestors) = self
    282             .threads
    283             .get(&cur_note.id())
    284             .map(|t| (matches!(t.prev, ParentState::Unknown), t.have_all_ancestors))
    285             .unwrap_or((true, false));
    286 
    287         if have_all_ancestors {
    288             return true;
    289         }
    290 
    291         let mut new_parent = None;
    292 
    293         let note_reply = cur_reply.borrow(cur_note.tags());
    294 
    295         let next_link = 's: {
    296             let Some(parent) = note_reply.reply() else {
    297                 break 's NextLink::None;
    298             };
    299 
    300             if unknown_parent_state {
    301                 new_parent = Some(ParentState::Parent(NoteId::new(*parent.id)));
    302             }
    303 
    304             let Ok(reply_note) = ndb.get_note_by_id(txn, parent.id) else {
    305                 break 's NextLink::Unknown(parent.id);
    306             };
    307 
    308             let Some(notekey) = reply_note.key() else {
    309                 break 's NextLink::Unknown(parent.id);
    310             };
    311 
    312             NextLink::Next(reply_note, notekey)
    313         };
    314 
    315         match next_link {
    316             NextLink::Unknown(parent) => {
    317                 unknown_ids.add_note_id_if_missing(ndb, txn, parent);
    318             }
    319             NextLink::Next(next_note, note_key) => {
    320                 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, &next_note);
    321 
    322                 let cached_note = note_cache.cached_note_or_insert_mut(note_key, &next_note);
    323 
    324                 let next_reply = cached_note.reply;
    325                 if self.fill_reply_chain_recursive(
    326                     &next_note,
    327                     &next_reply,
    328                     note_cache,
    329                     ndb,
    330                     txn,
    331                     unknown_ids,
    332                 ) {
    333                     have_all_ancestors = true;
    334                 }
    335 
    336                 if !self.seen_flags.contains(next_note.id()) {
    337                     self.seen_flags.mark_replies(
    338                         next_note.id(),
    339                         selected_has_at_least_n_replies(ndb, txn, None, next_note.id(), 2),
    340                     );
    341                 }
    342             }
    343             NextLink::None => {
    344                 have_all_ancestors = true;
    345                 new_parent = Some(ParentState::None);
    346             }
    347         }
    348 
    349         match self.threads.raw_entry_mut().from_key(&cur_note.id()) {
    350             RawEntryMut::Occupied(entry) => {
    351                 let node = entry.into_mut();
    352                 if let Some(parent) = new_parent {
    353                     node.prev = parent;
    354                 }
    355 
    356                 if have_all_ancestors {
    357                     node.have_all_ancestors = true;
    358                 }
    359             }
    360             RawEntryMut::Vacant(entry) => {
    361                 let id = NoteId::new(*cur_note.id());
    362                 let parent = new_parent.unwrap_or(ParentState::Unknown);
    363                 let (_, res) = entry.insert(id, ThreadNode::new(parent));
    364 
    365                 if have_all_ancestors {
    366                     res.have_all_ancestors = true;
    367                 }
    368             }
    369         }
    370 
    371         have_all_ancestors
    372     }
    373 }
    374 
/// The next step when walking up a reply chain.
enum NextLink<'a> {
    /// A parent is referenced by this id, but the note could not be loaded
    /// (or has no key) locally.
    Unknown(&'a [u8; 32]),
    /// The parent note was loaded, together with its key.
    Next(Note<'a>, NoteKey),
    /// The note has no parent; the chain ends here.
    None,
}
    380 
    381 pub fn selected_has_at_least_n_replies(
    382     ndb: &Ndb,
    383     txn: &Transaction,
    384     selected: Option<&[u8; 32]>,
    385     root: &[u8; 32],
    386     n: u8,
    387 ) -> bool {
    388     let filter = if let Some(selected) = selected {
    389         &vec![direct_replies_filter_non_root(selected, root)]
    390     } else {
    391         &vec![direct_replies_filter_root(root)]
    392     };
    393 
    394     let Ok(res) = ndb.query(txn, filter, n as i32) else {
    395         return false;
    396     };
    397 
    398     res.len() >= n.into()
    399 }
    400 
    401 fn direct_replies_filter_non_root(
    402     selected_note_id: &[u8; 32],
    403     root_id: &[u8; 32],
    404 ) -> nostrdb::Filter {
    405     let tmp_selected = *selected_note_id;
    406     nostrdb::Filter::new()
    407         .kinds([1])
    408         .custom(move |n: nostrdb::Note<'_>| {
    409             for tag in n.tags() {
    410                 if tag.count() < 4 {
    411                     continue;
    412                 }
    413 
    414                 let Some("e") = tag.get_str(0) else {
    415                     continue;
    416                 };
    417 
    418                 let Some(tagged_id) = tag.get_id(1) else {
    419                     continue;
    420                 };
    421 
    422                 if *tagged_id != tmp_selected {
    423                     // NOTE: if these aren't dereferenced a segfault occurs...
    424                     continue;
    425                 }
    426 
    427                 if let Some(data) = tag.get_str(3) {
    428                     if data == "reply" {
    429                         return true;
    430                     }
    431                 }
    432             }
    433             false
    434         })
    435         .event(root_id)
    436         .build()
    437 }
    438 
    439 /// Custom filter requirements:
    440 /// - Do NOT capture references (e.g. `*root_id`) inside the closure
    441 /// - Instead, copy values outside and capture them with `move`
    442 ///
    443 /// Incorrect:
    444 ///     .custom(|_| { *root_id })       // ❌
    445 /// Also Incorrect:
    446 ///     .custom(move |_| { *root_id })  // ❌
    447 /// Correct:
    448 ///     let tmp = *root_id;
    449 ///     .custom(move |_| { tmp })       // ✅
    450 fn direct_replies_filter_root(root_id: &[u8; 32]) -> nostrdb::Filter {
    451     let tmp_root = *root_id;
    452     nostrdb::Filter::new()
    453         .kinds([1])
    454         .custom(move |n: nostrdb::Note<'_>| {
    455             let mut contains_root = false;
    456             for tag in n.tags() {
    457                 if tag.count() < 4 {
    458                     continue;
    459                 }
    460 
    461                 let Some("e") = tag.get_str(0) else {
    462                     continue;
    463                 };
    464 
    465                 if let Some(s) = tag.get_str(3) {
    466                     if s == "reply" {
    467                         return false;
    468                     }
    469                 }
    470 
    471                 let Some(tagged_id) = tag.get_id(1) else {
    472                     continue;
    473                 };
    474 
    475                 if *tagged_id != tmp_root {
    476                     continue;
    477                 }
    478 
    479                 if let Some(s) = tag.get_str(3) {
    480                     if s == "root" {
    481                         contains_root = true;
    482                     }
    483                 }
    484             }
    485 
    486             contains_root
    487         })
    488         .event(root_id)
    489         .build()
    490 }
    491 
    492 fn replies_filter_remote(selection: &ThreadSelection) -> Vec<Filter> {
    493     vec![
    494         nostrdb::Filter::new()
    495             .kinds([1])
    496             .event(selection.root_id.bytes())
    497             .build(),
    498         nostrdb::Filter::new()
    499             .ids([selection.root_id.bytes()])
    500             .limit(1)
    501             .build(),
    502     ]
    503 }
    504 
/// Represents indicators that there is more content in the note to view
#[derive(Default)]
pub struct NoteSeenFlags {
    /// Keyed by note id. `true` indicates the note has replies AND it has not
    /// been read; `false` is stored when the note is marked seen or recorded
    /// as having no replies.
    pub flags: HashMap<NoteId, bool>,
}
    511 
    512 impl NoteSeenFlags {
    513     pub fn mark_seen(&mut self, note_id: &[u8; 32]) {
    514         self.flags.insert(NoteId::new(*note_id), false);
    515     }
    516 
    517     pub fn mark_replies(&mut self, note_id: &[u8; 32], has_replies: bool) {
    518         self.flags.insert(NoteId::new(*note_id), has_replies);
    519     }
    520 
    521     pub fn get(&self, note_id: &[u8; 32]) -> Option<&bool> {
    522         self.flags.get(&note_id)
    523     }
    524 
    525     pub fn contains(&self, note_id: &[u8; 32]) -> bool {
    526         self.flags.contains_key(&note_id)
    527     }
    528 }