thread.rs (12513B)
1 use egui_nav::ReturnType; 2 use egui_virtual_list::VirtualList; 3 use enostr::NoteId; 4 use hashbrown::{hash_map::RawEntryMut, HashMap}; 5 use nostrdb::{Filter, Ndb, Note, NoteKey, NoteReplyBuf, Transaction}; 6 use notedeck::{Accounts, NoteCache, NoteRef, ScopedSubApi, UnknownIds}; 7 8 use crate::{ 9 actionbar::{process_thread_notes, NewThreadNotes}, 10 timeline::{ 11 note_units::{NoteUnits, UnitKey}, 12 sub::ThreadSubs, 13 unit::NoteUnit, 14 InsertionResponse, 15 }, 16 }; 17 18 use super::ThreadSelection; 19 20 pub struct ThreadNode { 21 pub replies: SingleNoteUnits, 22 pub prev: ParentState, 23 pub have_all_ancestors: bool, 24 pub list: VirtualList, 25 pub set_scroll_offset: Option<f32>, 26 } 27 28 #[derive(Clone)] 29 pub enum ParentState { 30 Unknown, 31 None, 32 Parent(NoteId), 33 } 34 35 impl ThreadNode { 36 pub fn new(parent: ParentState) -> Self { 37 Self { 38 replies: SingleNoteUnits::new(true), 39 prev: parent, 40 have_all_ancestors: false, 41 list: VirtualList::new(), 42 set_scroll_offset: None, 43 } 44 } 45 46 pub fn with_offset(mut self, offset: f32) -> Self { 47 self.set_scroll_offset = Some(offset); 48 self 49 } 50 } 51 52 #[derive(Default)] 53 pub struct Threads { 54 pub threads: HashMap<NoteId, ThreadNode>, 55 pub subs: ThreadSubs, 56 57 pub seen_flags: NoteSeenFlags, 58 } 59 60 impl Threads { 61 /// Opening a thread. 62 /// Similar to [[super::cache::TimelineCache::open]] 63 #[allow(clippy::too_many_arguments)] 64 #[profiling::function] 65 pub fn open( 66 &mut self, 67 ndb: &mut Ndb, 68 txn: &Transaction, 69 scoped_subs: &mut ScopedSubApi<'_, '_>, 70 thread: &ThreadSelection, 71 new_scope: bool, 72 col: usize, 73 scroll_offset: f32, 74 ) -> Option<NewThreadNotes> { 75 tracing::info!("Opening thread: {:?}", thread); 76 let local_sub_filter = if let Some(selected) = &thread.selected_note { 77 vec![direct_replies_filter_non_root( 78 selected.bytes(), 79 thread.root_id.bytes(), 80 )] 81 } else { 82 vec![direct_replies_filter_root(thread.root_id.bytes())] 83 }; 84 85 let selected_note_id = thread.selected_or_root(); 86 self.seen_flags.mark_seen(selected_note_id); 87 88 let filter = match self.threads.raw_entry_mut().from_key(&selected_note_id) { 89 RawEntryMut::Occupied(_entry) => { 90 // TODO(kernelkind): reenable this once the panic is fixed 91 // 92 // let node = entry.into_mut(); 93 // if let Some(first) = node.replies.first() { 94 // &filter::make_filters_since(&local_sub_filter, first.created_at + 1) 95 // } else { 96 // &local_sub_filter 97 // } 98 &local_sub_filter 99 } 100 RawEntryMut::Vacant(entry) => { 101 let id = NoteId::new(*selected_note_id); 102 103 let node = ThreadNode::new(ParentState::Unknown).with_offset(scroll_offset); 104 entry.insert(id, node); 105 106 &local_sub_filter 107 } 108 }; 109 110 let new_notes = ndb.query(txn, filter, 500).ok().map(|r| { 111 r.into_iter() 112 .map(NoteRef::from_query_result) 113 .collect::<Vec<_>>() 114 }); 115 116 self.subs.subscribe( 117 ndb, 118 scoped_subs, 119 col, 120 thread, 121 local_sub_filter, 122 new_scope, 123 replies_filter_remote(thread), 124 ); 125 126 new_notes.map(|notes| NewThreadNotes { 127 selected_note_id: NoteId::new(*selected_note_id), 128 notes: notes.into_iter().map(|f| f.key).collect(), 129 }) 130 } 131 132 pub fn close( 133 &mut self, 134 ndb: &mut Ndb, 135 scoped_subs: &mut ScopedSubApi<'_, '_>, 136 thread: &ThreadSelection, 137 return_type: ReturnType, 138 id: usize, 139 ) { 140 tracing::info!("Closing thread: {:?}", thread); 141 self.subs 142 .unsubscribe(ndb, scoped_subs, id, thread, return_type); 143 } 144 145 /// Responsible for making sure the chain and the direct replies are up to date 146 #[allow(clippy::too_many_arguments)] 147 #[profiling::function] 148 pub fn update( 149 &mut self, 150 selected: &Note<'_>, 151 note_cache: &mut NoteCache, 152 ndb: &Ndb, 153 txn: &Transaction, 154 unknown_ids: &mut UnknownIds, 155 accounts: &Accounts, 156 col: usize, 157 ) { 158 let Some(selected_key) = selected.key() else { 159 tracing::error!("Selected note did not have a key"); 160 return; 161 }; 162 163 let reply = note_cache 164 .cached_note_or_insert_mut(selected_key, selected) 165 .reply; 166 167 self.fill_reply_chain_recursive(selected, &reply, note_cache, ndb, txn, unknown_ids); 168 let node = self 169 .threads 170 .get_mut(&selected.id()) 171 .expect("should be guarenteed to exist from `Self::fill_reply_chain_recursive`"); 172 173 let Some(sub) = self.subs.get_local_for_selected(accounts, col) else { 174 tracing::error!("Was expecting to find local sub"); 175 return; 176 }; 177 178 let keys = ndb.poll_for_notes(*sub, 10); 179 180 if keys.is_empty() { 181 return; 182 } 183 184 tracing::info!("Got {} new notes", keys.len()); 185 186 process_thread_notes( 187 &keys, 188 node, 189 &mut self.seen_flags, 190 ndb, 191 txn, 192 unknown_ids, 193 note_cache, 194 ); 195 } 196 197 fn fill_reply_chain_recursive( 198 &mut self, 199 cur_note: &Note<'_>, 200 cur_reply: &NoteReplyBuf, 201 note_cache: &mut NoteCache, 202 ndb: &Ndb, 203 txn: &Transaction, 204 unknown_ids: &mut UnknownIds, 205 ) -> bool { 206 let (unknown_parent_state, mut have_all_ancestors) = self 207 .threads 208 .get(&cur_note.id()) 209 .map(|t| (matches!(t.prev, ParentState::Unknown), t.have_all_ancestors)) 210 .unwrap_or((true, false)); 211 212 if have_all_ancestors { 213 return true; 214 } 215 216 let mut new_parent = None; 217 218 let note_reply = cur_reply.borrow(cur_note.tags()); 219 220 let next_link = 's: { 221 let Some(parent) = note_reply.reply() else { 222 break 's NextLink::None; 223 }; 224 225 if unknown_parent_state { 226 new_parent = Some(ParentState::Parent(NoteId::new(*parent.id))); 227 } 228 229 let Ok(reply_note) = ndb.get_note_by_id(txn, parent.id) else { 230 break 's NextLink::Unknown(parent.id); 231 }; 232 233 let Some(notekey) = reply_note.key() else { 234 break 's NextLink::Unknown(parent.id); 235 }; 236 237 NextLink::Next(reply_note, notekey) 238 }; 239 240 match next_link { 241 NextLink::Unknown(parent) => { 242 unknown_ids.add_note_id_if_missing(ndb, txn, parent); 243 } 244 NextLink::Next(next_note, note_key) => { 245 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, &next_note); 246 247 let cached_note = note_cache.cached_note_or_insert_mut(note_key, &next_note); 248 249 let next_reply = cached_note.reply; 250 if self.fill_reply_chain_recursive( 251 &next_note, 252 &next_reply, 253 note_cache, 254 ndb, 255 txn, 256 unknown_ids, 257 ) { 258 have_all_ancestors = true; 259 } 260 261 if !self.seen_flags.contains(next_note.id()) { 262 self.seen_flags.mark_replies( 263 next_note.id(), 264 selected_has_at_least_n_replies(ndb, txn, None, next_note.id(), 2), 265 ); 266 } 267 } 268 NextLink::None => { 269 have_all_ancestors = true; 270 new_parent = Some(ParentState::None); 271 } 272 } 273 274 match self.threads.raw_entry_mut().from_key(&cur_note.id()) { 275 RawEntryMut::Occupied(entry) => { 276 let node = entry.into_mut(); 277 if let Some(parent) = new_parent { 278 node.prev = parent; 279 } 280 281 if have_all_ancestors { 282 node.have_all_ancestors = true; 283 } 284 } 285 RawEntryMut::Vacant(entry) => { 286 let id = NoteId::new(*cur_note.id()); 287 let parent = new_parent.unwrap_or(ParentState::Unknown); 288 let (_, res) = entry.insert(id, ThreadNode::new(parent)); 289 290 if have_all_ancestors { 291 res.have_all_ancestors = true; 292 } 293 } 294 } 295 296 have_all_ancestors 297 } 298 } 299 300 enum NextLink<'a> { 301 Unknown(&'a [u8; 32]), 302 Next(Note<'a>, NoteKey), 303 None, 304 } 305 306 pub fn selected_has_at_least_n_replies( 307 ndb: &Ndb, 308 txn: &Transaction, 309 selected: Option<&[u8; 32]>, 310 root: &[u8; 32], 311 n: u8, 312 ) -> bool { 313 let filter = if let Some(selected) = selected { 314 &vec![direct_replies_filter_non_root(selected, root)] 315 } else { 316 &vec![direct_replies_filter_root(root)] 317 }; 318 319 let Ok(res) = ndb.query(txn, filter, n as i32) else { 320 return false; 321 }; 322 323 res.len() >= n.into() 324 } 325 326 fn direct_replies_filter_non_root( 327 selected_note_id: &[u8; 32], 328 root_id: &[u8; 32], 329 ) -> nostrdb::Filter { 330 let tmp_selected = *selected_note_id; 331 nostrdb::Filter::new() 332 .kinds([1]) 333 .custom(move |note: nostrdb::Note<'_>| { 334 let reply = nostrdb::NoteReply::new(note.tags()); 335 if reply.is_reply_to_root() { 336 return false; 337 } 338 339 reply.reply().is_some_and(|r| r.id == &tmp_selected) 340 }) 341 .event(root_id) 342 .build() 343 } 344 345 /// Custom filter requirements: 346 /// - Do NOT capture references (e.g. `*root_id`) inside the closure 347 /// - Instead, copy values outside and capture them with `move` 348 /// 349 /// Incorrect: 350 /// .custom(|_| { *root_id }) // ❌ 351 /// Also Incorrect: 352 /// .custom(move |_| { *root_id }) // ❌ 353 /// Correct: 354 /// let tmp = *root_id; 355 /// .custom(move |_| { tmp }) // ✅ 356 fn direct_replies_filter_root(root_id: &[u8; 32]) -> nostrdb::Filter { 357 let moved_root_id = *root_id; 358 nostrdb::Filter::new() 359 .kinds([1]) 360 .custom(move |note: nostrdb::Note<'_>| { 361 nostrdb::NoteReply::new(note.tags()) 362 .reply_to_root() 363 .is_some_and(|r| r.id == &moved_root_id) 364 }) 365 .event(root_id) 366 .build() 367 } 368 369 fn replies_filter_remote(selection: &ThreadSelection) -> Vec<Filter> { 370 vec![ 371 nostrdb::Filter::new() 372 .kinds([1]) 373 .event(selection.root_id.bytes()) 374 .build(), 375 nostrdb::Filter::new() 376 .ids([selection.root_id.bytes()]) 377 .limit(1) 378 .build(), 379 ] 380 } 381 382 /// Represents indicators that there is more content in the note to view 383 #[derive(Default)] 384 pub struct NoteSeenFlags { 385 // true indicates the note has replies AND it has not been read 386 pub flags: HashMap<NoteId, bool>, 387 } 388 389 impl NoteSeenFlags { 390 pub fn mark_seen(&mut self, note_id: &[u8; 32]) { 391 self.flags.insert(NoteId::new(*note_id), false); 392 } 393 394 pub fn mark_replies(&mut self, note_id: &[u8; 32], has_replies: bool) { 395 self.flags.insert(NoteId::new(*note_id), has_replies); 396 } 397 398 pub fn get(&self, note_id: &[u8; 32]) -> Option<&bool> { 399 self.flags.get(¬e_id) 400 } 401 402 pub fn contains(&self, note_id: &[u8; 32]) -> bool { 403 self.flags.contains_key(¬e_id) 404 } 405 } 406 407 #[derive(Default)] 408 pub struct SingleNoteUnits { 409 units: NoteUnits, 410 } 411 412 impl SingleNoteUnits { 413 pub fn new(reversed: bool) -> Self { 414 Self { 415 units: NoteUnits::new_with_cap(0, reversed), 416 } 417 } 418 419 pub fn insert(&mut self, note_ref: NoteRef) -> InsertionResponse { 420 self.units.merge_single_unit(note_ref) 421 } 422 423 pub fn values(&self) -> impl Iterator<Item = &NoteRef> { 424 self.units.values().filter_map(|entry| { 425 if let NoteUnit::Single(note_ref) = entry { 426 Some(note_ref) 427 } else { 428 None 429 } 430 }) 431 } 432 433 pub fn contains_key(&self, k: &NoteKey) -> bool { 434 self.units.contains_key(&UnitKey::Single(*k)) 435 } 436 }