mod.rs (22934B)
1 use crate::{ 2 column::Columns, 3 decks::DecksCache, 4 error::Error, 5 subscriptions::{self, SubKind, Subscriptions}, 6 thread::Thread, 7 Result, 8 }; 9 10 use notedeck::{ 11 filter, CachedNote, FilterError, FilterState, FilterStates, NoteCache, NoteRef, RootNoteIdBuf, 12 UnknownIds, 13 }; 14 15 use std::fmt; 16 use std::sync::atomic::{AtomicU32, Ordering}; 17 18 use egui_virtual_list::VirtualList; 19 use enostr::{PoolRelay, Pubkey, RelayPool}; 20 use nostrdb::{Filter, Ndb, Note, NoteKey, Subscription, Transaction}; 21 use std::cell::RefCell; 22 use std::hash::Hash; 23 use std::rc::Rc; 24 25 use tracing::{debug, error, info, warn}; 26 27 pub mod cache; 28 pub mod kind; 29 pub mod route; 30 31 pub use cache::{TimelineCache, TimelineCacheKey}; 32 pub use kind::{ColumnTitle, PubkeySource, TimelineKind}; 33 pub use route::TimelineRoute; 34 35 #[derive(Debug, Hash, Copy, Clone, Eq, PartialEq)] 36 pub struct TimelineId(u32); 37 38 impl TimelineId { 39 pub fn new(id: u32) -> Self { 40 TimelineId(id) 41 } 42 } 43 44 impl fmt::Display for TimelineId { 45 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 46 write!(f, "TimelineId({})", self.0) 47 } 48 } 49 50 #[derive(Copy, Clone, Eq, PartialEq, Debug, Default)] 51 pub enum ViewFilter { 52 Notes, 53 54 #[default] 55 NotesAndReplies, 56 } 57 58 impl ViewFilter { 59 pub fn name(&self) -> &'static str { 60 match self { 61 ViewFilter::Notes => "Notes", 62 ViewFilter::NotesAndReplies => "Notes & Replies", 63 } 64 } 65 66 pub fn filter_notes(cache: &CachedNote, note: &Note) -> bool { 67 !cache.reply.borrow(note.tags()).is_reply() 68 } 69 70 fn identity(_cache: &CachedNote, _note: &Note) -> bool { 71 true 72 } 73 74 pub fn filter(&self) -> fn(&CachedNote, &Note) -> bool { 75 match self { 76 ViewFilter::Notes => ViewFilter::filter_notes, 77 ViewFilter::NotesAndReplies => ViewFilter::identity, 78 } 79 } 80 } 81 82 /// A timeline view is a filtered view of notes in a timeline. Two standard views 83 /// are "Notes" and "Notes & Replies". A timeline is associated with a Filter, 84 /// but a TimelineTab is a further filtered view of this Filter that can't 85 /// be captured by a Filter itself. 86 #[derive(Default, Debug)] 87 pub struct TimelineTab { 88 pub notes: Vec<NoteRef>, 89 pub selection: i32, 90 pub filter: ViewFilter, 91 pub list: Rc<RefCell<VirtualList>>, 92 } 93 94 impl TimelineTab { 95 pub fn new(filter: ViewFilter) -> Self { 96 TimelineTab::new_with_capacity(filter, 1000) 97 } 98 99 pub fn only_notes_and_replies() -> Vec<Self> { 100 vec![TimelineTab::new(ViewFilter::NotesAndReplies)] 101 } 102 103 pub fn no_replies() -> Vec<Self> { 104 vec![TimelineTab::new(ViewFilter::Notes)] 105 } 106 107 pub fn full_tabs() -> Vec<Self> { 108 vec![ 109 TimelineTab::new(ViewFilter::Notes), 110 TimelineTab::new(ViewFilter::NotesAndReplies), 111 ] 112 } 113 114 pub fn new_with_capacity(filter: ViewFilter, cap: usize) -> Self { 115 let selection = 0i32; 116 let mut list = VirtualList::new(); 117 list.hide_on_resize(None); 118 list.over_scan(1000.0); 119 let list = Rc::new(RefCell::new(list)); 120 let notes: Vec<NoteRef> = Vec::with_capacity(cap); 121 122 TimelineTab { 123 notes, 124 selection, 125 filter, 126 list, 127 } 128 } 129 130 fn insert(&mut self, new_refs: &[NoteRef], reversed: bool) { 131 if new_refs.is_empty() { 132 return; 133 } 134 let num_prev_items = self.notes.len(); 135 let (notes, merge_kind) = crate::timeline::merge_sorted_vecs(&self.notes, new_refs); 136 137 self.notes = notes; 138 let new_items = self.notes.len() - num_prev_items; 139 140 // TODO: technically items could have been added inbetween 141 if new_items > 0 { 142 let mut list = self.list.borrow_mut(); 143 144 match merge_kind { 145 // TODO: update egui_virtual_list to support spliced inserts 146 MergeKind::Spliced => { 147 debug!( 148 "spliced when inserting {} new notes, resetting virtual list", 149 new_refs.len() 150 ); 151 list.reset(); 152 } 153 MergeKind::FrontInsert => { 154 // only run this logic if we're reverse-chronological 155 // reversed in this case means chronological, since the 156 // default is reverse-chronological. yeah it's confusing. 157 if !reversed { 158 debug!("inserting {} new notes at start", new_refs.len()); 159 list.items_inserted_at_start(new_items); 160 } 161 } 162 } 163 } 164 } 165 166 pub fn select_down(&mut self) { 167 debug!("select_down {}", self.selection + 1); 168 if self.selection + 1 > self.notes.len() as i32 { 169 return; 170 } 171 172 self.selection += 1; 173 } 174 175 pub fn select_up(&mut self) { 176 debug!("select_up {}", self.selection - 1); 177 if self.selection - 1 < 0 { 178 return; 179 } 180 181 self.selection -= 1; 182 } 183 } 184 185 /// A column in a deck. Holds navigation state, loaded notes, column kind, etc. 186 #[derive(Debug)] 187 pub struct Timeline { 188 pub id: TimelineId, 189 pub kind: TimelineKind, 190 // We may not have the filter loaded yet, so let's make it an option so 191 // that codepaths have to explicitly handle it 192 pub filter: FilterStates, 193 pub views: Vec<TimelineTab>, 194 pub selected_view: usize, 195 196 pub subscription: Option<Subscription>, 197 } 198 199 impl Timeline { 200 /// Create a timeline from a contact list 201 pub fn contact_list( 202 contact_list: &Note, 203 pk_src: PubkeySource, 204 deck_author: Option<&[u8; 32]>, 205 ) -> Result<Self> { 206 let our_pubkey = deck_author.map(|da| pk_src.to_pubkey_bytes(da)); 207 let filter = filter::filter_from_tags(contact_list, our_pubkey)?.into_follow_filter(); 208 209 Ok(Timeline::new( 210 TimelineKind::contact_list(pk_src), 211 FilterState::ready(filter), 212 TimelineTab::full_tabs(), 213 )) 214 } 215 216 pub fn thread(note_id: RootNoteIdBuf) -> Self { 217 let filter = Thread::filters_raw(note_id.borrow()) 218 .iter_mut() 219 .map(|fb| fb.build()) 220 .collect(); 221 Timeline::new( 222 TimelineKind::Thread(note_id), 223 FilterState::ready(filter), 224 TimelineTab::only_notes_and_replies(), 225 ) 226 } 227 228 pub fn hashtag(hashtag: String) -> Self { 229 let filter = Filter::new() 230 .kinds([1]) 231 .limit(filter::default_limit()) 232 .tags([hashtag.clone()], 't') 233 .build(); 234 235 Timeline::new( 236 TimelineKind::Hashtag(hashtag), 237 FilterState::ready(vec![filter]), 238 TimelineTab::only_notes_and_replies(), 239 ) 240 } 241 242 pub fn make_view_id(id: TimelineId, selected_view: usize) -> egui::Id { 243 egui::Id::new((id, selected_view)) 244 } 245 246 pub fn view_id(&self) -> egui::Id { 247 Timeline::make_view_id(self.id, self.selected_view) 248 } 249 250 pub fn new(kind: TimelineKind, filter_state: FilterState, views: Vec<TimelineTab>) -> Self { 251 // global unique id for all new timelines 252 static UIDS: AtomicU32 = AtomicU32::new(0); 253 254 let filter = FilterStates::new(filter_state); 255 let subscription: Option<Subscription> = None; 256 let selected_view = 0; 257 let id = TimelineId::new(UIDS.fetch_add(1, Ordering::Relaxed)); 258 259 Timeline { 260 id, 261 kind, 262 filter, 263 views, 264 subscription, 265 selected_view, 266 } 267 } 268 269 pub fn current_view(&self) -> &TimelineTab { 270 &self.views[self.selected_view] 271 } 272 273 pub fn current_view_mut(&mut self) -> &mut TimelineTab { 274 &mut self.views[self.selected_view] 275 } 276 277 /// Get the note refs for NotesAndReplies. If we only have Notes, then 278 /// just return that instead 279 pub fn all_or_any_notes(&self) -> &[NoteRef] { 280 self.notes(ViewFilter::NotesAndReplies).unwrap_or_else(|| { 281 self.notes(ViewFilter::Notes) 282 .expect("should have at least notes") 283 }) 284 } 285 286 pub fn notes(&self, view: ViewFilter) -> Option<&[NoteRef]> { 287 self.view(view).map(|v| &*v.notes) 288 } 289 290 pub fn view(&self, view: ViewFilter) -> Option<&TimelineTab> { 291 self.views.iter().find(|tab| tab.filter == view) 292 } 293 294 pub fn view_mut(&mut self, view: ViewFilter) -> Option<&mut TimelineTab> { 295 self.views.iter_mut().find(|tab| tab.filter == view) 296 } 297 298 /// Initial insert of notes into a timeline. Subsequent inserts should 299 /// just use the insert function 300 pub fn insert_new( 301 &mut self, 302 txn: &Transaction, 303 ndb: &Ndb, 304 note_cache: &mut NoteCache, 305 notes: &[NoteRef], 306 ) { 307 let filters = { 308 let views = &self.views; 309 let filters: Vec<fn(&CachedNote, &Note) -> bool> = 310 views.iter().map(|v| v.filter.filter()).collect(); 311 filters 312 }; 313 314 for note_ref in notes { 315 for (view, filter) in filters.iter().enumerate() { 316 if let Ok(note) = ndb.get_note_by_key(txn, note_ref.key) { 317 if filter( 318 note_cache.cached_note_or_insert_mut(note_ref.key, ¬e), 319 ¬e, 320 ) { 321 self.views[view].notes.push(*note_ref) 322 } 323 } 324 } 325 } 326 } 327 328 /// The main function used for inserting notes into timelines. Handles 329 /// inserting into multiple views if we have them. All timeline note 330 /// insertions should use this function. 331 pub fn insert( 332 &mut self, 333 new_note_ids: &[NoteKey], 334 ndb: &Ndb, 335 txn: &Transaction, 336 unknown_ids: &mut UnknownIds, 337 note_cache: &mut NoteCache, 338 reversed: bool, 339 ) -> Result<()> { 340 let mut new_refs: Vec<(Note, NoteRef)> = Vec::with_capacity(new_note_ids.len()); 341 342 for key in new_note_ids { 343 let note = if let Ok(note) = ndb.get_note_by_key(txn, *key) { 344 note 345 } else { 346 error!("hit race condition in poll_notes_into_view: https://github.com/damus-io/nostrdb/issues/35 note {:?} was not added to timeline", key); 347 continue; 348 }; 349 350 // Ensure that unknown ids are captured when inserting notes 351 // into the timeline 352 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, ¬e); 353 354 let created_at = note.created_at(); 355 new_refs.push(( 356 note, 357 NoteRef { 358 key: *key, 359 created_at, 360 }, 361 )); 362 } 363 364 for view in &mut self.views { 365 match view.filter { 366 ViewFilter::NotesAndReplies => { 367 let refs: Vec<NoteRef> = new_refs.iter().map(|(_note, nr)| *nr).collect(); 368 369 view.insert(&refs, reversed); 370 } 371 372 ViewFilter::Notes => { 373 let mut filtered_refs = Vec::with_capacity(new_refs.len()); 374 for (note, nr) in &new_refs { 375 let cached_note = note_cache.cached_note_or_insert(nr.key, note); 376 377 if ViewFilter::filter_notes(cached_note, note) { 378 filtered_refs.push(*nr); 379 } 380 } 381 382 view.insert(&filtered_refs, reversed); 383 } 384 } 385 } 386 387 Ok(()) 388 } 389 390 pub fn poll_notes_into_view( 391 &mut self, 392 ndb: &Ndb, 393 txn: &Transaction, 394 unknown_ids: &mut UnknownIds, 395 note_cache: &mut NoteCache, 396 reversed: bool, 397 ) -> Result<()> { 398 let sub = self 399 .subscription 400 .ok_or(Error::App(notedeck::Error::no_active_sub()))?; 401 402 let new_note_ids = ndb.poll_for_notes(sub, 500); 403 if new_note_ids.is_empty() { 404 return Ok(()); 405 } else { 406 debug!("{} new notes! {:?}", new_note_ids.len(), new_note_ids); 407 } 408 409 self.insert(&new_note_ids, ndb, txn, unknown_ids, note_cache, reversed) 410 } 411 } 412 413 pub enum MergeKind { 414 FrontInsert, 415 Spliced, 416 } 417 418 pub fn merge_sorted_vecs<T: Ord + Copy>(vec1: &[T], vec2: &[T]) -> (Vec<T>, MergeKind) { 419 let mut merged = Vec::with_capacity(vec1.len() + vec2.len()); 420 let mut i = 0; 421 let mut j = 0; 422 let mut result: Option<MergeKind> = None; 423 424 while i < vec1.len() && j < vec2.len() { 425 if vec1[i] <= vec2[j] { 426 if result.is_none() && j < vec2.len() { 427 // if we're pushing from our large list and still have 428 // some left in vec2, then this is a splice 429 result = Some(MergeKind::Spliced); 430 } 431 merged.push(vec1[i]); 432 i += 1; 433 } else { 434 merged.push(vec2[j]); 435 j += 1; 436 } 437 } 438 439 // Append any remaining elements from either vector 440 if i < vec1.len() { 441 merged.extend_from_slice(&vec1[i..]); 442 } 443 if j < vec2.len() { 444 merged.extend_from_slice(&vec2[j..]); 445 } 446 447 (merged, result.unwrap_or(MergeKind::FrontInsert)) 448 } 449 450 /// When adding a new timeline, we may have a situation where the 451 /// FilterState is NeedsRemote. This can happen if we don't yet have the 452 /// contact list, etc. For these situations, we query all of the relays 453 /// with the same sub_id. We keep track of this sub_id and update the 454 /// filter with the latest version of the returned filter (ie contact 455 /// list) when they arrive. 456 /// 457 /// We do this by maintaining this sub_id in the filter state, even when 458 /// in the ready state. See: [`FilterReady`] 459 #[allow(clippy::too_many_arguments)] 460 pub fn setup_new_timeline( 461 timeline: &mut Timeline, 462 ndb: &Ndb, 463 subs: &mut Subscriptions, 464 pool: &mut RelayPool, 465 note_cache: &mut NoteCache, 466 since_optimize: bool, 467 our_pk: Option<&Pubkey>, 468 ) { 469 // if we're ready, setup local subs 470 if is_timeline_ready(ndb, pool, note_cache, timeline, our_pk) { 471 if let Err(err) = setup_timeline_nostrdb_sub(ndb, note_cache, timeline) { 472 error!("setup_new_timeline: {err}"); 473 } 474 } 475 476 for relay in &mut pool.relays { 477 send_initial_timeline_filter(ndb, since_optimize, subs, relay, timeline); 478 } 479 } 480 481 /// Send initial filters for a specific relay. This typically gets called 482 /// when we first connect to a new relay for the first time. For 483 /// situations where you are adding a new timeline, use 484 /// setup_new_timeline. 485 pub fn send_initial_timeline_filters( 486 ndb: &Ndb, 487 since_optimize: bool, 488 columns: &mut Columns, 489 subs: &mut Subscriptions, 490 pool: &mut RelayPool, 491 relay_id: &str, 492 ) -> Option<()> { 493 info!("Sending initial filters to {}", relay_id); 494 let relay = &mut pool.relays.iter_mut().find(|r| r.url() == relay_id)?; 495 496 for timeline in columns.timelines_mut() { 497 send_initial_timeline_filter(ndb, since_optimize, subs, relay, timeline); 498 } 499 500 Some(()) 501 } 502 503 pub fn send_initial_timeline_filter( 504 ndb: &Ndb, 505 can_since_optimize: bool, 506 subs: &mut Subscriptions, 507 relay: &mut PoolRelay, 508 timeline: &mut Timeline, 509 ) { 510 let filter_state = timeline.filter.get(relay.url()); 511 512 match filter_state { 513 FilterState::Broken(err) => { 514 error!( 515 "FetchingRemote state in broken state when sending initial timeline filter? {err}" 516 ); 517 } 518 519 FilterState::FetchingRemote(_unisub) => { 520 error!("FetchingRemote state when sending initial timeline filter?"); 521 } 522 523 FilterState::GotRemote(_sub) => { 524 error!("GotRemote state when sending initial timeline filter?"); 525 } 526 527 FilterState::Ready(filter) => { 528 let filter = filter.to_owned(); 529 let new_filters = filter.into_iter().map(|f| { 530 // limit the size of remote filters 531 let default_limit = filter::default_remote_limit(); 532 let mut lim = f.limit().unwrap_or(default_limit); 533 let mut filter = f; 534 if lim > default_limit { 535 lim = default_limit; 536 filter = filter.limit_mut(lim); 537 } 538 539 let notes = timeline.all_or_any_notes(); 540 541 // Should we since optimize? Not always. For example 542 // if we only have a few notes locally. One way to 543 // determine this is by looking at the current filter 544 // and seeing what its limit is. If we have less 545 // notes than the limit, we might want to backfill 546 // older notes 547 if can_since_optimize && filter::should_since_optimize(lim, notes.len()) { 548 filter = filter::since_optimize_filter(filter, notes); 549 } else { 550 warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", filter); 551 } 552 553 filter 554 }).collect(); 555 556 //let sub_id = damus.gen_subid(&SubKind::Initial); 557 let sub_id = subscriptions::new_sub_id(); 558 subs.subs.insert(sub_id.clone(), SubKind::Initial); 559 560 if let Err(err) = relay.subscribe(sub_id, new_filters) { 561 error!("error subscribing: {err}"); 562 } 563 } 564 565 // we need some data first 566 FilterState::NeedsRemote(filter) => { 567 fetch_contact_list(filter.to_owned(), ndb, subs, relay, timeline) 568 } 569 } 570 } 571 572 fn fetch_contact_list( 573 filter: Vec<Filter>, 574 ndb: &Ndb, 575 subs: &mut Subscriptions, 576 relay: &mut PoolRelay, 577 timeline: &mut Timeline, 578 ) { 579 let sub_kind = SubKind::FetchingContactList(timeline.id); 580 let sub_id = subscriptions::new_sub_id(); 581 let local_sub = ndb.subscribe(&filter).expect("sub"); 582 583 timeline.filter.set_relay_state( 584 relay.url().to_string(), 585 FilterState::fetching_remote(sub_id.clone(), local_sub), 586 ); 587 588 subs.subs.insert(sub_id.clone(), sub_kind); 589 590 info!("fetching contact list from {}", relay.url()); 591 if let Err(err) = relay.subscribe(sub_id, filter) { 592 error!("error subscribing: {err}"); 593 } 594 } 595 596 fn setup_initial_timeline( 597 ndb: &Ndb, 598 timeline: &mut Timeline, 599 note_cache: &mut NoteCache, 600 filters: &[Filter], 601 ) -> Result<()> { 602 timeline.subscription = Some(ndb.subscribe(filters)?); 603 let txn = Transaction::new(ndb)?; 604 debug!( 605 "querying nostrdb sub {:?} {:?}", 606 timeline.subscription, timeline.filter 607 ); 608 let lim = filters[0].limit().unwrap_or(filter::default_limit()) as i32; 609 610 let notes: Vec<NoteRef> = ndb 611 .query(&txn, filters, lim)? 612 .into_iter() 613 .map(NoteRef::from_query_result) 614 .collect(); 615 616 timeline.insert_new(&txn, ndb, note_cache, ¬es); 617 618 Ok(()) 619 } 620 621 pub fn setup_initial_nostrdb_subs( 622 ndb: &Ndb, 623 note_cache: &mut NoteCache, 624 decks_cache: &mut DecksCache, 625 ) -> Result<()> { 626 for decks in decks_cache.get_all_decks_mut() { 627 for deck in decks.decks_mut() { 628 for timeline in deck.columns_mut().timelines_mut() { 629 if let Err(err) = setup_timeline_nostrdb_sub(ndb, note_cache, timeline) { 630 error!("setup_initial_nostrdb_subs: {err}"); 631 } 632 } 633 } 634 } 635 636 Ok(()) 637 } 638 639 fn setup_timeline_nostrdb_sub( 640 ndb: &Ndb, 641 note_cache: &mut NoteCache, 642 timeline: &mut Timeline, 643 ) -> Result<()> { 644 let filter_state = timeline 645 .filter 646 .get_any_ready() 647 .ok_or(Error::App(notedeck::Error::empty_contact_list()))? 648 .to_owned(); 649 650 setup_initial_timeline(ndb, timeline, note_cache, &filter_state)?; 651 652 Ok(()) 653 } 654 655 /// Check our timeline filter and see if we have any filter data ready. 656 /// Our timelines may require additional data before it is functional. For 657 /// example, when we have to fetch a contact list before we do the actual 658 /// following list query. 659 pub fn is_timeline_ready( 660 ndb: &Ndb, 661 pool: &mut RelayPool, 662 note_cache: &mut NoteCache, 663 timeline: &mut Timeline, 664 our_pk: Option<&Pubkey>, 665 ) -> bool { 666 // TODO: we should debounce the filter states a bit to make sure we have 667 // seen all of the different contact lists from each relay 668 if let Some(_f) = timeline.filter.get_any_ready() { 669 return true; 670 } 671 672 let (relay_id, sub) = if let Some((relay_id, sub)) = timeline.filter.get_any_gotremote() { 673 (relay_id.to_string(), sub) 674 } else { 675 return false; 676 }; 677 678 // We got at least one eose for our filter request. Let's see 679 // if nostrdb is done processing it yet. 680 let res = ndb.poll_for_notes(sub, 1); 681 if res.is_empty() { 682 debug!( 683 "check_timeline_filter_state: no notes found (yet?) for timeline {:?}", 684 timeline 685 ); 686 return false; 687 } 688 689 info!("notes found for contact timeline after GotRemote!"); 690 691 let note_key = res[0]; 692 693 let filter = { 694 let txn = Transaction::new(ndb).expect("txn"); 695 let note = ndb.get_note_by_key(&txn, note_key).expect("note"); 696 let add_pk = timeline 697 .kind 698 .pubkey_source() 699 .as_ref() 700 .and_then(|pk_src| our_pk.map(|pk| pk_src.to_pubkey_bytes(pk))); 701 filter::filter_from_tags(¬e, add_pk).map(|f| f.into_follow_filter()) 702 }; 703 704 // TODO: into_follow_filter is hardcoded to contact lists, let's generalize 705 match filter { 706 Err(notedeck::Error::Filter(e)) => { 707 error!("got broken when building filter {e}"); 708 timeline 709 .filter 710 .set_relay_state(relay_id, FilterState::broken(e)); 711 false 712 } 713 Err(err) => { 714 error!("got broken when building filter {err}"); 715 timeline 716 .filter 717 .set_relay_state(relay_id, FilterState::broken(FilterError::EmptyContactList)); 718 false 719 } 720 Ok(filter) => { 721 // we just switched to the ready state, we should send initial 722 // queries and setup the local subscription 723 info!("Found contact list! Setting up local and remote contact list query"); 724 setup_initial_timeline(ndb, timeline, note_cache, &filter).expect("setup init"); 725 timeline 726 .filter 727 .set_relay_state(relay_id, FilterState::ready(filter.clone())); 728 729 //let ck = &timeline.kind; 730 //let subid = damus.gen_subid(&SubKind::Column(ck.clone())); 731 let subid = subscriptions::new_sub_id(); 732 pool.subscribe(subid, filter); 733 true 734 } 735 } 736 }