notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

thread.rs (12257B)


      1 use egui_nav::ReturnType;
      2 use egui_virtual_list::VirtualList;
      3 use enostr::{NoteId, RelayPool};
      4 use hashbrown::{hash_map::RawEntryMut, HashMap};
      5 use nostrdb::{Filter, Ndb, Note, NoteKey, NoteReplyBuf, Transaction};
      6 use notedeck::{NoteCache, NoteRef, UnknownIds};
      7 
      8 use crate::{
      9     actionbar::{process_thread_notes, NewThreadNotes},
     10     multi_subscriber::ThreadSubs,
     11     timeline::{
     12         note_units::{NoteUnits, UnitKey},
     13         unit::NoteUnit,
     14         InsertionResponse,
     15     },
     16 };
     17 
     18 use super::ThreadSelection;
     19 
/// Per-note state that [`Threads`] keeps for a single note of a thread.
pub struct ThreadNode {
    /// Replies collected for this note. [`ThreadNode::new`] builds this with
    /// `SingleNoteUnits::new(true)`, i.e. with the `reversed` flag set.
    pub replies: SingleNoteUnits,
    /// What is currently known about the note this one replies to.
    pub prev: ParentState,
    /// Starts out `false`; set to `true` by `Threads::fill_reply_chain_recursive`
    /// once the reply chain above this note has been followed up to a note that
    /// has no parent.
    pub have_all_ancestors: bool,
    /// NOTE(review): presumably the virtual-list state used when rendering
    /// `replies` — confirm at the UI call site.
    pub list: VirtualList,
    /// Scroll offset requested through [`ThreadNode::with_offset`]; `None` when
    /// no offset was requested.
    pub set_scroll_offset: Option<f32>,
}
     27 
/// Knowledge about the parent (the note being replied to) of a [`ThreadNode`].
#[derive(Clone)]
pub enum ParentState {
    /// The parent has not been determined yet; nodes start in this state when
    /// nothing better is known.
    Unknown,
    /// The note's reply information names no parent.
    None,
    /// The note replies to the note with this id.
    Parent(NoteId),
}
     34 
     35 impl ThreadNode {
     36     pub fn new(parent: ParentState) -> Self {
     37         Self {
     38             replies: SingleNoteUnits::new(true),
     39             prev: parent,
     40             have_all_ancestors: false,
     41             list: VirtualList::new(),
     42             set_scroll_offset: None,
     43         }
     44     }
     45 
     46     pub fn with_offset(mut self, offset: f32) -> Self {
     47         self.set_scroll_offset = Some(offset);
     48         self
     49     }
     50 }
     51 
/// Thread state: one [`ThreadNode`] per known note, the thread subscriptions
/// and the per-note "seen" indicators.
#[derive(Default)]
pub struct Threads {
    /// Thread nodes keyed by the id of the note they describe.
    pub threads: HashMap<NoteId, ThreadNode>,
    /// Subscriptions managed for opened threads; `open` subscribes through it
    /// and `close` unsubscribes.
    pub subs: ThreadSubs,

