thread.rs (14453B)
1 use std::{ 2 collections::{BTreeSet, HashSet}, 3 hash::Hash, 4 }; 5 6 use egui_nav::ReturnType; 7 use egui_virtual_list::VirtualList; 8 use enostr::{NoteId, RelayPool}; 9 use hashbrown::{hash_map::RawEntryMut, HashMap}; 10 use nostrdb::{Filter, Ndb, Note, NoteKey, NoteReplyBuf, Transaction}; 11 use notedeck::{NoteCache, NoteRef, UnknownIds}; 12 13 use crate::{ 14 actionbar::{process_thread_notes, NewThreadNotes}, 15 multi_subscriber::ThreadSubs, 16 timeline::MergeKind, 17 }; 18 19 use super::ThreadSelection; 20 21 pub struct ThreadNode { 22 pub replies: HybridSet<NoteRef>, 23 pub prev: ParentState, 24 pub have_all_ancestors: bool, 25 pub list: VirtualList, 26 } 27 28 #[derive(Clone)] 29 pub enum ParentState { 30 Unknown, 31 None, 32 Parent(NoteId), 33 } 34 35 /// Affords: 36 /// - O(1) contains 37 /// - O(log n) sorted insertion 38 pub struct HybridSet<T> { 39 reversed: bool, 40 lookup: HashSet<T>, // fast deduplication 41 ordered: BTreeSet<T>, // sorted iteration 42 } 43 44 impl<T> Default for HybridSet<T> { 45 fn default() -> Self { 46 Self { 47 reversed: Default::default(), 48 lookup: Default::default(), 49 ordered: Default::default(), 50 } 51 } 52 } 53 54 pub enum InsertionResponse { 55 AlreadyExists, 56 Merged(MergeKind), 57 } 58 59 impl<T: Copy + Ord + Eq + Hash> HybridSet<T> { 60 pub fn insert(&mut self, val: T) -> InsertionResponse { 61 if !self.lookup.insert(val) { 62 return InsertionResponse::AlreadyExists; 63 } 64 65 let front_insertion = match self.ordered.iter().next() { 66 Some(first) => (val >= *first) == self.reversed, 67 None => true, 68 }; 69 70 self.ordered.insert(val); // O(log n) 71 72 InsertionResponse::Merged(if front_insertion { 73 MergeKind::FrontInsert 74 } else { 75 MergeKind::Spliced 76 }) 77 } 78 } 79 80 impl<T: Eq + Hash> HybridSet<T> { 81 pub fn contains(&self, val: &T) -> bool { 82 self.lookup.contains(val) // O(1) 83 } 84 } 85 86 impl<T> HybridSet<T> { 87 pub fn iter(&self) -> HybridIter<'_, T> { 88 HybridIter { 89 inner: self.ordered.iter(), 90 reversed: self.reversed, 91 } 92 } 93 94 pub fn new(reversed: bool) -> Self { 95 Self { 96 reversed, 97 ..Default::default() 98 } 99 } 100 } 101 102 impl<'a, T> IntoIterator for &'a HybridSet<T> { 103 type Item = &'a T; 104 type IntoIter = HybridIter<'a, T>; 105 106 fn into_iter(self) -> Self::IntoIter { 107 self.iter() 108 } 109 } 110 111 pub struct HybridIter<'a, T> { 112 inner: std::collections::btree_set::Iter<'a, T>, 113 reversed: bool, 114 } 115 116 impl<'a, T> Iterator for HybridIter<'a, T> { 117 type Item = &'a T; 118 119 fn next(&mut self) -> Option<Self::Item> { 120 if self.reversed { 121 self.inner.next_back() 122 } else { 123 self.inner.next() 124 } 125 } 126 } 127 128 impl ThreadNode { 129 pub fn new(parent: ParentState) -> Self { 130 Self { 131 replies: HybridSet::new(true), 132 prev: parent, 133 have_all_ancestors: false, 134 list: VirtualList::new(), 135 } 136 } 137 } 138 139 #[derive(Default)] 140 pub struct Threads { 141 pub threads: HashMap<NoteId, ThreadNode>, 142 pub subs: ThreadSubs, 143 144 pub seen_flags: NoteSeenFlags, 145 } 146 147 impl Threads { 148 /// Opening a thread. 149 /// Similar to [[super::cache::TimelineCache::open]] 150 pub fn open( 151 &mut self, 152 ndb: &mut Ndb, 153 txn: &Transaction, 154 pool: &mut RelayPool, 155 thread: &ThreadSelection, 156 new_scope: bool, 157 col: usize, 158 ) -> Option<NewThreadNotes> { 159 tracing::info!("Opening thread: {:?}", thread); 160 let local_sub_filter = if let Some(selected) = &thread.selected_note { 161 vec![direct_replies_filter_non_root( 162 selected.bytes(), 163 thread.root_id.bytes(), 164 )] 165 } else { 166 vec![direct_replies_filter_root(thread.root_id.bytes())] 167 }; 168 169 let selected_note_id = thread.selected_or_root(); 170 self.seen_flags.mark_seen(selected_note_id); 171 172 let filter = match self.threads.raw_entry_mut().from_key(&selected_note_id) { 173 RawEntryMut::Occupied(_entry) => { 174 // TODO(kernelkind): reenable this once the panic is fixed 175 // 176 // let node = entry.into_mut(); 177 // if let Some(first) = node.replies.first() { 178 // &filter::make_filters_since(&local_sub_filter, first.created_at + 1) 179 // } else { 180 // &local_sub_filter 181 // } 182 &local_sub_filter 183 } 184 RawEntryMut::Vacant(entry) => { 185 let id = NoteId::new(*selected_note_id); 186 187 let node = ThreadNode::new(ParentState::Unknown); 188 entry.insert(id, node); 189 190 &local_sub_filter 191 } 192 }; 193 194 let new_notes = ndb.query(txn, filter, 500).ok().map(|r| { 195 r.into_iter() 196 .map(NoteRef::from_query_result) 197 .collect::<Vec<_>>() 198 }); 199 200 self.subs 201 .subscribe(ndb, pool, col, thread, local_sub_filter, new_scope, || { 202 replies_filter_remote(thread) 203 }); 204 205 new_notes.map(|notes| NewThreadNotes { 206 selected_note_id: NoteId::new(*selected_note_id), 207 notes: notes.into_iter().map(|f| f.key).collect(), 208 }) 209 } 210 211 pub fn close( 212 &mut self, 213 ndb: &mut Ndb, 214 pool: &mut RelayPool, 215 thread: &ThreadSelection, 216 return_type: ReturnType, 217 id: usize, 218 ) { 219 tracing::info!("Closing thread: {:?}", thread); 220 self.subs.unsubscribe(ndb, pool, id, thread, return_type); 221 } 222 223 /// Responsible for making sure the chain and the direct replies are up to date 224 pub fn update( 225 &mut self, 226 selected: &Note<'_>, 227 note_cache: &mut NoteCache, 228 ndb: &Ndb, 229 txn: &Transaction, 230 unknown_ids: &mut UnknownIds, 231 col: usize, 232 ) { 233 let Some(selected_key) = selected.key() else { 234 tracing::error!("Selected note did not have a key"); 235 return; 236 }; 237 238 let reply = note_cache 239 .cached_note_or_insert_mut(selected_key, selected) 240 .reply; 241 242 self.fill_reply_chain_recursive(selected, &reply, note_cache, ndb, txn, unknown_ids); 243 let node = self 244 .threads 245 .get_mut(&selected.id()) 246 .expect("should be guarenteed to exist from `Self::fill_reply_chain_recursive`"); 247 248 let Some(sub) = self.subs.get_local(col) else { 249 tracing::error!("Was expecting to find local sub"); 250 return; 251 }; 252 253 let keys = ndb.poll_for_notes(sub.sub, 10); 254 255 if keys.is_empty() { 256 return; 257 } 258 259 tracing::info!("Got {} new notes", keys.len()); 260 261 process_thread_notes( 262 &keys, 263 node, 264 &mut self.seen_flags, 265 ndb, 266 txn, 267 unknown_ids, 268 note_cache, 269 ); 270 } 271 272 fn fill_reply_chain_recursive( 273 &mut self, 274 cur_note: &Note<'_>, 275 cur_reply: &NoteReplyBuf, 276 note_cache: &mut NoteCache, 277 ndb: &Ndb, 278 txn: &Transaction, 279 unknown_ids: &mut UnknownIds, 280 ) -> bool { 281 let (unknown_parent_state, mut have_all_ancestors) = self 282 .threads 283 .get(&cur_note.id()) 284 .map(|t| (matches!(t.prev, ParentState::Unknown), t.have_all_ancestors)) 285 .unwrap_or((true, false)); 286 287 if have_all_ancestors { 288 return true; 289 } 290 291 let mut new_parent = None; 292 293 let note_reply = cur_reply.borrow(cur_note.tags()); 294 295 let next_link = 's: { 296 let Some(parent) = note_reply.reply() else { 297 break 's NextLink::None; 298 }; 299 300 if unknown_parent_state { 301 new_parent = Some(ParentState::Parent(NoteId::new(*parent.id))); 302 } 303 304 let Ok(reply_note) = ndb.get_note_by_id(txn, parent.id) else { 305 break 's NextLink::Unknown(parent.id); 306 }; 307 308 let Some(notekey) = reply_note.key() else { 309 break 's NextLink::Unknown(parent.id); 310 }; 311 312 NextLink::Next(reply_note, notekey) 313 }; 314 315 match next_link { 316 NextLink::Unknown(parent) => { 317 unknown_ids.add_note_id_if_missing(ndb, txn, parent); 318 } 319 NextLink::Next(next_note, note_key) => { 320 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, &next_note); 321 322 let cached_note = note_cache.cached_note_or_insert_mut(note_key, &next_note); 323 324 let next_reply = cached_note.reply; 325 if self.fill_reply_chain_recursive( 326 &next_note, 327 &next_reply, 328 note_cache, 329 ndb, 330 txn, 331 unknown_ids, 332 ) { 333 have_all_ancestors = true; 334 } 335 336 if !self.seen_flags.contains(next_note.id()) { 337 self.seen_flags.mark_replies( 338 next_note.id(), 339 selected_has_at_least_n_replies(ndb, txn, None, next_note.id(), 2), 340 ); 341 } 342 } 343 NextLink::None => { 344 have_all_ancestors = true; 345 new_parent = Some(ParentState::None); 346 } 347 } 348 349 match self.threads.raw_entry_mut().from_key(&cur_note.id()) { 350 RawEntryMut::Occupied(entry) => { 351 let node = entry.into_mut(); 352 if let Some(parent) = new_parent { 353 node.prev = parent; 354 } 355 356 if have_all_ancestors { 357 node.have_all_ancestors = true; 358 } 359 } 360 RawEntryMut::Vacant(entry) => { 361 let id = NoteId::new(*cur_note.id()); 362 let parent = new_parent.unwrap_or(ParentState::Unknown); 363 let (_, res) = entry.insert(id, ThreadNode::new(parent)); 364 365 if have_all_ancestors { 366 res.have_all_ancestors = true; 367 } 368 } 369 } 370 371 have_all_ancestors 372 } 373 } 374 375 enum NextLink<'a> { 376 Unknown(&'a [u8; 32]), 377 Next(Note<'a>, NoteKey), 378 None, 379 } 380 381 pub fn selected_has_at_least_n_replies( 382 ndb: &Ndb, 383 txn: &Transaction, 384 selected: Option<&[u8; 32]>, 385 root: &[u8; 32], 386 n: u8, 387 ) -> bool { 388 let filter = if let Some(selected) = selected { 389 &vec![direct_replies_filter_non_root(selected, root)] 390 } else { 391 &vec![direct_replies_filter_root(root)] 392 }; 393 394 let Ok(res) = ndb.query(txn, filter, n as i32) else { 395 return false; 396 }; 397 398 res.len() >= n.into() 399 } 400 401 fn direct_replies_filter_non_root( 402 selected_note_id: &[u8; 32], 403 root_id: &[u8; 32], 404 ) -> nostrdb::Filter { 405 let tmp_selected = *selected_note_id; 406 nostrdb::Filter::new() 407 .kinds([1]) 408 .custom(move |n: nostrdb::Note<'_>| { 409 for tag in n.tags() { 410 if tag.count() < 4 { 411 continue; 412 } 413 414 let Some("e") = tag.get_str(0) else { 415 continue; 416 }; 417 418 let Some(tagged_id) = tag.get_id(1) else { 419 continue; 420 }; 421 422 if *tagged_id != tmp_selected { 423 // NOTE: if these aren't dereferenced a segfault occurs... 424 continue; 425 } 426 427 if let Some(data) = tag.get_str(3) { 428 if data == "reply" { 429 return true; 430 } 431 } 432 } 433 false 434 }) 435 .event(root_id) 436 .build() 437 } 438 439 /// Custom filter requirements: 440 /// - Do NOT capture references (e.g. `*root_id`) inside the closure 441 /// - Instead, copy values outside and capture them with `move` 442 /// 443 /// Incorrect: 444 /// .custom(|_| { *root_id }) // ❌ 445 /// Also Incorrect: 446 /// .custom(move |_| { *root_id }) // ❌ 447 /// Correct: 448 /// let tmp = *root_id; 449 /// .custom(move |_| { tmp }) // ✅ 450 fn direct_replies_filter_root(root_id: &[u8; 32]) -> nostrdb::Filter { 451 let tmp_root = *root_id; 452 nostrdb::Filter::new() 453 .kinds([1]) 454 .custom(move |n: nostrdb::Note<'_>| { 455 let mut contains_root = false; 456 for tag in n.tags() { 457 if tag.count() < 4 { 458 continue; 459 } 460 461 let Some("e") = tag.get_str(0) else { 462 continue; 463 }; 464 465 if let Some(s) = tag.get_str(3) { 466 if s == "reply" { 467 return false; 468 } 469 } 470 471 let Some(tagged_id) = tag.get_id(1) else { 472 continue; 473 }; 474 475 if *tagged_id != tmp_root { 476 continue; 477 } 478 479 if let Some(s) = tag.get_str(3) { 480 if s == "root" { 481 contains_root = true; 482 } 483 } 484 } 485 486 contains_root 487 }) 488 .event(root_id) 489 .build() 490 } 491 492 fn replies_filter_remote(selection: &ThreadSelection) -> Vec<Filter> { 493 vec![ 494 nostrdb::Filter::new() 495 .kinds([1]) 496 .event(selection.root_id.bytes()) 497 .build(), 498 nostrdb::Filter::new() 499 .ids([selection.root_id.bytes()]) 500 .limit(1) 501 .build(), 502 ] 503 } 504 505 /// Represents indicators that there is more content in the note to view 506 #[derive(Default)] 507 pub struct NoteSeenFlags { 508 // true indicates the note has replies AND it has not been read 509 pub flags: HashMap<NoteId, bool>, 510 } 511 512 impl NoteSeenFlags { 513 pub fn mark_seen(&mut self, note_id: &[u8; 32]) { 514 self.flags.insert(NoteId::new(*note_id), false); 515 } 516 517 pub fn mark_replies(&mut self, note_id: &[u8; 32], has_replies: bool) { 518 self.flags.insert(NoteId::new(*note_id), has_replies); 519 } 520 521 pub fn get(&self, note_id: &[u8; 32]) -> Option<&bool> { 522 self.flags.get(¬e_id) 523 } 524 525 pub fn contains(&self, note_id: &[u8; 32]) -> bool { 526 self.flags.contains_key(¬e_id) 527 } 528 }