notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

thread.rs (12513B)


      1 use egui_nav::ReturnType;
      2 use egui_virtual_list::VirtualList;
      3 use enostr::NoteId;
      4 use hashbrown::{hash_map::RawEntryMut, HashMap};
      5 use nostrdb::{Filter, Ndb, Note, NoteKey, NoteReplyBuf, Transaction};
      6 use notedeck::{Accounts, NoteCache, NoteRef, ScopedSubApi, UnknownIds};
      7 
      8 use crate::{
      9     actionbar::{process_thread_notes, NewThreadNotes},
     10     timeline::{
     11         note_units::{NoteUnits, UnitKey},
     12         sub::ThreadSubs,
     13         unit::NoteUnit,
     14         InsertionResponse,
     15     },
     16 };
     17 
     18 use super::ThreadSelection;
     19 
     20 pub struct ThreadNode {
     21     pub replies: SingleNoteUnits,
     22     pub prev: ParentState,
     23     pub have_all_ancestors: bool,
     24     pub list: VirtualList,
     25     pub set_scroll_offset: Option<f32>,
     26 }
     27 
     28 #[derive(Clone)]
     29 pub enum ParentState {
     30     Unknown,
     31     None,
     32     Parent(NoteId),
     33 }
     34 
     35 impl ThreadNode {
     36     pub fn new(parent: ParentState) -> Self {
     37         Self {
     38             replies: SingleNoteUnits::new(true),
     39             prev: parent,
     40             have_all_ancestors: false,
     41             list: VirtualList::new(),
     42             set_scroll_offset: None,
     43         }
     44     }
     45 
     46     pub fn with_offset(mut self, offset: f32) -> Self {
     47         self.set_scroll_offset = Some(offset);
     48         self
     49     }
     50 }
     51 
     52 #[derive(Default)]
     53 pub struct Threads {
     54     pub threads: HashMap<NoteId, ThreadNode>,
     55     pub subs: ThreadSubs,
     56 
     57     pub seen_flags: NoteSeenFlags,
     58 }
     59 
     60 impl Threads {
     61     /// Opening a thread.
     62     /// Similar to [[super::cache::TimelineCache::open]]
     63     #[allow(clippy::too_many_arguments)]
     64     #[profiling::function]
     65     pub fn open(
     66         &mut self,
     67         ndb: &mut Ndb,
     68         txn: &Transaction,
     69         scoped_subs: &mut ScopedSubApi<'_, '_>,
     70         thread: &ThreadSelection,
     71         new_scope: bool,
     72         col: usize,
     73         scroll_offset: f32,
     74     ) -> Option<NewThreadNotes> {
     75         tracing::info!("Opening thread: {:?}", thread);
     76         let local_sub_filter = if let Some(selected) = &thread.selected_note {
     77             vec![direct_replies_filter_non_root(
     78                 selected.bytes(),
     79                 thread.root_id.bytes(),
     80             )]
     81         } else {
     82             vec![direct_replies_filter_root(thread.root_id.bytes())]
     83         };
     84 
     85         let selected_note_id = thread.selected_or_root();
     86         self.seen_flags.mark_seen(selected_note_id);
     87 
     88         let filter = match self.threads.raw_entry_mut().from_key(&selected_note_id) {
     89             RawEntryMut::Occupied(_entry) => {
     90                 // TODO(kernelkind): reenable this once the panic is fixed
     91                 //
     92                 // let node = entry.into_mut();
     93                 // if let Some(first) = node.replies.first() {
     94                 //     &filter::make_filters_since(&local_sub_filter, first.created_at + 1)
     95                 // } else {
     96                 //     &local_sub_filter
     97                 // }
     98                 &local_sub_filter
     99             }
    100             RawEntryMut::Vacant(entry) => {
    101                 let id = NoteId::new(*selected_note_id);
    102 
    103                 let node = ThreadNode::new(ParentState::Unknown).with_offset(scroll_offset);
    104                 entry.insert(id, node);
    105 
    106                 &local_sub_filter
    107             }
    108         };
    109 
    110         let new_notes = ndb.query(txn, filter, 500).ok().map(|r| {
    111             r.into_iter()
    112                 .map(NoteRef::from_query_result)
    113                 .collect::<Vec<_>>()
    114         });
    115 
    116         self.subs.subscribe(
    117             ndb,
    118             scoped_subs,
    119             col,
    120             thread,
    121             local_sub_filter,
    122             new_scope,
    123             replies_filter_remote(thread),
    124         );
    125 
    126         new_notes.map(|notes| NewThreadNotes {
    127             selected_note_id: NoteId::new(*selected_note_id),
    128             notes: notes.into_iter().map(|f| f.key).collect(),
    129         })
    130     }
    131 
    132     pub fn close(
    133         &mut self,
    134         ndb: &mut Ndb,
    135         scoped_subs: &mut ScopedSubApi<'_, '_>,
    136         thread: &ThreadSelection,
    137         return_type: ReturnType,
    138         id: usize,
    139     ) {
    140         tracing::info!("Closing thread: {:?}", thread);
    141         self.subs
    142             .unsubscribe(ndb, scoped_subs, id, thread, return_type);
    143     }
    144 
    145     /// Responsible for making sure the chain and the direct replies are up to date
    146     #[allow(clippy::too_many_arguments)]
    147     #[profiling::function]
    148     pub fn update(
    149         &mut self,
    150         selected: &Note<'_>,
    151         note_cache: &mut NoteCache,
    152         ndb: &Ndb,
    153         txn: &Transaction,
    154         unknown_ids: &mut UnknownIds,
    155         accounts: &Accounts,
    156         col: usize,
    157     ) {
    158         let Some(selected_key) = selected.key() else {
    159             tracing::error!("Selected note did not have a key");
    160             return;
    161         };
    162 
    163         let reply = note_cache
    164             .cached_note_or_insert_mut(selected_key, selected)
    165             .reply;
    166 
    167         self.fill_reply_chain_recursive(selected, &reply, note_cache, ndb, txn, unknown_ids);
    168         let node = self
    169             .threads
    170             .get_mut(&selected.id())
    171             .expect("should be guarenteed to exist from `Self::fill_reply_chain_recursive`");
    172 
    173         let Some(sub) = self.subs.get_local_for_selected(accounts, col) else {
    174             tracing::error!("Was expecting to find local sub");
    175             return;
    176         };
    177 
    178         let keys = ndb.poll_for_notes(*sub, 10);
    179 
    180         if keys.is_empty() {
    181             return;
    182         }
    183 
    184         tracing::info!("Got {} new notes", keys.len());
    185 
    186         process_thread_notes(
    187             &keys,
    188             node,
    189             &mut self.seen_flags,
    190             ndb,
    191             txn,
    192             unknown_ids,
    193             note_cache,
    194         );
    195     }
    196 
    197     fn fill_reply_chain_recursive(
    198         &mut self,
    199         cur_note: &Note<'_>,
    200         cur_reply: &NoteReplyBuf,
    201         note_cache: &mut NoteCache,
    202         ndb: &Ndb,
    203         txn: &Transaction,
    204         unknown_ids: &mut UnknownIds,
    205     ) -> bool {
    206         let (unknown_parent_state, mut have_all_ancestors) = self
    207             .threads
    208             .get(&cur_note.id())
    209             .map(|t| (matches!(t.prev, ParentState::Unknown), t.have_all_ancestors))
    210             .unwrap_or((true, false));
    211 
    212         if have_all_ancestors {
    213             return true;
    214         }
    215 
    216         let mut new_parent = None;
    217 
    218         let note_reply = cur_reply.borrow(cur_note.tags());
    219 
    220         let next_link = 's: {
    221             let Some(parent) = note_reply.reply() else {
    222                 break 's NextLink::None;
    223             };
    224 
    225             if unknown_parent_state {
    226                 new_parent = Some(ParentState::Parent(NoteId::new(*parent.id)));
    227             }
    228 
    229             let Ok(reply_note) = ndb.get_note_by_id(txn, parent.id) else {
    230                 break 's NextLink::Unknown(parent.id);
    231             };
    232 
    233             let Some(notekey) = reply_note.key() else {
    234                 break 's NextLink::Unknown(parent.id);
    235             };
    236 
    237             NextLink::Next(reply_note, notekey)
    238         };
    239 
    240         match next_link {
    241             NextLink::Unknown(parent) => {
    242                 unknown_ids.add_note_id_if_missing(ndb, txn, parent);
    243             }
    244             NextLink::Next(next_note, note_key) => {
    245                 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, &next_note);
    246 
    247                 let cached_note = note_cache.cached_note_or_insert_mut(note_key, &next_note);
    248 
    249                 let next_reply = cached_note.reply;
    250                 if self.fill_reply_chain_recursive(
    251                     &next_note,
    252                     &next_reply,
    253                     note_cache,
    254                     ndb,
    255                     txn,
    256                     unknown_ids,
    257                 ) {
    258                     have_all_ancestors = true;
    259                 }
    260 
    261                 if !self.seen_flags.contains(next_note.id()) {
    262                     self.seen_flags.mark_replies(
    263                         next_note.id(),
    264                         selected_has_at_least_n_replies(ndb, txn, None, next_note.id(), 2),
    265                     );
    266                 }
    267             }
    268             NextLink::None => {
    269                 have_all_ancestors = true;
    270                 new_parent = Some(ParentState::None);
    271             }
    272         }
    273 
    274         match self.threads.raw_entry_mut().from_key(&cur_note.id()) {
    275             RawEntryMut::Occupied(entry) => {
    276                 let node = entry.into_mut();
    277                 if let Some(parent) = new_parent {
    278                     node.prev = parent;
    279                 }
    280 
    281                 if have_all_ancestors {
    282                     node.have_all_ancestors = true;
    283                 }
    284             }
    285             RawEntryMut::Vacant(entry) => {
    286                 let id = NoteId::new(*cur_note.id());
    287                 let parent = new_parent.unwrap_or(ParentState::Unknown);
    288                 let (_, res) = entry.insert(id, ThreadNode::new(parent));
    289 
    290                 if have_all_ancestors {
    291                     res.have_all_ancestors = true;
    292                 }
    293             }
    294         }
    295 
    296         have_all_ancestors
    297     }
    298 }
    299 
    300 enum NextLink<'a> {
    301     Unknown(&'a [u8; 32]),
    302     Next(Note<'a>, NoteKey),
    303     None,
    304 }
    305 
    306 pub fn selected_has_at_least_n_replies(
    307     ndb: &Ndb,
    308     txn: &Transaction,
    309     selected: Option<&[u8; 32]>,
    310     root: &[u8; 32],
    311     n: u8,
    312 ) -> bool {
    313     let filter = if let Some(selected) = selected {
    314         &vec![direct_replies_filter_non_root(selected, root)]
    315     } else {
    316         &vec![direct_replies_filter_root(root)]
    317     };
    318 
    319     let Ok(res) = ndb.query(txn, filter, n as i32) else {
    320         return false;
    321     };
    322 
    323     res.len() >= n.into()
    324 }
    325 
    326 fn direct_replies_filter_non_root(
    327     selected_note_id: &[u8; 32],
    328     root_id: &[u8; 32],
    329 ) -> nostrdb::Filter {
    330     let tmp_selected = *selected_note_id;
    331     nostrdb::Filter::new()
    332         .kinds([1])
    333         .custom(move |note: nostrdb::Note<'_>| {
    334             let reply = nostrdb::NoteReply::new(note.tags());
    335             if reply.is_reply_to_root() {
    336                 return false;
    337             }
    338 
    339             reply.reply().is_some_and(|r| r.id == &tmp_selected)
    340         })
    341         .event(root_id)
    342         .build()
    343 }
    344 
    345 /// Custom filter requirements:
    346 /// - Do NOT capture references (e.g. `*root_id`) inside the closure
    347 /// - Instead, copy values outside and capture them with `move`
    348 ///
    349 /// Incorrect:
    350 ///     .custom(|_| { *root_id })       // ❌
    351 /// Also Incorrect:
    352 ///     .custom(move |_| { *root_id })  // ❌
    353 /// Correct:
    354 ///     let tmp = *root_id;
    355 ///     .custom(move |_| { tmp })       // ✅
    356 fn direct_replies_filter_root(root_id: &[u8; 32]) -> nostrdb::Filter {
    357     let moved_root_id = *root_id;
    358     nostrdb::Filter::new()
    359         .kinds([1])
    360         .custom(move |note: nostrdb::Note<'_>| {
    361             nostrdb::NoteReply::new(note.tags())
    362                 .reply_to_root()
    363                 .is_some_and(|r| r.id == &moved_root_id)
    364         })
    365         .event(root_id)
    366         .build()
    367 }
    368 
    369 fn replies_filter_remote(selection: &ThreadSelection) -> Vec<Filter> {
    370     vec![
    371         nostrdb::Filter::new()
    372             .kinds([1])
    373             .event(selection.root_id.bytes())
    374             .build(),
    375         nostrdb::Filter::new()
    376             .ids([selection.root_id.bytes()])
    377             .limit(1)
    378             .build(),
    379     ]
    380 }
    381 
    382 /// Represents indicators that there is more content in the note to view
    383 #[derive(Default)]
    384 pub struct NoteSeenFlags {
    385     // true indicates the note has replies AND it has not been read
    386     pub flags: HashMap<NoteId, bool>,
    387 }
    388 
    389 impl NoteSeenFlags {
    390     pub fn mark_seen(&mut self, note_id: &[u8; 32]) {
    391         self.flags.insert(NoteId::new(*note_id), false);
    392     }
    393 
    394     pub fn mark_replies(&mut self, note_id: &[u8; 32], has_replies: bool) {
    395         self.flags.insert(NoteId::new(*note_id), has_replies);
    396     }
    397 
    398     pub fn get(&self, note_id: &[u8; 32]) -> Option<&bool> {
    399         self.flags.get(&note_id)
    400     }
    401 
    402     pub fn contains(&self, note_id: &[u8; 32]) -> bool {
    403         self.flags.contains_key(&note_id)
    404     }
    405 }
    406 
    407 #[derive(Default)]
    408 pub struct SingleNoteUnits {
    409     units: NoteUnits,
    410 }
    411 
    412 impl SingleNoteUnits {
    413     pub fn new(reversed: bool) -> Self {
    414         Self {
    415             units: NoteUnits::new_with_cap(0, reversed),
    416         }
    417     }
    418 
    419     pub fn insert(&mut self, note_ref: NoteRef) -> InsertionResponse {
    420         self.units.merge_single_unit(note_ref)
    421     }
    422 
    423     pub fn values(&self) -> impl Iterator<Item = &NoteRef> {
    424         self.units.values().filter_map(|entry| {
    425             if let NoteUnit::Single(note_ref) = entry {
    426                 Some(note_ref)
    427             } else {
    428                 None
    429             }
    430         })
    431     }
    432 
    433     pub fn contains_key(&self, k: &NoteKey) -> bool {
    434         self.units.contains_key(&UnitKey::Single(*k))
    435     }
    436 }