    /// Per-note flags indicating whether there is unread reply content.
    pub seen_flags: NoteSeenFlags,
}
     59 
     60 impl Threads {
     61     /// Opening a thread.
     62     /// Similar to [[super::cache::TimelineCache::open]]
     63     #[allow(clippy::too_many_arguments)]
     64     pub fn open(
     65         &mut self,
     66         ndb: &mut Ndb,
     67         txn: &Transaction,
     68         pool: &mut RelayPool,
     69         thread: &ThreadSelection,
     70         new_scope: bool,
     71         col: usize,
     72         scroll_offset: f32,
     73     ) -> Option<NewThreadNotes> {
     74         tracing::info!("Opening thread: {:?}", thread);
     75         let local_sub_filter = if let Some(selected) = &thread.selected_note {
     76             vec![direct_replies_filter_non_root(
     77                 selected.bytes(),
     78                 thread.root_id.bytes(),
     79             )]
     80         } else {
     81             vec![direct_replies_filter_root(thread.root_id.bytes())]
     82         };
     83 
     84         let selected_note_id = thread.selected_or_root();
     85         self.seen_flags.mark_seen(selected_note_id);
     86 
     87         let filter = match self.threads.raw_entry_mut().from_key(&selected_note_id) {
     88             RawEntryMut::Occupied(_entry) => {
     89                 // TODO(kernelkind): reenable this once the panic is fixed
     90                 //
     91                 // let node = entry.into_mut();
     92                 // if let Some(first) = node.replies.first() {
     93                 //     &filter::make_filters_since(&local_sub_filter, first.created_at + 1)
     94                 // } else {
     95                 //     &local_sub_filter
     96                 // }
     97                 &local_sub_filter
     98             }
     99             RawEntryMut::Vacant(entry) => {
    100                 let id = NoteId::new(*selected_note_id);
    101 
    102                 let node = ThreadNode::new(ParentState::Unknown).with_offset(scroll_offset);
    103                 entry.insert(id, node);
    104 
    105                 &local_sub_filter
    106             }
    107         };
    108 
    109         let new_notes = ndb.query(txn, filter, 500).ok().map(|r| {
    110             r.into_iter()
    111                 .map(NoteRef::from_query_result)
    112                 .collect::<Vec<_>>()
    113         });
    114 
    115         self.subs
    116             .subscribe(ndb, pool, col, thread, local_sub_filter, new_scope, || {
    117                 replies_filter_remote(thread)
    118             });
    119 
    120         new_notes.map(|notes| NewThreadNotes {
    121             selected_note_id: NoteId::new(*selected_note_id),
    122             notes: notes.into_iter().map(|f| f.key).collect(),
    123         })
    124     }
    125 
    /// Closes a previously opened thread.
    ///
    /// Logs the selection and forwards `ndb`, `pool`, `id`, `thread` and
    /// `return_type` to `self.subs.unsubscribe`. Nothing is removed from
    /// `self.threads` or `self.seen_flags` here.
    pub fn close(
        &mut self,
        ndb: &mut Ndb,
        pool: &mut RelayPool,
        thread: &ThreadSelection,
        return_type: ReturnType,
        id: usize,
    ) {
        tracing::info!("Closing thread: {:?}", thread);
        self.subs.unsubscribe(ndb, pool, id, thread, return_type);
    }
    137 
    138     /// Responsible for making sure the chain and the direct replies are up to date
    139     pub fn update(
    140         &mut self,
    141         selected: &Note<'_>,
    142         note_cache: &mut NoteCache,
    143         ndb: &Ndb,
    144         txn: &Transaction,
    145         unknown_ids: &mut UnknownIds,
    146         col: usize,
    147     ) {
    148         let Some(selected_key) = selected.key() else {
    149             tracing::error!("Selected note did not have a key");
    150             return;
    151         };
    152 
    153         let reply = note_cache
    154             .cached_note_or_insert_mut(selected_key, selected)
    155             .reply;
    156 
    157         self.fill_reply_chain_recursive(selected, &reply, note_cache, ndb, txn, unknown_ids);
    158         let node = self
    159             .threads
    160             .get_mut(&selected.id())
    161             .expect("should be guarenteed to exist from `Self::fill_reply_chain_recursive`");
    162 
    163         let Some(sub) = self.subs.get_local(col) else {
    164             tracing::error!("Was expecting to find local sub");
    165             return;
    166         };
    167 
    168         let keys = ndb.poll_for_notes(sub.sub, 10);
    169 
    170         if keys.is_empty() {
    171             return;
    172         }
    173 
    174         tracing::info!("Got {} new notes", keys.len());
    175 
    176         process_thread_notes(
    177             &keys,
    178             node,
    179             &mut self.seen_flags,
    180             ndb,
    181             txn,
    182             unknown_ids,
    183             note_cache,
    184         );
    185     }
    186 
    187     fn fill_reply_chain_recursive(
    188         &mut self,
    189         cur_note: &Note<'_>,
    190         cur_reply: &NoteReplyBuf,
    191         note_cache: &mut NoteCache,
    192         ndb: &Ndb,
    193         txn: &Transaction,
    194         unknown_ids: &mut UnknownIds,
    195     ) -> bool {
    196         let (unknown_parent_state, mut have_all_ancestors) = self
    197             .threads
    198             .get(&cur_note.id())
    199             .map(|t| (matches!(t.prev, ParentState::Unknown), t.have_all_ancestors))
    200             .unwrap_or((true, false));
    201 
    202         if have_all_ancestors {
    203             return true;
    204         }
    205 
    206         let mut new_parent = None;
    207 
    208         let note_reply = cur_reply.borrow(cur_note.tags());
    209 
    210         let next_link = 's: {
    211             let Some(parent) = note_reply.reply() else {
    212                 break 's NextLink::None;
    213             };
    214 
    215             if unknown_parent_state {
    216                 new_parent = Some(ParentState::Parent(NoteId::new(*parent.id)));
    217             }
    218 
    219             let Ok(reply_note) = ndb.get_note_by_id(txn, parent.id) else {
    220                 break 's NextLink::Unknown(parent.id);
    221             };
    222 
    223             let Some(notekey) = reply_note.key() else {
    224                 break 's NextLink::Unknown(parent.id);
    225             };
    226 
    227             NextLink::Next(reply_note, notekey)
    228         };
    229 
    230         match next_link {
    231             NextLink::Unknown(parent) => {
    232                 unknown_ids.add_note_id_if_missing(ndb, txn, parent);
    233             }
    234             NextLink::Next(next_note, note_key) => {
    235                 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, &next_note);
    236 
    237                 let cached_note = note_cache.cached_note_or_insert_mut(note_key, &next_note);
    238 
    239                 let next_reply = cached_note.reply;
    240                 if self.fill_reply_chain_recursive(
    241                     &next_note,
    242                     &next_reply,
    243                     note_cache,
    244                     ndb,
    245                     txn,
    246                     unknown_ids,
    247                 ) {
    248                     have_all_ancestors = true;
    249                 }
    250 
    251                 if !self.seen_flags.contains(next_note.id()) {
    252                     self.seen_flags.mark_replies(
    253                         next_note.id(),
    254                         selected_has_at_least_n_replies(ndb, txn, None, next_note.id(), 2),
    255                     );
    256                 }
    257             }
    258             NextLink::None => {
    259                 have_all_ancestors = true;
    260                 new_parent = Some(ParentState::None);
    261             }
    262         }
    263 
    264         match self.threads.raw_entry_mut().from_key(&cur_note.id()) {
    265             RawEntryMut::Occupied(entry) => {
    266                 let node = entry.into_mut();
    267                 if let Some(parent) = new_parent {
    268                     node.prev = parent;
    269                 }
    270 
    271                 if have_all_ancestors {
    272                     node.have_all_ancestors = true;
    273                 }
    274             }
    275             RawEntryMut::Vacant(entry) => {
    276                 let id = NoteId::new(*cur_note.id());
    277                 let parent = new_parent.unwrap_or(ParentState::Unknown);
    278                 let (_, res) = entry.insert(id, ThreadNode::new(parent));
    279 
    280                 if have_all_ancestors {
    281                     res.have_all_ancestors = true;
    282                 }
    283             }
    284         }
    285 
    286         have_all_ancestors
    287     }
    288 }
    289 
