mod.rs (31415B)
1 use crate::{ 2 error::Error, 3 multi_subscriber::TimelineSub, 4 subscriptions::{self, SubKind, Subscriptions}, 5 timeline::{kind::ListKind, note_units::InsertManyResponse, timeline_units::NotePayload}, 6 Result, 7 }; 8 9 use notedeck::{ 10 contacts::hybrid_contacts_filter, 11 filter::{self, HybridFilter}, 12 is_future_timestamp, tr, unix_time_secs, Accounts, CachedNote, ContactState, FilterError, 13 FilterState, FilterStates, Localization, NoteCache, NoteRef, UnknownIds, 14 }; 15 16 use egui_virtual_list::VirtualList; 17 use enostr::{PoolRelay, Pubkey, RelayPool}; 18 use nostrdb::{Filter, Ndb, Note, NoteKey, Transaction}; 19 use std::rc::Rc; 20 use std::{cell::RefCell, collections::HashSet}; 21 22 use tracing::{debug, error, info, warn}; 23 24 pub mod cache; 25 pub mod kind; 26 mod note_units; 27 pub mod route; 28 pub mod thread; 29 mod timeline_units; 30 mod unit; 31 32 pub use cache::TimelineCache; 33 pub use kind::{ColumnTitle, PubkeySource, ThreadSelection, TimelineKind}; 34 pub use note_units::{CompositeType, InsertionResponse, NoteUnits}; 35 pub use timeline_units::{TimelineUnits, UnknownPks}; 36 pub use unit::{CompositeUnit, NoteUnit, ReactionUnit, RepostUnit}; 37 38 #[derive(Copy, Clone, Eq, PartialEq, Debug, Default, PartialOrd, Ord)] 39 pub enum ViewFilter { 40 MentionsOnly, 41 Notes, 42 43 #[default] 44 NotesAndReplies, 45 46 All, 47 } 48 49 impl ViewFilter { 50 pub fn name(&self, i18n: &mut Localization) -> String { 51 match self { 52 ViewFilter::Notes => tr!(i18n, "Notes", "Filter label for notes only view"), 53 ViewFilter::NotesAndReplies => { 54 tr!( 55 i18n, 56 "Notes & Replies", 57 "Filter label for notes and replies view" 58 ) 59 } 60 ViewFilter::All => tr!(i18n, "All", "Filter label for all notes view"), 61 ViewFilter::MentionsOnly => { 62 tr!(i18n, "Mentions", "Filter label for mentions only view") 63 } 64 } 65 } 66 67 pub fn filter_notes(cache: &CachedNote, note: &Note) -> bool { 68 note.kind() == 6 || !cache.reply.borrow(note.tags()).is_reply() 69 } 70 71 fn identity(_cache: &CachedNote, _note: &Note) -> bool { 72 true 73 } 74 75 fn notes_and_replies(_cache: &CachedNote, note: &Note) -> bool { 76 note.kind() == 1 || note.kind() == 6 77 } 78 79 fn mentions_only(cache: &CachedNote, note: &Note) -> bool { 80 if note.kind() != 1 { 81 return false; 82 } 83 84 let note_reply = cache.reply.borrow(note.tags()); 85 86 note_reply.is_reply() || note_reply.mention().is_some() 87 } 88 89 pub fn filter(&self) -> fn(&CachedNote, &Note) -> bool { 90 match self { 91 ViewFilter::Notes => ViewFilter::filter_notes, 92 ViewFilter::NotesAndReplies => ViewFilter::notes_and_replies, 93 ViewFilter::All => ViewFilter::identity, 94 ViewFilter::MentionsOnly => ViewFilter::mentions_only, 95 } 96 } 97 } 98 99 /// A timeline view is a filtered view of notes in a timeline. Two standard views 100 /// are "Notes" and "Notes & Replies". A timeline is associated with a Filter, 101 /// but a TimelineTab is a further filtered view of this Filter that can't 102 /// be captured by a Filter itself. 103 #[derive(Default, Debug)] 104 pub struct TimelineTab { 105 pub units: TimelineUnits, 106 pub selection: i32, 107 pub filter: ViewFilter, 108 pub list: Rc<RefCell<VirtualList>>, 109 } 110 111 impl TimelineTab { 112 pub fn new(filter: ViewFilter) -> Self { 113 TimelineTab::new_with_capacity(filter, 1000) 114 } 115 116 pub fn only_notes_and_replies() -> Vec<Self> { 117 vec![TimelineTab::new(ViewFilter::NotesAndReplies)] 118 } 119 120 pub fn no_replies() -> Vec<Self> { 121 vec![TimelineTab::new(ViewFilter::Notes)] 122 } 123 124 pub fn full_tabs() -> Vec<Self> { 125 vec![ 126 TimelineTab::new(ViewFilter::Notes), 127 TimelineTab::new(ViewFilter::NotesAndReplies), 128 ] 129 } 130 131 pub fn notifications() -> Vec<Self> { 132 vec![ 133 TimelineTab::new(ViewFilter::All), 134 TimelineTab::new(ViewFilter::MentionsOnly), 135 ] 136 } 137 138 pub fn new_with_capacity(filter: ViewFilter, cap: usize) -> Self { 139 let selection = 0i32; 140 let mut list = VirtualList::new(); 141 list.hide_on_resize(None); 142 list.over_scan(50.0); 143 let list = Rc::new(RefCell::new(list)); 144 145 TimelineTab { 146 units: TimelineUnits::with_capacity(cap), 147 selection, 148 filter, 149 list, 150 } 151 } 152 153 /// Reset the tab to an empty state, clearing all cached notes. 154 /// 155 /// Used when the contact list changes and we need to rebuild 156 /// the timeline with a new filter. 157 pub fn reset(&mut self) { 158 self.units = TimelineUnits::with_capacity(1000); 159 self.selection = 0; 160 self.list.borrow_mut().reset(); 161 } 162 163 fn insert<'a>( 164 &mut self, 165 payloads: Vec<&'a NotePayload>, 166 ndb: &Ndb, 167 txn: &Transaction, 168 reversed: bool, 169 use_front_insert: bool, 170 ) -> Option<UnknownPks<'a>> { 171 if payloads.is_empty() { 172 return None; 173 } 174 175 let num_refs = payloads.len(); 176 177 let resp = self.units.merge_new_notes(payloads, ndb, txn); 178 179 let InsertManyResponse::Some { 180 entries_merged, 181 merge_kind, 182 } = resp.insertion_response 183 else { 184 return resp.tl_response; 185 }; 186 187 let mut list = self.list.borrow_mut(); 188 189 match merge_kind { 190 // TODO: update egui_virtual_list to support spliced inserts 191 MergeKind::Spliced => { 192 debug!("spliced when inserting {num_refs} new notes, resetting virtual list",); 193 list.reset(); 194 } 195 MergeKind::FrontInsert => 's: { 196 if !use_front_insert { 197 break 's; 198 } 199 200 // only run this logic if we're reverse-chronological 201 // reversed in this case means chronological, since the 202 // default is reverse-chronological. yeah it's confusing. 203 if !reversed { 204 debug!("inserting {num_refs} new notes at start"); 205 list.items_inserted_at_start(entries_merged); 206 } 207 } 208 }; 209 210 resp.tl_response 211 } 212 213 pub fn select_down(&mut self) { 214 debug!("select_down {}", self.selection + 1); 215 if self.selection + 1 > self.units.len() as i32 { 216 return; 217 } 218 219 self.selection += 1; 220 } 221 222 pub fn select_up(&mut self) { 223 debug!("select_up {}", self.selection - 1); 224 if self.selection - 1 < 0 { 225 return; 226 } 227 228 self.selection -= 1; 229 } 230 } 231 232 impl<'a> UnknownPks<'a> { 233 pub fn process(&self, unknown_ids: &mut UnknownIds, ndb: &Ndb, txn: &Transaction) { 234 for pk in &self.unknown_pks { 235 unknown_ids.add_pubkey_if_missing(ndb, txn, pk); 236 } 237 } 238 } 239 240 /// A column in a deck. Holds navigation state, loaded notes, column kind, etc. 241 #[derive(Debug)] 242 pub struct Timeline { 243 pub kind: TimelineKind, 244 // We may not have the filter loaded yet, so let's make it an option so 245 // that codepaths have to explicitly handle it 246 pub filter: FilterStates, 247 pub views: Vec<TimelineTab>, 248 pub selected_view: usize, 249 pub seen_latest_notes: bool, 250 251 pub subscription: TimelineSub, 252 pub enable_front_insert: bool, 253 254 /// Timestamp (`created_at`) of the contact list note used to build 255 /// the current filter. Used to detect when the contact list has 256 /// changed (e.g., after follow/unfollow) so the filter can be rebuilt. 257 pub contact_list_timestamp: Option<u64>, 258 } 259 260 impl Timeline { 261 /// Create a timeline from a contact list 262 pub fn contact_list(contact_list: &Note, pubkey: &[u8; 32]) -> Result<Self> { 263 let with_hashtags = false; 264 let add_pk = Some(pubkey); 265 let filter = hybrid_contacts_filter(contact_list, add_pk, with_hashtags)?; 266 267 Ok(Timeline::new( 268 TimelineKind::contact_list(Pubkey::new(*pubkey)), 269 FilterState::ready_hybrid(filter), 270 TimelineTab::full_tabs(), 271 )) 272 } 273 274 pub fn last_per_pubkey(list: &Note, list_kind: &ListKind) -> Result<Self> { 275 let kind = 1; 276 let notes_per_pk = 1; 277 let filter = filter::last_n_per_pubkey_from_tags(list, kind, notes_per_pk)?; 278 279 Ok(Timeline::new( 280 TimelineKind::last_per_pubkey(*list_kind), 281 FilterState::ready(filter), 282 TimelineTab::only_notes_and_replies(), 283 )) 284 } 285 286 pub fn hashtag(hashtag: Vec<String>) -> Self { 287 let filters = hashtag 288 .iter() 289 .filter(|tag| !tag.is_empty()) 290 .map(|tag| { 291 Filter::new() 292 .kinds([1]) 293 .limit(filter::default_limit()) 294 .tags([tag.as_str()], 't') 295 .build() 296 }) 297 .collect::<Vec<_>>(); 298 299 Timeline::new( 300 TimelineKind::Hashtag(hashtag), 301 FilterState::ready(filters), 302 TimelineTab::only_notes_and_replies(), 303 ) 304 } 305 306 pub fn make_view_id(id: &TimelineKind, col: usize, selected_view: usize) -> egui::Id { 307 egui::Id::new((id, selected_view, col)) 308 } 309 310 pub fn view_id(&self, col: usize) -> egui::Id { 311 Timeline::make_view_id(&self.kind, col, self.selected_view) 312 } 313 314 pub fn new(kind: TimelineKind, filter_state: FilterState, views: Vec<TimelineTab>) -> Self { 315 let filter = FilterStates::new(filter_state); 316 let subscription = TimelineSub::default(); 317 let selected_view = 0; 318 319 // by default, disabled for profiles since they contain widgets above the list items 320 let enable_front_insert = !matches!(kind, TimelineKind::Profile(_)); 321 322 Timeline { 323 kind, 324 filter, 325 views, 326 subscription, 327 selected_view, 328 enable_front_insert, 329 seen_latest_notes: false, 330 contact_list_timestamp: None, 331 } 332 } 333 334 pub fn current_view(&self) -> &TimelineTab { 335 &self.views[self.selected_view] 336 } 337 338 pub fn current_view_mut(&mut self) -> &mut TimelineTab { 339 &mut self.views[self.selected_view] 340 } 341 342 /// Get the note refs for the filter with the widest scope 343 pub fn all_or_any_entries(&self) -> &TimelineUnits { 344 let widest_filter = self 345 .views 346 .iter() 347 .map(|view| view.filter) 348 .max() 349 .expect("at least one filter exists"); 350 351 self.entries(widest_filter) 352 .expect("should have at least notes") 353 } 354 355 pub fn entries(&self, view: ViewFilter) -> Option<&TimelineUnits> { 356 self.view(view).map(|v| &v.units) 357 } 358 359 pub fn latest_note(&self, view: ViewFilter) -> Option<&NoteRef> { 360 self.view(view).and_then(|v| v.units.latest()) 361 } 362 363 pub fn view(&self, view: ViewFilter) -> Option<&TimelineTab> { 364 self.views.iter().find(|tab| tab.filter == view) 365 } 366 367 pub fn view_mut(&mut self, view: ViewFilter) -> Option<&mut TimelineTab> { 368 self.views.iter_mut().find(|tab| tab.filter == view) 369 } 370 371 /// Reset all views to an empty state, clearing all cached notes. 372 /// 373 /// Used when the contact list changes and we need to rebuild 374 /// the timeline with a new filter. 375 pub fn reset_views(&mut self) { 376 for view in &mut self.views { 377 view.reset(); 378 } 379 } 380 381 /// Initial insert of notes into a timeline. Subsequent inserts should 382 /// just use the insert function 383 #[profiling::function] 384 pub fn insert_new( 385 &mut self, 386 txn: &Transaction, 387 ndb: &Ndb, 388 note_cache: &mut NoteCache, 389 notes: &[NoteRef], 390 ) -> Option<UnknownPksOwned> { 391 let filters = { 392 let views = &self.views; 393 let filters: Vec<fn(&CachedNote, &Note) -> bool> = 394 views.iter().map(|v| v.filter.filter()).collect(); 395 filters 396 }; 397 398 let now = unix_time_secs(); 399 let mut unknown_pks = HashSet::new(); 400 for note_ref in notes { 401 if is_future_timestamp(note_ref.created_at, now) { 402 continue; 403 } 404 405 for (view, filter) in filters.iter().enumerate() { 406 if let Ok(note) = ndb.get_note_by_key(txn, note_ref.key) { 407 if filter( 408 note_cache.cached_note_or_insert_mut(note_ref.key, ¬e), 409 ¬e, 410 ) { 411 if let Some(resp) = self.views[view] 412 .units 413 .merge_new_notes( 414 vec![&NotePayload { 415 note, 416 key: note_ref.key, 417 }], 418 ndb, 419 txn, 420 ) 421 .tl_response 422 { 423 let pks: HashSet<Pubkey> = resp 424 .unknown_pks 425 .into_iter() 426 .map(|r| Pubkey::new(*r)) 427 .collect(); 428 429 unknown_pks.extend(pks); 430 } 431 } 432 } 433 } 434 } 435 436 Some(UnknownPksOwned { pks: unknown_pks }) 437 } 438 439 /// The main function used for inserting notes into timelines. Handles 440 /// inserting into multiple views if we have them. All timeline note 441 /// insertions should use this function. 442 pub fn insert( 443 &mut self, 444 new_note_ids: &[NoteKey], 445 ndb: &Ndb, 446 txn: &Transaction, 447 unknown_ids: &mut UnknownIds, 448 note_cache: &mut NoteCache, 449 reversed: bool, 450 ) -> Result<()> { 451 let mut payloads: Vec<NotePayload> = Vec::with_capacity(new_note_ids.len()); 452 let now = unix_time_secs(); 453 454 for key in new_note_ids { 455 let note = if let Ok(note) = ndb.get_note_by_key(txn, *key) { 456 note 457 } else { 458 error!( 459 "hit race condition in poll_notes_into_view: https://github.com/damus-io/nostrdb/issues/35 note {:?} was not added to timeline", 460 key 461 ); 462 continue; 463 }; 464 465 if is_future_timestamp(note.created_at(), now) { 466 continue; 467 } 468 469 // Ensure that unknown ids are captured when inserting notes 470 // into the timeline 471 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, ¬e); 472 473 payloads.push(NotePayload { note, key: *key }); 474 } 475 476 for view in &mut self.views { 477 let should_include = view.filter.filter(); 478 let mut filtered_payloads = Vec::with_capacity(payloads.len()); 479 for payload in &payloads { 480 let cached_note = note_cache.cached_note_or_insert(payload.key, &payload.note); 481 482 if should_include(cached_note, &payload.note) { 483 filtered_payloads.push(payload); 484 } 485 } 486 487 if let Some(res) = view.insert( 488 filtered_payloads, 489 ndb, 490 txn, 491 reversed, 492 self.enable_front_insert, 493 ) { 494 res.process(unknown_ids, ndb, txn); 495 } 496 } 497 498 Ok(()) 499 } 500 501 #[profiling::function] 502 pub fn poll_notes_into_view( 503 &mut self, 504 ndb: &Ndb, 505 txn: &Transaction, 506 unknown_ids: &mut UnknownIds, 507 note_cache: &mut NoteCache, 508 reversed: bool, 509 ) -> Result<()> { 510 if !self.kind.should_subscribe_locally() { 511 // don't need to poll for timelines that don't have local subscriptions 512 return Ok(()); 513 } 514 515 let sub = self 516 .subscription 517 .get_local() 518 .ok_or(Error::App(notedeck::Error::no_active_sub()))?; 519 520 let new_note_ids = ndb.poll_for_notes(sub, 500); 521 if new_note_ids.is_empty() { 522 return Ok(()); 523 } else { 524 self.seen_latest_notes = false; 525 } 526 527 self.insert(&new_note_ids, ndb, txn, unknown_ids, note_cache, reversed) 528 } 529 530 /// Invalidate the timeline, forcing a rebuild on the next check. 531 /// 532 /// This resets all relay states to [`FilterState::NeedsRemote`] and 533 /// clears the contact list timestamp, which will trigger the filter 534 /// rebuild flow when the timeline is next polled. 535 /// 536 /// Note: We reset states rather than clearing them so that 537 /// [`Self::set_all_states`] can update them during the rebuild. 538 pub fn invalidate(&mut self) { 539 self.filter.initial_state = FilterState::NeedsRemote; 540 for state in self.filter.states.values_mut() { 541 *state = FilterState::NeedsRemote; 542 } 543 self.contact_list_timestamp = None; 544 } 545 } 546 547 pub struct UnknownPksOwned { 548 pub pks: HashSet<Pubkey>, 549 } 550 551 impl UnknownPksOwned { 552 pub fn process(&self, ndb: &Ndb, txn: &Transaction, unknown_ids: &mut UnknownIds) { 553 self.pks 554 .iter() 555 .for_each(|p| unknown_ids.add_pubkey_if_missing(ndb, txn, p)); 556 } 557 } 558 559 pub enum MergeKind { 560 FrontInsert, 561 Spliced, 562 } 563 564 pub fn merge_sorted_vecs<T: Ord + Copy>(vec1: &[T], vec2: &[T]) -> (Vec<T>, MergeKind) { 565 let mut merged = Vec::with_capacity(vec1.len() + vec2.len()); 566 let mut i = 0; 567 let mut j = 0; 568 let mut result: Option<MergeKind> = None; 569 570 while i < vec1.len() && j < vec2.len() { 571 if vec1[i] <= vec2[j] { 572 if result.is_none() && j < vec2.len() { 573 // if we're pushing from our large list and still have 574 // some left in vec2, then this is a splice 575 result = Some(MergeKind::Spliced); 576 } 577 merged.push(vec1[i]); 578 i += 1; 579 } else { 580 merged.push(vec2[j]); 581 j += 1; 582 } 583 } 584 585 // Append any remaining elements from either vector 586 if i < vec1.len() { 587 merged.extend_from_slice(&vec1[i..]); 588 } 589 if j < vec2.len() { 590 merged.extend_from_slice(&vec2[j..]); 591 } 592 593 (merged, result.unwrap_or(MergeKind::FrontInsert)) 594 } 595 596 /// When adding a new timeline, we may have a situation where the 597 /// FilterState is NeedsRemote. This can happen if we don't yet have the 598 /// contact list, etc. For these situations, we query all of the relays 599 /// with the same sub_id. We keep track of this sub_id and update the 600 /// filter with the latest version of the returned filter (ie contact 601 /// list) when they arrive. 602 /// 603 /// We do this by maintaining this sub_id in the filter state, even when 604 /// in the ready state. See: [`FilterReady`] 605 #[allow(clippy::too_many_arguments)] 606 pub fn setup_new_timeline( 607 timeline: &mut Timeline, 608 ndb: &mut Ndb, 609 txn: &Transaction, 610 subs: &mut Subscriptions, 611 pool: &mut RelayPool, 612 note_cache: &mut NoteCache, 613 since_optimize: bool, 614 accounts: &Accounts, 615 unknown_ids: &mut UnknownIds, 616 ) { 617 // if we're ready, setup local subs 618 if is_timeline_ready(ndb, pool, note_cache, timeline, accounts, unknown_ids) { 619 if let Err(err) = setup_timeline_nostrdb_sub(ndb, txn, note_cache, timeline, unknown_ids) { 620 error!("setup_new_timeline: {err}"); 621 } 622 } 623 624 for relay in &mut pool.relays { 625 send_initial_timeline_filter(since_optimize, subs, relay, timeline, accounts); 626 } 627 timeline.subscription.increment(); 628 } 629 630 /// Send initial filters for a specific relay. This typically gets called 631 /// when we first connect to a new relay for the first time. For 632 /// situations where you are adding a new timeline, use 633 /// setup_new_timeline. 634 #[profiling::function] 635 pub fn send_initial_timeline_filters( 636 since_optimize: bool, 637 timeline_cache: &mut TimelineCache, 638 subs: &mut Subscriptions, 639 pool: &mut RelayPool, 640 relay_id: &str, 641 accounts: &Accounts, 642 ) -> Option<()> { 643 info!("Sending initial filters to {}", relay_id); 644 let relay = &mut pool.relays.iter_mut().find(|r| r.url() == relay_id)?; 645 646 for (_kind, timeline) in timeline_cache { 647 send_initial_timeline_filter(since_optimize, subs, relay, timeline, accounts); 648 } 649 650 Some(()) 651 } 652 653 pub fn send_initial_timeline_filter( 654 can_since_optimize: bool, 655 subs: &mut Subscriptions, 656 relay: &mut PoolRelay, 657 timeline: &mut Timeline, 658 accounts: &Accounts, 659 ) { 660 let filter_state = timeline.filter.get_mut(relay.url()); 661 662 match filter_state { 663 FilterState::Broken(err) => { 664 error!( 665 "FetchingRemote state in broken state when sending initial timeline filter? {err}" 666 ); 667 } 668 669 FilterState::FetchingRemote(_unisub) => { 670 error!("FetchingRemote state when sending initial timeline filter?"); 671 } 672 673 FilterState::GotRemote(_sub) => { 674 error!("GotRemote state when sending initial timeline filter?"); 675 } 676 677 FilterState::Ready(filter) => { 678 let filter = filter.to_owned(); 679 let new_filters: Vec<Filter> = filter.remote().to_owned().into_iter().map(|f| { 680 // limit the size of remote filters 681 let default_limit = filter::default_remote_limit(); 682 let mut lim = f.limit().unwrap_or(default_limit); 683 let mut filter = f; 684 if lim > default_limit { 685 lim = default_limit; 686 filter = filter.limit_mut(lim); 687 } 688 689 let entries = timeline.all_or_any_entries(); 690 691 // Should we since optimize? Not always. For example 692 // if we only have a few notes locally. One way to 693 // determine this is by looking at the current filter 694 // and seeing what its limit is. If we have less 695 // notes than the limit, we might want to backfill 696 // older notes 697 if can_since_optimize && filter::should_since_optimize(lim, entries.len()) { 698 filter = filter::since_optimize_filter(filter, entries.latest()); 699 } else { 700 warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", &timeline.kind); 701 } 702 703 filter 704 }).collect(); 705 706 //let sub_id = damus.gen_subid(&SubKind::Initial); 707 let sub_id = subscriptions::new_sub_id(); 708 subs.subs.insert(sub_id.clone(), SubKind::Initial); 709 710 if let Err(err) = relay.subscribe(sub_id.clone(), new_filters.clone()) { 711 error!("error subscribing: {err}"); 712 } else { 713 timeline.subscription.force_add_remote(sub_id); 714 } 715 } 716 717 // we need some data first 718 FilterState::NeedsRemote => fetch_contact_list(subs, timeline, accounts), 719 } 720 } 721 722 pub fn fetch_contact_list(subs: &mut Subscriptions, timeline: &mut Timeline, accounts: &Accounts) { 723 if timeline.filter.get_any_ready().is_some() { 724 return; 725 } 726 727 let new_filter_state = match accounts.get_selected_account().data.contacts.get_state() { 728 ContactState::Unreceived => { 729 FilterState::FetchingRemote(filter::FetchingRemoteType::Contact) 730 } 731 ContactState::Received { 732 contacts: _, 733 note_key: _, 734 timestamp: _, 735 } => FilterState::GotRemote(filter::GotRemoteType::Contact), 736 }; 737 738 timeline.filter.set_all_states(new_filter_state); 739 740 let sub = &accounts.get_subs().contacts; 741 if subs.subs.contains_key(&sub.remote) { 742 return; 743 } 744 745 let sub_kind = SubKind::FetchingContactList(timeline.kind.clone()); 746 subs.subs.insert(sub.remote.clone(), sub_kind); 747 } 748 749 #[profiling::function] 750 fn setup_initial_timeline( 751 ndb: &Ndb, 752 txn: &Transaction, 753 timeline: &mut Timeline, 754 note_cache: &mut NoteCache, 755 unknown_ids: &mut UnknownIds, 756 filters: &HybridFilter, 757 ) -> Result<()> { 758 // some timelines are one-shot and a refreshed, like last_per_pubkey algo feed 759 if timeline.kind.should_subscribe_locally() { 760 timeline.subscription.try_add_local(ndb, filters); 761 } 762 763 debug!( 764 "querying nostrdb sub {:?} {:?}", 765 timeline.subscription, timeline.filter 766 ); 767 768 let notes = { 769 let mut notes = Vec::new(); 770 771 for package in filters.local().packages { 772 let mut lim = 0i32; 773 for filter in package.filters { 774 lim += filter.limit().unwrap_or(1) as i32; 775 } 776 777 debug!("setup_initial_timeline: limit for local filter is {}", lim); 778 779 let cur_notes: Vec<NoteRef> = ndb 780 .query(txn, package.filters, lim)? 781 .into_iter() 782 .map(NoteRef::from_query_result) 783 .collect(); 784 tracing::debug!( 785 "Found {} notes for kind: {:?}", 786 cur_notes.len(), 787 package.kind 788 ); 789 notes.extend(&cur_notes); 790 } 791 792 notes 793 }; 794 795 if let Some(pks) = timeline.insert_new(txn, ndb, note_cache, ¬es) { 796 pks.process(ndb, txn, unknown_ids); 797 } 798 799 Ok(()) 800 } 801 802 #[profiling::function] 803 pub fn setup_initial_nostrdb_subs( 804 ndb: &Ndb, 805 note_cache: &mut NoteCache, 806 timeline_cache: &mut TimelineCache, 807 unknown_ids: &mut UnknownIds, 808 ) -> Result<()> { 809 for (_kind, timeline) in timeline_cache { 810 let txn = Transaction::new(ndb).expect("txn"); 811 if let Err(err) = setup_timeline_nostrdb_sub(ndb, &txn, note_cache, timeline, unknown_ids) { 812 error!("setup_initial_nostrdb_subs: {err}"); 813 } 814 } 815 816 Ok(()) 817 } 818 819 fn setup_timeline_nostrdb_sub( 820 ndb: &Ndb, 821 txn: &Transaction, 822 note_cache: &mut NoteCache, 823 timeline: &mut Timeline, 824 unknown_ids: &mut UnknownIds, 825 ) -> Result<()> { 826 let filter_state = timeline 827 .filter 828 .get_any_ready() 829 .ok_or(Error::App(notedeck::Error::empty_contact_list()))? 830 .to_owned(); 831 832 setup_initial_timeline(ndb, txn, timeline, note_cache, unknown_ids, &filter_state)?; 833 834 Ok(()) 835 } 836 837 /// Check if the contact list has changed since the filter was built. 838 /// 839 /// Returns `Some(timestamp)` if the contact list has a newer timestamp 840 /// than when the filter was built, indicating the filter needs rebuilding. 841 /// Returns `None` if the filter is up-to-date or this isn't a contact timeline. 842 fn contact_list_needs_rebuild(timeline: &Timeline, accounts: &Accounts) -> Option<u64> { 843 if !timeline.kind.is_contacts() { 844 return None; 845 } 846 847 let ContactState::Received { 848 contacts: _, 849 note_key: _, 850 timestamp, 851 } = accounts.get_selected_account().data.contacts.get_state() 852 else { 853 return None; 854 }; 855 856 if timeline.contact_list_timestamp == Some(*timestamp) { 857 return None; 858 } 859 860 Some(*timestamp) 861 } 862 863 /// Check our timeline filter and see if we have any filter data ready. 864 /// 865 /// Our timelines may require additional data before it is functional. For 866 /// example, when we have to fetch a contact list before we do the actual 867 /// following list query. 868 /// 869 /// For contact list timelines, this also detects when the contact list has 870 /// changed (e.g., after follow/unfollow) and triggers a filter rebuild. 871 #[profiling::function] 872 pub fn is_timeline_ready( 873 ndb: &mut Ndb, 874 pool: &mut RelayPool, 875 note_cache: &mut NoteCache, 876 timeline: &mut Timeline, 877 accounts: &Accounts, 878 unknown_ids: &mut UnknownIds, 879 ) -> bool { 880 // Check if filter is ready and contact list hasn't changed 881 if timeline.filter.get_any_ready().is_some() { 882 let Some(new_timestamp) = contact_list_needs_rebuild(timeline, accounts) else { 883 return true; 884 }; 885 886 // Contact list changed - invalidate and rebuild 887 info!( 888 "Contact list changed (old: {:?}, new: {}), rebuilding timeline filter", 889 timeline.contact_list_timestamp, new_timestamp 890 ); 891 timeline.invalidate(); 892 timeline.reset_views(); 893 timeline.subscription.reset(ndb, pool); 894 // Fall through to rebuild 895 } 896 897 let Some(res) = timeline.filter.get_any_gotremote() else { 898 return false; 899 }; 900 901 let (relay_id, note_key) = match res { 902 filter::GotRemoteResult::Normal { relay_id, sub_id } => { 903 // We got at least one eose for our filter request. Let's see 904 // if nostrdb is done processing it yet. 905 let res = ndb.poll_for_notes(sub_id, 1); 906 if res.is_empty() { 907 debug!( 908 "check_timeline_filter_state: no notes found (yet?) for timeline {:?}", 909 timeline 910 ); 911 return false; 912 } 913 914 info!("notes found for contact timeline after GotRemote!"); 915 916 (relay_id, res[0]) 917 } 918 filter::GotRemoteResult::Contact { relay_id } => { 919 let ContactState::Received { 920 contacts: _, 921 note_key, 922 timestamp: _, 923 } = accounts.get_selected_account().data.contacts.get_state() 924 else { 925 return false; 926 }; 927 928 (relay_id, *note_key) 929 } 930 }; 931 932 let with_hashtags = false; 933 934 let (filter, contact_timestamp) = { 935 let txn = Transaction::new(ndb).expect("txn"); 936 let note = ndb.get_note_by_key(&txn, note_key).expect("note"); 937 let add_pk = timeline.kind.pubkey().map(|pk| pk.bytes()); 938 let timestamp = note.created_at(); 939 940 ( 941 hybrid_contacts_filter(¬e, add_pk, with_hashtags), 942 timestamp, 943 ) 944 }; 945 946 // TODO: into_follow_filter is hardcoded to contact lists, let's generalize 947 match filter { 948 Err(notedeck::Error::Filter(e)) => { 949 error!("got broken when building filter {e}"); 950 timeline 951 .filter 952 .set_relay_state(relay_id, FilterState::broken(e)); 953 false 954 } 955 Err(err) => { 956 error!("got broken when building filter {err}"); 957 timeline 958 .filter 959 .set_relay_state(relay_id, FilterState::broken(FilterError::EmptyContactList)); 960 false 961 } 962 Ok(filter) => { 963 // we just switched to the ready state, we should send initial 964 // queries and setup the local subscription 965 info!("Found contact list! Setting up local and remote contact list query"); 966 let txn = Transaction::new(ndb).expect("txn"); 967 setup_initial_timeline(ndb, &txn, timeline, note_cache, unknown_ids, &filter) 968 .expect("setup init"); 969 timeline 970 .filter 971 .set_relay_state(relay_id, FilterState::ready_hybrid(filter.clone())); 972 973 // Store timestamp so we can detect when contact list changes 974 timeline.contact_list_timestamp = Some(contact_timestamp); 975 976 timeline.subscription.try_add_remote(pool, &filter); 977 true 978 } 979 } 980 }