mod.rs (31149B)
1 use crate::{ 2 error::Error, 3 scoped_sub_owner_keys::timeline_remote_owner_key, 4 timeline::{ 5 kind::{people_list_note_filter, AlgoTimeline, ListKind, PeopleListRef}, 6 note_units::InsertManyResponse, 7 sub::TimelineSub, 8 timeline_units::NotePayload, 9 }, 10 Result, 11 }; 12 13 use notedeck::{ 14 contacts::hybrid_contacts_filter, 15 filter::{self}, 16 is_future_timestamp, tr, unix_time_secs, Accounts, CachedNote, ContactState, FilterError, 17 FilterState, Localization, NoteCache, NoteRef, RelaySelection, ScopedSubApi, ScopedSubIdentity, 18 SubConfig, SubKey, UnknownIds, 19 }; 20 21 use egui_virtual_list::VirtualList; 22 use enostr::Pubkey; 23 use nostrdb::{Filter, Ndb, Note, NoteKey, Transaction}; 24 use std::rc::Rc; 25 use std::{cell::RefCell, collections::HashSet}; 26 27 use tracing::{debug, error, info, warn}; 28 29 pub mod cache; 30 pub mod kind; 31 mod note_units; 32 pub mod route; 33 mod sub; 34 pub mod thread; 35 mod timeline_units; 36 mod unit; 37 38 pub use cache::TimelineCache; 39 pub use kind::{ColumnTitle, PubkeySource, ThreadSelection, TimelineKind}; 40 pub use note_units::{CompositeType, InsertionResponse, NoteUnits}; 41 pub use timeline_units::{TimelineUnits, UnknownPks}; 42 pub use unit::{CompositeUnit, NoteUnit, ReactionUnit, RepostUnit}; 43 44 #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] 45 enum TimelineScopedSub { 46 RemoteByKind, 47 } 48 49 fn timeline_remote_sub_key(kind: &TimelineKind) -> SubKey { 50 SubKey::builder(TimelineScopedSub::RemoteByKind) 51 .with(kind) 52 .finish() 53 } 54 55 fn timeline_remote_sub_config(remote_filters: Vec<Filter>) -> SubConfig { 56 SubConfig { 57 relays: RelaySelection::AccountsRead, 58 filters: remote_filters, 59 use_transparent: false, 60 } 61 } 62 63 pub(crate) fn ensure_remote_timeline_subscription( 64 timeline: &mut Timeline, 65 account_pk: Pubkey, 66 remote_filters: Vec<Filter>, 67 scoped_subs: &mut ScopedSubApi<'_, '_>, 68 ) { 69 let owner = timeline_remote_owner_key(account_pk, &timeline.kind); 70 let identity = ScopedSubIdentity::account(owner, timeline_remote_sub_key(&timeline.kind)); 71 let config = timeline_remote_sub_config(remote_filters); 72 let _ = scoped_subs.ensure_sub(identity, config); 73 timeline.subscription.mark_remote_seeded(account_pk); 74 } 75 76 pub(crate) fn update_remote_timeline_subscription( 77 timeline: &mut Timeline, 78 remote_filters: Vec<Filter>, 79 scoped_subs: &mut ScopedSubApi<'_, '_>, 80 ) { 81 let owner = timeline_remote_owner_key(scoped_subs.selected_account_pubkey(), &timeline.kind); 82 let identity = ScopedSubIdentity::account(owner, timeline_remote_sub_key(&timeline.kind)); 83 let config = timeline_remote_sub_config(remote_filters); 84 let _ = scoped_subs.set_sub(identity, config); 85 timeline 86 .subscription 87 .mark_remote_seeded(scoped_subs.selected_account_pubkey()); 88 } 89 90 pub fn drop_timeline_remote_owner( 91 timeline: &Timeline, 92 account_pk: Pubkey, 93 scoped_subs: &mut ScopedSubApi<'_, '_>, 94 ) { 95 let owner = timeline_remote_owner_key(account_pk, &timeline.kind); 96 let _ = scoped_subs.drop_owner(owner); 97 } 98 99 #[derive(Copy, Clone, Eq, PartialEq, Debug, Default, PartialOrd, Ord)] 100 pub enum ViewFilter { 101 MentionsOnly, 102 Notes, 103 104 #[default] 105 NotesAndReplies, 106 107 All, 108 } 109 110 impl ViewFilter { 111 pub fn name(&self, i18n: &mut Localization) -> String { 112 match self { 113 ViewFilter::Notes => tr!(i18n, "Notes", "Filter label for notes only view"), 114 ViewFilter::NotesAndReplies => { 115 tr!( 116 i18n, 117 "Notes & Replies", 118 "Filter label for notes and replies view" 119 ) 120 } 121 ViewFilter::All => tr!(i18n, "All", "Filter label for all notes view"), 122 ViewFilter::MentionsOnly => { 123 tr!(i18n, "Mentions", "Filter label for mentions only view") 124 } 125 } 126 } 127 128 pub fn filter_notes(cache: &CachedNote, note: &Note) -> bool { 129 note.kind() == 6 || !cache.reply.borrow(note.tags()).is_reply() 130 } 131 132 fn identity(_cache: &CachedNote, _note: &Note) -> bool { 133 true 134 } 135 136 fn notes_and_replies(_cache: &CachedNote, note: &Note) -> bool { 137 note.kind() == 1 || note.kind() == 6 138 } 139 140 fn mentions_only(cache: &CachedNote, note: &Note) -> bool { 141 if note.kind() != 1 { 142 return false; 143 } 144 145 let note_reply = cache.reply.borrow(note.tags()); 146 147 note_reply.is_reply() || note_reply.mention().is_some() 148 } 149 150 pub fn filter(&self) -> fn(&CachedNote, &Note) -> bool { 151 match self { 152 ViewFilter::Notes => ViewFilter::filter_notes, 153 ViewFilter::NotesAndReplies => ViewFilter::notes_and_replies, 154 ViewFilter::All => ViewFilter::identity, 155 ViewFilter::MentionsOnly => ViewFilter::mentions_only, 156 } 157 } 158 } 159 160 /// A timeline view is a filtered view of notes in a timeline. Two standard views 161 /// are "Notes" and "Notes & Replies". A timeline is associated with a Filter, 162 /// but a TimelineTab is a further filtered view of this Filter that can't 163 /// be captured by a Filter itself. 164 #[derive(Default, Debug)] 165 pub struct TimelineTab { 166 pub units: TimelineUnits, 167 pub selection: i32, 168 pub filter: ViewFilter, 169 pub list: Rc<RefCell<VirtualList>>, 170 } 171 172 impl TimelineTab { 173 pub fn new(filter: ViewFilter) -> Self { 174 TimelineTab::new_with_capacity(filter, 1000) 175 } 176 177 pub fn only_notes_and_replies() -> Vec<Self> { 178 vec![TimelineTab::new(ViewFilter::NotesAndReplies)] 179 } 180 181 pub fn no_replies() -> Vec<Self> { 182 vec![TimelineTab::new(ViewFilter::Notes)] 183 } 184 185 pub fn full_tabs() -> Vec<Self> { 186 vec![ 187 TimelineTab::new(ViewFilter::Notes), 188 TimelineTab::new(ViewFilter::NotesAndReplies), 189 ] 190 } 191 192 pub fn notifications() -> Vec<Self> { 193 vec![ 194 TimelineTab::new(ViewFilter::All), 195 TimelineTab::new(ViewFilter::MentionsOnly), 196 ] 197 } 198 199 pub fn new_with_capacity(filter: ViewFilter, cap: usize) -> Self { 200 let selection = 0i32; 201 let mut list = VirtualList::new(); 202 list.hide_on_resize(None); 203 list.over_scan(50.0); 204 let list = Rc::new(RefCell::new(list)); 205 206 TimelineTab { 207 units: TimelineUnits::with_capacity(cap), 208 selection, 209 filter, 210 list, 211 } 212 } 213 214 /// Reset the tab to an empty state, clearing all cached notes. 215 /// 216 /// Used when the contact list changes and we need to rebuild 217 /// the timeline with a new filter. 218 pub fn reset(&mut self) { 219 self.units = TimelineUnits::with_capacity(1000); 220 self.selection = 0; 221 self.list.borrow_mut().reset(); 222 } 223 224 #[profiling::function] 225 fn insert<'a>( 226 &mut self, 227 payloads: Vec<&'a NotePayload>, 228 ndb: &Ndb, 229 txn: &Transaction, 230 reversed: bool, 231 use_front_insert: bool, 232 ) -> Option<UnknownPks<'a>> { 233 if payloads.is_empty() { 234 return None; 235 } 236 237 let num_refs = payloads.len(); 238 239 let resp = self.units.merge_new_notes(payloads, ndb, txn); 240 241 let InsertManyResponse::Some { 242 entries_merged, 243 merge_kind, 244 } = resp.insertion_response 245 else { 246 return resp.tl_response; 247 }; 248 249 let mut list = self.list.borrow_mut(); 250 251 match merge_kind { 252 // TODO: update egui_virtual_list to support spliced inserts 253 MergeKind::Spliced => { 254 tracing::trace!( 255 "spliced when inserting {num_refs} new notes, resetting virtual list", 256 ); 257 list.reset(); 258 } 259 MergeKind::FrontInsert => 's: { 260 if !use_front_insert { 261 break 's; 262 } 263 264 // only run this logic if we're reverse-chronological 265 // reversed in this case means chronological, since the 266 // default is reverse-chronological. yeah it's confusing. 267 if !reversed { 268 debug!("inserting {num_refs} new notes at start"); 269 list.items_inserted_at_start(entries_merged); 270 } 271 } 272 }; 273 274 resp.tl_response 275 } 276 277 pub fn select_down(&mut self) { 278 debug!("select_down {}", self.selection + 1); 279 if self.selection + 1 > self.units.len() as i32 { 280 return; 281 } 282 283 self.selection += 1; 284 } 285 286 pub fn select_up(&mut self) { 287 debug!("select_up {}", self.selection - 1); 288 if self.selection - 1 < 0 { 289 return; 290 } 291 292 self.selection -= 1; 293 } 294 } 295 296 impl<'a> UnknownPks<'a> { 297 pub fn process(&self, unknown_ids: &mut UnknownIds, ndb: &Ndb, txn: &Transaction) { 298 for pk in &self.unknown_pks { 299 unknown_ids.add_pubkey_if_missing(ndb, txn, pk); 300 } 301 } 302 } 303 304 /// A column in a deck. Holds navigation state, loaded notes, column kind, etc. 305 #[derive(Debug)] 306 pub struct Timeline { 307 pub kind: TimelineKind, 308 // We may not have the filter loaded yet, so let's make it an option so 309 // that codepaths have to explicitly handle it 310 pub filter: FilterState, 311 pub views: Vec<TimelineTab>, 312 pub selected_view: usize, 313 pub seen_latest_notes: bool, 314 315 pub subscription: TimelineSub, 316 pub enable_front_insert: bool, 317 318 /// Timestamp (`created_at`) of the contact list note used to build 319 /// the current filter. Used to detect when the contact list has 320 /// changed (e.g., after follow/unfollow) so the filter can be rebuilt. 321 pub contact_list_timestamp: Option<u64>, 322 } 323 324 impl Timeline { 325 /// Create a timeline from a contact list 326 pub fn contact_list(contact_list: &Note, pubkey: &[u8; 32]) -> Result<Self> { 327 let with_hashtags = false; 328 let add_pk = Some(pubkey); 329 let filter = hybrid_contacts_filter(contact_list, add_pk, with_hashtags)?; 330 331 Ok(Timeline::new( 332 TimelineKind::contact_list(Pubkey::new(*pubkey)), 333 FilterState::ready_hybrid(filter), 334 TimelineTab::full_tabs(), 335 )) 336 } 337 338 pub fn last_per_pubkey(list: &Note, list_kind: &ListKind) -> Result<Self> { 339 let kind = 1; 340 let notes_per_pk = 1; 341 let filter = filter::last_n_per_pubkey_from_tags(list, kind, notes_per_pk)?; 342 343 Ok(Timeline::new( 344 TimelineKind::last_per_pubkey(list_kind.clone()), 345 FilterState::ready(filter), 346 TimelineTab::only_notes_and_replies(), 347 )) 348 } 349 350 /// Create a hashtag timeline with ready filters. 351 pub fn hashtag(hashtag: Vec<String>) -> Self { 352 let filters = hashtag 353 .iter() 354 .filter(|tag| !tag.is_empty()) 355 .map(|tag| { 356 Filter::new() 357 .kinds([1]) 358 .limit(filter::default_limit()) 359 .tags([tag.as_str()], 't') 360 .build() 361 }) 362 .collect::<Vec<_>>(); 363 364 if filters.is_empty() { 365 warn!(?hashtag, "hashtag timeline created with no usable tags"); 366 } 367 368 Timeline::new( 369 TimelineKind::Hashtag(hashtag), 370 FilterState::ready(filters), 371 TimelineTab::only_notes_and_replies(), 372 ) 373 } 374 375 pub fn make_view_id(id: &TimelineKind, col: usize, selected_view: usize) -> egui::Id { 376 egui::Id::new((id, selected_view, col)) 377 } 378 379 pub fn view_id(&self, col: usize) -> egui::Id { 380 Timeline::make_view_id(&self.kind, col, self.selected_view) 381 } 382 383 pub fn new(kind: TimelineKind, filter_state: FilterState, views: Vec<TimelineTab>) -> Self { 384 let subscription = TimelineSub::default(); 385 let selected_view = 0; 386 387 // by default, disabled for profiles since they contain widgets above the list items 388 let enable_front_insert = !matches!(kind, TimelineKind::Profile(_)); 389 390 Timeline { 391 kind, 392 filter: filter_state, 393 views, 394 subscription, 395 selected_view, 396 enable_front_insert, 397 seen_latest_notes: false, 398 contact_list_timestamp: None, 399 } 400 } 401 402 pub fn current_view(&self) -> &TimelineTab { 403 &self.views[self.selected_view] 404 } 405 406 pub fn current_view_mut(&mut self) -> &mut TimelineTab { 407 &mut self.views[self.selected_view] 408 } 409 410 /// Get the note refs for the filter with the widest scope 411 pub fn all_or_any_entries(&self) -> &TimelineUnits { 412 let widest_filter = self 413 .views 414 .iter() 415 .map(|view| view.filter) 416 .max() 417 .expect("at least one filter exists"); 418 419 self.entries(widest_filter) 420 .expect("should have at least notes") 421 } 422 423 pub fn entries(&self, view: ViewFilter) -> Option<&TimelineUnits> { 424 self.view(view).map(|v| &v.units) 425 } 426 427 pub fn latest_note(&self, view: ViewFilter) -> Option<&NoteRef> { 428 self.view(view).and_then(|v| v.units.latest()) 429 } 430 431 pub fn view(&self, view: ViewFilter) -> Option<&TimelineTab> { 432 self.views.iter().find(|tab| tab.filter == view) 433 } 434 435 pub fn view_mut(&mut self, view: ViewFilter) -> Option<&mut TimelineTab> { 436 self.views.iter_mut().find(|tab| tab.filter == view) 437 } 438 439 /// Reset all views to an empty state, clearing all cached notes. 440 /// 441 /// Used when the contact list changes and we need to rebuild 442 /// the timeline with a new filter. 443 pub fn reset_views(&mut self) { 444 for view in &mut self.views { 445 view.reset(); 446 } 447 } 448 449 /// Initial insert of notes into a timeline. Subsequent inserts should 450 /// just use the insert function 451 #[profiling::function] 452 pub fn insert_new( 453 &mut self, 454 txn: &Transaction, 455 ndb: &Ndb, 456 note_cache: &mut NoteCache, 457 notes: &[NoteRef], 458 ) -> Option<UnknownPksOwned> { 459 let filters = { 460 let views = &self.views; 461 let filters: Vec<fn(&CachedNote, &Note) -> bool> = 462 views.iter().map(|v| v.filter.filter()).collect(); 463 filters 464 }; 465 466 let now = unix_time_secs(); 467 let mut unknown_pks = HashSet::new(); 468 for note_ref in notes { 469 profiling::scope!("inserting notes"); 470 if is_future_timestamp(note_ref.created_at, now) { 471 continue; 472 } 473 474 for (view, filter) in filters.iter().enumerate() { 475 if let Ok(note) = ndb.get_note_by_key(txn, note_ref.key) { 476 if filter( 477 note_cache.cached_note_or_insert_mut(note_ref.key, ¬e), 478 ¬e, 479 ) { 480 if let Some(resp) = self.views[view] 481 .units 482 .merge_new_notes( 483 vec![&NotePayload { 484 note, 485 key: note_ref.key, 486 }], 487 ndb, 488 txn, 489 ) 490 .tl_response 491 { 492 let pks: HashSet<Pubkey> = resp 493 .unknown_pks 494 .into_iter() 495 .map(|r| Pubkey::new(*r)) 496 .collect(); 497 498 unknown_pks.extend(pks); 499 } 500 } 501 } 502 } 503 } 504 505 Some(UnknownPksOwned { pks: unknown_pks }) 506 } 507 508 /// The main function used for inserting notes into timelines. Handles 509 /// inserting into multiple views if we have them. All timeline note 510 /// insertions should use this function. 511 #[profiling::function] 512 pub fn insert( 513 &mut self, 514 new_note_ids: &[NoteKey], 515 ndb: &Ndb, 516 txn: &Transaction, 517 unknown_ids: &mut UnknownIds, 518 note_cache: &mut NoteCache, 519 reversed: bool, 520 ) -> Result<()> { 521 let mut payloads: Vec<NotePayload> = Vec::with_capacity(new_note_ids.len()); 522 let now = unix_time_secs(); 523 524 for key in new_note_ids { 525 let note = if let Ok(note) = ndb.get_note_by_key(txn, *key) { 526 note 527 } else { 528 error!( 529 "hit race condition in poll_notes_into_view: https://github.com/damus-io/nostrdb/issues/35 note {:?} was not added to timeline", 530 key 531 ); 532 continue; 533 }; 534 535 if is_future_timestamp(note.created_at(), now) { 536 continue; 537 } 538 539 // Ensure that unknown ids are captured when inserting notes 540 // into the timeline 541 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, ¬e); 542 543 payloads.push(NotePayload { note, key: *key }); 544 } 545 546 for view in &mut self.views { 547 let should_include = view.filter.filter(); 548 let mut filtered_payloads = Vec::with_capacity(payloads.len()); 549 for payload in &payloads { 550 let cached_note = note_cache.cached_note_or_insert(payload.key, &payload.note); 551 552 if should_include(cached_note, &payload.note) { 553 filtered_payloads.push(payload); 554 } 555 } 556 557 if let Some(res) = view.insert( 558 filtered_payloads, 559 ndb, 560 txn, 561 reversed, 562 self.enable_front_insert, 563 ) { 564 res.process(unknown_ids, ndb, txn); 565 } 566 } 567 568 Ok(()) 569 } 570 571 #[profiling::function] 572 pub fn poll_notes_into_view( 573 &mut self, 574 account_pk: &Pubkey, 575 ndb: &Ndb, 576 txn: &Transaction, 577 unknown_ids: &mut UnknownIds, 578 note_cache: &mut NoteCache, 579 reversed: bool, 580 ) -> Result<()> { 581 if !self.kind.should_subscribe_locally() { 582 // don't need to poll for timelines that don't have local subscriptions 583 return Ok(()); 584 } 585 586 let sub = self 587 .subscription 588 .get_local(account_pk) 589 .ok_or(Error::App(notedeck::Error::no_active_sub()))?; 590 591 let new_note_ids = { 592 profiling::scope!("big ndb poll"); 593 ndb.poll_for_notes(sub, 500) 594 }; 595 if new_note_ids.is_empty() { 596 return Ok(()); 597 } else { 598 self.seen_latest_notes = false; 599 } 600 601 self.insert(&new_note_ids, ndb, txn, unknown_ids, note_cache, reversed) 602 } 603 604 /// Invalidate the timeline, forcing a rebuild on the next check. 605 /// 606 /// This resets all relay states to [`FilterState::NeedsRemote`] and 607 /// clears the contact list timestamp, which will trigger the filter 608 /// rebuild flow when the timeline is next polled. 609 /// 610 /// Note: We reset states rather than clearing them so that 611 /// [`Self::set_all_states`] can update them during the rebuild. 612 pub fn invalidate(&mut self) { 613 self.filter = FilterState::NeedsRemote; 614 self.contact_list_timestamp = None; 615 } 616 } 617 618 pub struct UnknownPksOwned { 619 pub pks: HashSet<Pubkey>, 620 } 621 622 impl UnknownPksOwned { 623 pub fn process(&self, ndb: &Ndb, txn: &Transaction, unknown_ids: &mut UnknownIds) { 624 self.pks 625 .iter() 626 .for_each(|p| unknown_ids.add_pubkey_if_missing(ndb, txn, p)); 627 } 628 } 629 630 pub enum MergeKind { 631 FrontInsert, 632 Spliced, 633 } 634 635 pub fn merge_sorted_vecs<T: Ord + Copy>(vec1: &[T], vec2: &[T]) -> (Vec<T>, MergeKind) { 636 let mut merged = Vec::with_capacity(vec1.len() + vec2.len()); 637 let mut i = 0; 638 let mut j = 0; 639 let mut result: Option<MergeKind> = None; 640 641 while i < vec1.len() && j < vec2.len() { 642 if vec1[i] <= vec2[j] { 643 if result.is_none() && j < vec2.len() { 644 // if we're pushing from our large list and still have 645 // some left in vec2, then this is a splice 646 result = Some(MergeKind::Spliced); 647 } 648 merged.push(vec1[i]); 649 i += 1; 650 } else { 651 merged.push(vec2[j]); 652 j += 1; 653 } 654 } 655 656 // Append any remaining elements from either vector 657 if i < vec1.len() { 658 merged.extend_from_slice(&vec1[i..]); 659 } 660 if j < vec2.len() { 661 merged.extend_from_slice(&vec2[j..]); 662 } 663 664 (merged, result.unwrap_or(MergeKind::FrontInsert)) 665 } 666 667 /// When adding a new timeline, we may have a situation where the 668 /// FilterState is NeedsRemote. This can happen if we don't yet have the 669 /// contact list, etc. For these situations, we query all of the relays 670 /// with the same sub_id. We keep track of this sub_id and update the 671 /// filter with the latest version of the returned filter (ie contact 672 /// list) when they arrive. 673 /// 674 /// We do this by maintaining this sub_id in the filter state, even when 675 /// in the ready state. See: [`FilterReady`] 676 pub fn setup_new_timeline( 677 timeline: &mut Timeline, 678 ndb: &Ndb, 679 txn: &Transaction, 680 scoped_subs: &mut ScopedSubApi<'_, '_>, 681 since_optimize: bool, 682 accounts: &Accounts, 683 ) { 684 let account_pk = *accounts.selected_account_pubkey(); 685 686 // if we're ready, setup local subs 687 if is_timeline_ready(ndb, scoped_subs, timeline, accounts) { 688 if let Err(err) = setup_initial_timeline(ndb, timeline, account_pk) { 689 error!("setup_new_timeline: {err}"); 690 } 691 } 692 693 send_initial_timeline_filter(since_optimize, ndb, txn, timeline, accounts, scoped_subs); 694 timeline.subscription.increment(account_pk); 695 } 696 697 pub fn send_initial_timeline_filter( 698 can_since_optimize: bool, 699 ndb: &Ndb, 700 txn: &Transaction, 701 timeline: &mut Timeline, 702 accounts: &Accounts, 703 scoped_subs: &mut ScopedSubApi<'_, '_>, 704 ) { 705 match &timeline.filter { 706 FilterState::Broken(err) => { 707 error!( 708 "FetchingRemote state in broken state when sending initial timeline filter? {err}" 709 ); 710 } 711 712 FilterState::FetchingRemote => { 713 error!("FetchingRemote state when sending initial timeline filter?"); 714 } 715 716 FilterState::GotRemote => { 717 error!("GotRemote state when sending initial timeline filter?"); 718 } 719 720 FilterState::Ready(filter) => { 721 let filter = filter.to_owned(); 722 let new_filters: Vec<Filter> = filter.remote().to_owned().into_iter().map(|f| { 723 // limit the size of remote filters 724 let default_limit = filter::default_remote_limit(); 725 let mut lim = f.limit().unwrap_or(default_limit); 726 let mut filter = f; 727 if lim > default_limit { 728 lim = default_limit; 729 filter = filter.limit_mut(lim); 730 } 731 732 let entries = timeline.all_or_any_entries(); 733 734 // Should we since optimize? Not always. For example 735 // if we only have a few notes locally. One way to 736 // determine this is by looking at the current filter 737 // and seeing what its limit is. If we have less 738 // notes than the limit, we might want to backfill 739 // older notes 740 if can_since_optimize && filter::should_since_optimize(lim, entries.len()) { 741 filter = filter::since_optimize_filter(filter, entries.latest()); 742 } else { 743 warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", &timeline.kind); 744 } 745 746 filter 747 }).collect(); 748 749 update_remote_timeline_subscription(timeline, new_filters, scoped_subs); 750 } 751 752 // we need some data first 753 FilterState::NeedsRemote => match &timeline.kind { 754 TimelineKind::List(ListKind::PeopleList(_)) 755 | TimelineKind::Algo(AlgoTimeline::LastPerPubkey(ListKind::PeopleList(_))) => { 756 fetch_people_list(ndb, txn, timeline); 757 } 758 _ => fetch_contact_list(timeline, accounts), 759 }, 760 } 761 } 762 763 pub fn fetch_contact_list(timeline: &mut Timeline, accounts: &Accounts) { 764 if matches!(&timeline.filter, FilterState::Ready(_)) { 765 return; 766 } 767 768 let new_filter_state = match accounts.get_selected_account().data.contacts.get_state() { 769 ContactState::Unreceived => FilterState::FetchingRemote, 770 ContactState::Received { 771 contacts: _, 772 note_key: _, 773 timestamp: _, 774 } => FilterState::GotRemote, 775 }; 776 777 timeline.filter = new_filter_state; 778 } 779 780 pub fn fetch_people_list(ndb: &Ndb, txn: &Transaction, timeline: &mut Timeline) { 781 if matches!(&timeline.filter, FilterState::Ready(_)) { 782 return; 783 } 784 785 let Some(plr) = people_list_ref(&timeline.kind) else { 786 error!("fetch_people_list called for non-people-list timeline"); 787 timeline.filter = FilterState::broken(FilterError::EmptyList); 788 return; 789 }; 790 791 let filter = people_list_note_filter(plr); 792 793 let results = match ndb.query(txn, std::slice::from_ref(&filter), 1) { 794 Ok(results) => results, 795 Err(err) => { 796 error!("people list query failed in fetch_people_list: {err}"); 797 timeline.filter = FilterState::broken(FilterError::EmptyList); 798 return; 799 } 800 }; 801 802 if results.is_empty() { 803 timeline.filter = FilterState::FetchingRemote; 804 return; 805 } 806 807 timeline.filter = FilterState::GotRemote; 808 } 809 810 /// Set up the local NDB subscription for a timeline without running 811 /// blocking queries. The actual note loading is handled by the async 812 /// timeline loader. 813 #[profiling::function] 814 fn setup_initial_timeline(ndb: &Ndb, timeline: &mut Timeline, account_pk: Pubkey) -> Result<()> { 815 let FilterState::Ready(filters) = &timeline.filter else { 816 return Err(Error::App(notedeck::Error::empty_contact_list())); 817 }; 818 819 // some timelines are one-shot and refreshed, like last_per_pubkey algo feed 820 if timeline.kind.should_subscribe_locally() { 821 timeline 822 .subscription 823 .try_add_local(account_pk, ndb, filters); 824 } 825 826 Ok(()) 827 } 828 829 #[profiling::function] 830 pub fn setup_initial_nostrdb_subs( 831 ndb: &Ndb, 832 timeline_cache: &mut TimelineCache, 833 account_pk: Pubkey, 834 ) -> Result<()> { 835 for (_kind, timeline) in timeline_cache { 836 if timeline.subscription.dependers(&account_pk) == 0 { 837 continue; 838 } 839 840 if let Err(err) = setup_initial_timeline(ndb, timeline, account_pk) { 841 error!("setup_initial_nostrdb_subs: {err}"); 842 } 843 } 844 845 Ok(()) 846 } 847 848 /// Check our timeline filter and see if we have any filter data ready. 849 /// Our timelines may require additional data before it is functional. For 850 /// example, when we have to fetch a contact list before we do the actual 851 /// following list query. 852 #[profiling::function] 853 pub fn is_timeline_ready( 854 ndb: &Ndb, 855 scoped_subs: &mut ScopedSubApi<'_, '_>, 856 timeline: &mut Timeline, 857 accounts: &Accounts, 858 ) -> bool { 859 // TODO: we should debounce the filter states a bit to make sure we have 860 // seen all of the different contact lists from each relay 861 if let FilterState::Ready(filter) = &timeline.filter { 862 let account_pk = *accounts.selected_account_pubkey(); 863 let remote_filters = filter.remote().to_vec(); 864 if timeline.subscription.dependers(&account_pk) > 0 865 && !timeline.subscription.remote_seeded(&account_pk) 866 { 867 ensure_remote_timeline_subscription(timeline, account_pk, remote_filters, scoped_subs); 868 } 869 return true; 870 } 871 872 if !matches!(&timeline.filter, FilterState::GotRemote) { 873 return false; 874 } 875 876 let note_key = match &timeline.kind { 877 TimelineKind::List(ListKind::Contact(_)) 878 | TimelineKind::Algo(AlgoTimeline::LastPerPubkey(ListKind::Contact(_))) => { 879 let ContactState::Received { 880 contacts: _, 881 note_key, 882 timestamp: _, 883 } = accounts.get_selected_account().data.contacts.get_state() 884 else { 885 return false; 886 }; 887 888 *note_key 889 } 890 TimelineKind::List(ListKind::PeopleList(plr)) 891 | TimelineKind::Algo(AlgoTimeline::LastPerPubkey(ListKind::PeopleList(plr))) => { 892 let list_filter = people_list_note_filter(plr); 893 let txn = Transaction::new(ndb).expect("txn"); 894 let results = match ndb.query(&txn, std::slice::from_ref(&list_filter), 1) { 895 Ok(results) => results, 896 Err(err) => { 897 error!("people list query failed in is_timeline_ready: {err}"); 898 return false; 899 } 900 }; 901 902 if results.is_empty() { 903 debug!("people list note not yet in ndb for {:?}", plr); 904 return false; 905 } 906 907 info!("found people list note after GotRemote!"); 908 results[0].note_key 909 } 910 _ => return false, 911 }; 912 913 let with_hashtags = false; 914 915 let filter = { 916 let txn = Transaction::new(ndb).expect("txn"); 917 let note = ndb.get_note_by_key(&txn, note_key).expect("note"); 918 let add_pk = timeline.kind.pubkey().map(|pk| pk.bytes()); 919 920 hybrid_contacts_filter(¬e, add_pk, with_hashtags) 921 }; 922 923 // TODO: into_follow_filter is hardcoded to contact lists, let's generalize 924 match filter { 925 Err(notedeck::Error::Filter(e)) => { 926 error!("got broken when building filter {e}"); 927 timeline.filter = FilterState::broken(e); 928 false 929 } 930 Err(err) => { 931 error!("got broken when building filter {err}"); 932 let reason = match &timeline.kind { 933 TimelineKind::List(ListKind::PeopleList(_)) 934 | TimelineKind::Algo(AlgoTimeline::LastPerPubkey(ListKind::PeopleList(_))) => { 935 FilterError::EmptyList 936 } 937 _ => FilterError::EmptyContactList, 938 }; 939 timeline.filter = FilterState::broken(reason); 940 false 941 } 942 Ok(filter) => { 943 // We just switched to the ready state; remote subscriptions can start now. 944 info!("Found list note! Setting up remote timeline query"); 945 timeline.filter = FilterState::ready_hybrid(filter.clone()); 946 947 update_remote_timeline_subscription(timeline, filter.remote().to_vec(), scoped_subs); 948 true 949 } 950 } 951 } 952 953 fn people_list_ref(kind: &TimelineKind) -> Option<&PeopleListRef> { 954 match kind { 955 TimelineKind::List(ListKind::PeopleList(plr)) 956 | TimelineKind::Algo(AlgoTimeline::LastPerPubkey(ListKind::PeopleList(plr))) => Some(plr), 957 _ => None, 958 } 959 }