/// Outcome of looking up the parent of a note while walking a reply chain in
/// `Threads::fill_reply_chain_recursive`.
enum NextLink<'a> {
    /// The note names a parent with this id, but the parent could not be
    /// loaded from ndb or had no note key.
    Unknown(&'a [u8; 32]),
    /// The parent note was loaded, together with its key.
    Next(Note<'a>, NoteKey),
    /// The note names no parent.
    None,
}
    295 
    296 pub fn selected_has_at_least_n_replies(
    297     ndb: &Ndb,
    298     txn: &Transaction,
    299     selected: Option<&[u8; 32]>,
    300     root: &[u8; 32],
    301     n: u8,
    302 ) -> bool {
    303     let filter = if let Some(selected) = selected {
    304         &vec![direct_replies_filter_non_root(selected, root)]
    305     } else {
    306         &vec![direct_replies_filter_root(root)]
    307     };
    308 
    309     let Ok(res) = ndb.query(txn, filter, n as i32) else {
    310         return false;
    311     };
    312 
    313     res.len() >= n.into()
    314 }
    315 
    316 fn direct_replies_filter_non_root(
    317     selected_note_id: &[u8; 32],
    318     root_id: &[u8; 32],
    319 ) -> nostrdb::Filter {
    320     let tmp_selected = *selected_note_id;
    321     nostrdb::Filter::new()
    322         .kinds([1])
    323         .custom(move |note: nostrdb::Note<'_>| {
    324             let reply = nostrdb::NoteReply::new(note.tags());
    325             if reply.is_reply_to_root() {
    326                 return false;
    327             }
    328 
    329             reply.reply().is_some_and(|r| r.id == &tmp_selected)
    330         })
    331         .event(root_id)
    332         .build()
    333 }
    334 
    335 /// Custom filter requirements:
    336 /// - Do NOT capture references (e.g. `*root_id`) inside the closure
    337 /// - Instead, copy values outside and capture them with `move`
    338 ///
    339 /// Incorrect:
    340 ///     .custom(|_| { *root_id })       // ❌
    341 /// Also Incorrect:
    342 ///     .custom(move |_| { *root_id })  // ❌
    343 /// Correct:
    344 ///     let tmp = *root_id;
    345 ///     .custom(move |_| { tmp })       // ✅
    346 fn direct_replies_filter_root(root_id: &[u8; 32]) -> nostrdb::Filter {
    347     let moved_root_id = *root_id;
    348     nostrdb::Filter::new()
    349         .kinds([1])
    350         .custom(move |note: nostrdb::Note<'_>| {
    351             nostrdb::NoteReply::new(note.tags())
    352                 .reply_to_root()
    353                 .is_some_and(|r| r.id == &moved_root_id)
    354         })
    355         .event(root_id)
    356         .build()
    357 }
    358 
    359 fn replies_filter_remote(selection: &ThreadSelection) -> Vec<Filter> {
    360     vec![
    361         nostrdb::Filter::new()
    362             .kinds([1])
    363             .event(selection.root_id.bytes())
    364             .build(),
    365         nostrdb::Filter::new()
    366             .ids([selection.root_id.bytes()])
    367             .limit(1)
    368             .build(),
    369     ]
    370 }
    371 
