thread.rs (12257B)
1 use egui_nav::ReturnType; 2 use egui_virtual_list::VirtualList; 3 use enostr::{NoteId, RelayPool}; 4 use hashbrown::{hash_map::RawEntryMut, HashMap}; 5 use nostrdb::{Filter, Ndb, Note, NoteKey, NoteReplyBuf, Transaction}; 6 use notedeck::{NoteCache, NoteRef, UnknownIds}; 7 8 use crate::{ 9 actionbar::{process_thread_notes, NewThreadNotes}, 10 multi_subscriber::ThreadSubs, 11 timeline::{ 12 note_units::{NoteUnits, UnitKey}, 13 unit::NoteUnit, 14 InsertionResponse, 15 }, 16 }; 17 18 use super::ThreadSelection; 19 20 pub struct ThreadNode { 21 pub replies: SingleNoteUnits, 22 pub prev: ParentState, 23 pub have_all_ancestors: bool, 24 pub list: VirtualList, 25 pub set_scroll_offset: Option<f32>, 26 } 27 28 #[derive(Clone)] 29 pub enum ParentState { 30 Unknown, 31 None, 32 Parent(NoteId), 33 } 34 35 impl ThreadNode { 36 pub fn new(parent: ParentState) -> Self { 37 Self { 38 replies: SingleNoteUnits::new(true), 39 prev: parent, 40 have_all_ancestors: false, 41 list: VirtualList::new(), 42 set_scroll_offset: None, 43 } 44 } 45 46 pub fn with_offset(mut self, offset: f32) -> Self { 47 self.set_scroll_offset = Some(offset); 48 self 49 } 50 } 51 52 #[derive(Default)] 53 pub struct Threads { 54 pub threads: HashMap<NoteId, ThreadNode>, 55 pub subs: ThreadSubs, 56 57 pub seen_flags: NoteSeenFlags, 58 } 59 60 impl Threads { 61 /// Opening a thread. 62 /// Similar to [[super::cache::TimelineCache::open]] 63 #[allow(clippy::too_many_arguments)] 64 pub fn open( 65 &mut self, 66 ndb: &mut Ndb, 67 txn: &Transaction, 68 pool: &mut RelayPool, 69 thread: &ThreadSelection, 70 new_scope: bool, 71 col: usize, 72 scroll_offset: f32, 73 ) -> Option<NewThreadNotes> { 74 tracing::info!("Opening thread: {:?}", thread); 75 let local_sub_filter = if let Some(selected) = &thread.selected_note { 76 vec![direct_replies_filter_non_root( 77 selected.bytes(), 78 thread.root_id.bytes(), 79 )] 80 } else { 81 vec![direct_replies_filter_root(thread.root_id.bytes())] 82 }; 83 84 let selected_note_id = thread.selected_or_root(); 85 self.seen_flags.mark_seen(selected_note_id); 86 87 let filter = match self.threads.raw_entry_mut().from_key(&selected_note_id) { 88 RawEntryMut::Occupied(_entry) => { 89 // TODO(kernelkind): reenable this once the panic is fixed 90 // 91 // let node = entry.into_mut(); 92 // if let Some(first) = node.replies.first() { 93 // &filter::make_filters_since(&local_sub_filter, first.created_at + 1) 94 // } else { 95 // &local_sub_filter 96 // } 97 &local_sub_filter 98 } 99 RawEntryMut::Vacant(entry) => { 100 let id = NoteId::new(*selected_note_id); 101 102 let node = ThreadNode::new(ParentState::Unknown).with_offset(scroll_offset); 103 entry.insert(id, node); 104 105 &local_sub_filter 106 } 107 }; 108 109 let new_notes = ndb.query(txn, filter, 500).ok().map(|r| { 110 r.into_iter() 111 .map(NoteRef::from_query_result) 112 .collect::<Vec<_>>() 113 }); 114 115 self.subs 116 .subscribe(ndb, pool, col, thread, local_sub_filter, new_scope, || { 117 replies_filter_remote(thread) 118 }); 119 120 new_notes.map(|notes| NewThreadNotes { 121 selected_note_id: NoteId::new(*selected_note_id), 122 notes: notes.into_iter().map(|f| f.key).collect(), 123 }) 124 } 125 126 pub fn close( 127 &mut self, 128 ndb: &mut Ndb, 129 pool: &mut RelayPool, 130 thread: &ThreadSelection, 131 return_type: ReturnType, 132 id: usize, 133 ) { 134 tracing::info!("Closing thread: {:?}", thread); 135 self.subs.unsubscribe(ndb, pool, id, thread, return_type); 136 } 137 138 /// Responsible for making sure the chain and the direct replies are up to date 139 pub fn update( 140 &mut self, 141 selected: &Note<'_>, 142 note_cache: &mut NoteCache, 143 ndb: &Ndb, 144 txn: &Transaction, 145 unknown_ids: &mut UnknownIds, 146 col: usize, 147 ) { 148 let Some(selected_key) = selected.key() else { 149 tracing::error!("Selected note did not have a key"); 150 return; 151 }; 152 153 let reply = note_cache 154 .cached_note_or_insert_mut(selected_key, selected) 155 .reply; 156 157 self.fill_reply_chain_recursive(selected, &reply, note_cache, ndb, txn, unknown_ids); 158 let node = self 159 .threads 160 .get_mut(&selected.id()) 161 .expect("should be guarenteed to exist from `Self::fill_reply_chain_recursive`"); 162 163 let Some(sub) = self.subs.get_local(col) else { 164 tracing::error!("Was expecting to find local sub"); 165 return; 166 }; 167 168 let keys = ndb.poll_for_notes(sub.sub, 10); 169 170 if keys.is_empty() { 171 return; 172 } 173 174 tracing::info!("Got {} new notes", keys.len()); 175 176 process_thread_notes( 177 &keys, 178 node, 179 &mut self.seen_flags, 180 ndb, 181 txn, 182 unknown_ids, 183 note_cache, 184 ); 185 } 186 187 fn fill_reply_chain_recursive( 188 &mut self, 189 cur_note: &Note<'_>, 190 cur_reply: &NoteReplyBuf, 191 note_cache: &mut NoteCache, 192 ndb: &Ndb, 193 txn: &Transaction, 194 unknown_ids: &mut UnknownIds, 195 ) -> bool { 196 let (unknown_parent_state, mut have_all_ancestors) = self 197 .threads 198 .get(&cur_note.id()) 199 .map(|t| (matches!(t.prev, ParentState::Unknown), t.have_all_ancestors)) 200 .unwrap_or((true, false)); 201 202 if have_all_ancestors { 203 return true; 204 } 205 206 let mut new_parent = None; 207 208 let note_reply = cur_reply.borrow(cur_note.tags()); 209 210 let next_link = 's: { 211 let Some(parent) = note_reply.reply() else { 212 break 's NextLink::None; 213 }; 214 215 if unknown_parent_state { 216 new_parent = Some(ParentState::Parent(NoteId::new(*parent.id))); 217 } 218 219 let Ok(reply_note) = ndb.get_note_by_id(txn, parent.id) else { 220 break 's NextLink::Unknown(parent.id); 221 }; 222 223 let Some(notekey) = reply_note.key() else { 224 break 's NextLink::Unknown(parent.id); 225 }; 226 227 NextLink::Next(reply_note, notekey) 228 }; 229 230 match next_link { 231 NextLink::Unknown(parent) => { 232 unknown_ids.add_note_id_if_missing(ndb, txn, parent); 233 } 234 NextLink::Next(next_note, note_key) => { 235 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, &next_note); 236 237 let cached_note = note_cache.cached_note_or_insert_mut(note_key, &next_note); 238 239 let next_reply = cached_note.reply; 240 if self.fill_reply_chain_recursive( 241 &next_note, 242 &next_reply, 243 note_cache, 244 ndb, 245 txn, 246 unknown_ids, 247 ) { 248 have_all_ancestors = true; 249 } 250 251 if !self.seen_flags.contains(next_note.id()) { 252 self.seen_flags.mark_replies( 253 next_note.id(), 254 selected_has_at_least_n_replies(ndb, txn, None, next_note.id(), 2), 255 ); 256 } 257 } 258 NextLink::None => { 259 have_all_ancestors = true; 260 new_parent = Some(ParentState::None); 261 } 262 } 263 264 match self.threads.raw_entry_mut().from_key(&cur_note.id()) { 265 RawEntryMut::Occupied(entry) => { 266 let node = entry.into_mut(); 267 if let Some(parent) = new_parent { 268 node.prev = parent; 269 } 270 271 if have_all_ancestors { 272 node.have_all_ancestors = true; 273 } 274 } 275 RawEntryMut::Vacant(entry) => { 276 let id = NoteId::new(*cur_note.id()); 277 let parent = new_parent.unwrap_or(ParentState::Unknown); 278 let (_, res) = entry.insert(id, ThreadNode::new(parent)); 279 280 if have_all_ancestors { 281 res.have_all_ancestors = true; 282 } 283 } 284 } 285 286 have_all_ancestors 287 } 288 } 289 290 enum NextLink<'a> { 291 Unknown(&'a [u8; 32]), 292 Next(Note<'a>, NoteKey), 293 None, 294 } 295 296 pub fn selected_has_at_least_n_replies( 297 ndb: &Ndb, 298 txn: &Transaction, 299 selected: Option<&[u8; 32]>, 300 root: &[u8; 32], 301 n: u8, 302 ) -> bool { 303 let filter = if let Some(selected) = selected { 304 &vec![direct_replies_filter_non_root(selected, root)] 305 } else { 306 &vec![direct_replies_filter_root(root)] 307 }; 308 309 let Ok(res) = ndb.query(txn, filter, n as i32) else { 310 return false; 311 }; 312 313 res.len() >= n.into() 314 } 315 316 fn direct_replies_filter_non_root( 317 selected_note_id: &[u8; 32], 318 root_id: &[u8; 32], 319 ) -> nostrdb::Filter { 320 let tmp_selected = *selected_note_id; 321 nostrdb::Filter::new() 322 .kinds([1]) 323 .custom(move |note: nostrdb::Note<'_>| { 324 let reply = nostrdb::NoteReply::new(note.tags()); 325 if reply.is_reply_to_root() { 326 return false; 327 } 328 329 reply.reply().is_some_and(|r| r.id == &tmp_selected) 330 }) 331 .event(root_id) 332 .build() 333 } 334 335 /// Custom filter requirements: 336 /// - Do NOT capture references (e.g. `*root_id`) inside the closure 337 /// - Instead, copy values outside and capture them with `move` 338 /// 339 /// Incorrect: 340 /// .custom(|_| { *root_id }) // ❌ 341 /// Also Incorrect: 342 /// .custom(move |_| { *root_id }) // ❌ 343 /// Correct: 344 /// let tmp = *root_id; 345 /// .custom(move |_| { tmp }) // ✅ 346 fn direct_replies_filter_root(root_id: &[u8; 32]) -> nostrdb::Filter { 347 let moved_root_id = *root_id; 348 nostrdb::Filter::new() 349 .kinds([1]) 350 .custom(move |note: nostrdb::Note<'_>| { 351 nostrdb::NoteReply::new(note.tags()) 352 .reply_to_root() 353 .is_some_and(|r| r.id == &moved_root_id) 354 }) 355 .event(root_id) 356 .build() 357 } 358 359 fn replies_filter_remote(selection: &ThreadSelection) -> Vec<Filter> { 360 vec![ 361 nostrdb::Filter::new() 362 .kinds([1]) 363 .event(selection.root_id.bytes()) 364 .build(), 365 nostrdb::Filter::new() 366 .ids([selection.root_id.bytes()]) 367 .limit(1) 368 .build(), 369 ] 370 } 371 372 /// Represents indicators that there is more content in the note to view 373 #[derive(Default)] 374 pub struct NoteSeenFlags { 375 // true indicates the note has replies AND it has not been read 376 pub flags: HashMap<NoteId, bool>, 377 } 378 379 impl NoteSeenFlags { 380 pub fn mark_seen(&mut self, note_id: &[u8; 32]) { 381 self.flags.insert(NoteId::new(*note_id), false); 382 } 383 384 pub fn mark_replies(&mut self, note_id: &[u8; 32], has_replies: bool) { 385 self.flags.insert(NoteId::new(*note_id), has_replies); 386 } 387 388 pub fn get(&self, note_id: &[u8; 32]) -> Option<&bool> { 389 self.flags.get(¬e_id) 390 } 391 392 pub fn contains(&self, note_id: &[u8; 32]) -> bool { 393 self.flags.contains_key(¬e_id) 394 } 395 } 396 397 #[derive(Default)] 398 pub struct SingleNoteUnits { 399 units: NoteUnits, 400 } 401 402 impl SingleNoteUnits { 403 pub fn new(reversed: bool) -> Self { 404 Self { 405 units: NoteUnits::new_with_cap(0, reversed), 406 } 407 } 408 409 pub fn insert(&mut self, note_ref: NoteRef) -> InsertionResponse { 410 self.units.merge_single_unit(note_ref) 411 } 412 413 pub fn values(&self) -> impl Iterator<Item = &NoteRef> { 414 self.units.values().filter_map(|entry| { 415 if let NoteUnit::Single(note_ref) = entry { 416 Some(note_ref) 417 } else { 418 None 419 } 420 }) 421 } 422 423 pub fn contains_key(&self, k: &NoteKey) -> bool { 424 self.units.contains_key(&UnitKey::Single(*k)) 425 } 426 }