/// Represents indicators that there is more content in the note to view
#[derive(Default)]
pub struct NoteSeenFlags {
    /// One flag per note id.
    ///
    /// `true` indicates the note has replies AND it has not been read.
    /// `false` is stored when a note is marked as seen, or when it was
    /// recorded as having no replies. A note with no entry has not been
    /// recorded at all.
    pub flags: HashMap<NoteId, bool>,
}
    378 
    379 impl NoteSeenFlags {
    380     pub fn mark_seen(&mut self, note_id: &[u8; 32]) {
    381         self.flags.insert(NoteId::new(*note_id), false);
    382     }
    383 
    384     pub fn mark_replies(&mut self, note_id: &[u8; 32], has_replies: bool) {
    385         self.flags.insert(NoteId::new(*note_id), has_replies);
    386     }
    387 
    388     pub fn get(&self, note_id: &[u8; 32]) -> Option<&bool> {
    389         self.flags.get(&note_id)
    390     }
    391 
    392     pub fn contains(&self, note_id: &[u8; 32]) -> bool {
    393         self.flags.contains_key(&note_id)
    394     }
    395 }
    396 
/// Wrapper around [`NoteUnits`] whose API inserts and yields only single-note
/// units.
#[derive(Default)]
pub struct SingleNoteUnits {
    /// Underlying unit storage. `insert` adds to it via `merge_single_unit`,
    /// and `values` skips anything that is not a `NoteUnit::Single`.
    units: NoteUnits,
}
    401 
    402 impl SingleNoteUnits {
    403     pub fn new(reversed: bool) -> Self {
    404         Self {
    405             units: NoteUnits::new_with_cap(0, reversed),
    406         }
    407     }
    408 
    409     pub fn insert(&mut self, note_ref: NoteRef) -> InsertionResponse {
    410         self.units.merge_single_unit(note_ref)
    411     }
    412 
    413     pub fn values(&self) -> impl Iterator<Item = &NoteRef> {
    414         self.units.values().filter_map(|entry| {
    415             if let NoteUnit::Single(note_ref) = entry {
    416                 Some(note_ref)
    417             } else {
    418                 None
    419             }
    420         })
    421     }
    422 
    423     pub fn contains_key(&self, k: &NoteKey) -> bool {
    424         self.units.contains_key(&UnitKey::Single(*k))
    425     }
    426 }