mod.rs (27363B)
1 use crate::{ 2 error::Error, 3 multi_subscriber::TimelineSub, 4 subscriptions::{self, SubKind, Subscriptions}, 5 timeline::kind::ListKind, 6 Result, 7 }; 8 9 use notedeck::{ 10 contacts::hybrid_contacts_filter, 11 debouncer::Debouncer, 12 filter::{self, HybridFilter}, 13 tr, Accounts, CachedNote, ContactState, FilterError, FilterState, FilterStates, Localization, 14 NoteCache, NoteRef, UnknownIds, 15 }; 16 17 use egui_virtual_list::VirtualList; 18 use enostr::{PoolRelay, Pubkey, RelayPool}; 19 use nostrdb::{Filter, Ndb, Note, NoteKey, Transaction}; 20 use std::{ 21 cell::RefCell, 22 time::{Duration, UNIX_EPOCH}, 23 }; 24 use std::{rc::Rc, time::SystemTime}; 25 26 use tracing::{debug, error, info, warn}; 27 28 pub mod cache; 29 pub mod kind; 30 pub mod route; 31 pub mod thread; 32 33 pub use cache::TimelineCache; 34 pub use kind::{ColumnTitle, PubkeySource, ThreadSelection, TimelineKind}; 35 36 //#[derive(Debug, Hash, Clone, Eq, PartialEq)] 37 //pub type TimelineId = TimelineKind; 38 39 /* 40 41 impl TimelineId { 42 pub fn kind(&self) -> &TimelineKind { 43 &self.kind 44 } 45 46 pub fn new(id: TimelineKind) -> Self { 47 TimelineId(id) 48 } 49 50 pub fn profile(pubkey: Pubkey) -> Self { 51 TimelineId::new(TimelineKind::Profile(PubkeySource::pubkey(pubkey))) 52 } 53 } 54 55 impl fmt::Display for TimelineId { 56 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 57 write!(f, "TimelineId({})", self.0) 58 } 59 } 60 */ 61 62 #[derive(Copy, Clone, Eq, PartialEq, Debug, Default)] 63 pub enum ViewFilter { 64 Notes, 65 66 #[default] 67 NotesAndReplies, 68 } 69 70 impl ViewFilter { 71 pub fn name(&self, i18n: &mut Localization) -> String { 72 match self { 73 ViewFilter::Notes => tr!(i18n, "Notes", "Filter label for notes only view"), 74 ViewFilter::NotesAndReplies => { 75 tr!( 76 i18n, 77 "Notes & Replies", 78 "Filter label for notes and replies view" 79 ) 80 } 81 } 82 } 83 84 pub fn filter_notes(cache: &CachedNote, note: &Note) -> bool { 85 !cache.reply.borrow(note.tags()).is_reply() 86 } 87 88 fn identity(_cache: &CachedNote, _note: &Note) -> bool { 89 true 90 } 91 92 pub fn filter(&self) -> fn(&CachedNote, &Note) -> bool { 93 match self { 94 ViewFilter::Notes => ViewFilter::filter_notes, 95 ViewFilter::NotesAndReplies => ViewFilter::identity, 96 } 97 } 98 } 99 100 /// A timeline view is a filtered view of notes in a timeline. Two standard views 101 /// are "Notes" and "Notes & Replies". A timeline is associated with a Filter, 102 /// but a TimelineTab is a further filtered view of this Filter that can't 103 /// be captured by a Filter itself. 104 #[derive(Default, Debug)] 105 pub struct TimelineTab { 106 pub notes: Vec<NoteRef>, 107 pub selection: i32, 108 pub filter: ViewFilter, 109 pub list: Rc<RefCell<VirtualList>>, 110 pub freshness: NotesFreshness, 111 } 112 113 impl TimelineTab { 114 pub fn new(filter: ViewFilter) -> Self { 115 TimelineTab::new_with_capacity(filter, 1000) 116 } 117 118 pub fn only_notes_and_replies() -> Vec<Self> { 119 vec![TimelineTab::new(ViewFilter::NotesAndReplies)] 120 } 121 122 pub fn no_replies() -> Vec<Self> { 123 vec![TimelineTab::new(ViewFilter::Notes)] 124 } 125 126 pub fn full_tabs() -> Vec<Self> { 127 vec![ 128 TimelineTab::new(ViewFilter::Notes), 129 TimelineTab::new(ViewFilter::NotesAndReplies), 130 ] 131 } 132 133 pub fn new_with_capacity(filter: ViewFilter, cap: usize) -> Self { 134 let selection = 0i32; 135 let mut list = VirtualList::new(); 136 list.hide_on_resize(None); 137 list.over_scan(50.0); 138 let list = Rc::new(RefCell::new(list)); 139 let notes: Vec<NoteRef> = Vec::with_capacity(cap); 140 141 TimelineTab { 142 notes, 143 selection, 144 filter, 145 list, 146 freshness: NotesFreshness::default(), 147 } 148 } 149 150 fn insert(&mut self, new_refs: &[NoteRef], reversed: bool) { 151 if new_refs.is_empty() { 152 return; 153 } 154 let num_prev_items = self.notes.len(); 155 let (notes, merge_kind) = crate::timeline::merge_sorted_vecs(&self.notes, new_refs); 156 157 self.notes = notes; 158 let new_items = self.notes.len() - num_prev_items; 159 160 // TODO: technically items could have been added inbetween 161 if new_items > 0 { 162 let mut list = self.list.borrow_mut(); 163 164 match merge_kind { 165 // TODO: update egui_virtual_list to support spliced inserts 166 MergeKind::Spliced => { 167 debug!( 168 "spliced when inserting {} new notes, resetting virtual list", 169 new_refs.len() 170 ); 171 list.reset(); 172 } 173 MergeKind::FrontInsert => { 174 // only run this logic if we're reverse-chronological 175 // reversed in this case means chronological, since the 176 // default is reverse-chronological. yeah it's confusing. 177 if !reversed { 178 debug!("inserting {} new notes at start", new_refs.len()); 179 list.items_inserted_at_start(new_items); 180 } 181 } 182 } 183 } 184 } 185 186 pub fn select_down(&mut self) { 187 debug!("select_down {}", self.selection + 1); 188 if self.selection + 1 > self.notes.len() as i32 { 189 return; 190 } 191 192 self.selection += 1; 193 } 194 195 pub fn select_up(&mut self) { 196 debug!("select_up {}", self.selection - 1); 197 if self.selection - 1 < 0 { 198 return; 199 } 200 201 self.selection -= 1; 202 } 203 } 204 205 /// A column in a deck. Holds navigation state, loaded notes, column kind, etc. 206 #[derive(Debug)] 207 pub struct Timeline { 208 pub kind: TimelineKind, 209 // We may not have the filter loaded yet, so let's make it an option so 210 // that codepaths have to explicitly handle it 211 pub filter: FilterStates, 212 pub views: Vec<TimelineTab>, 213 pub selected_view: usize, 214 215 pub subscription: TimelineSub, 216 } 217 218 impl Timeline { 219 /// Create a timeline from a contact list 220 pub fn contact_list(contact_list: &Note, pubkey: &[u8; 32]) -> Result<Self> { 221 let with_hashtags = false; 222 let add_pk = Some(pubkey); 223 let filter = hybrid_contacts_filter(contact_list, add_pk, with_hashtags)?; 224 225 Ok(Timeline::new( 226 TimelineKind::contact_list(Pubkey::new(*pubkey)), 227 FilterState::ready_hybrid(filter), 228 TimelineTab::full_tabs(), 229 )) 230 } 231 232 pub fn last_per_pubkey(list: &Note, list_kind: &ListKind) -> Result<Self> { 233 let kind = 1; 234 let notes_per_pk = 1; 235 let filter = filter::last_n_per_pubkey_from_tags(list, kind, notes_per_pk)?; 236 237 Ok(Timeline::new( 238 TimelineKind::last_per_pubkey(*list_kind), 239 FilterState::ready(filter), 240 TimelineTab::only_notes_and_replies(), 241 )) 242 } 243 244 pub fn hashtag(hashtag: Vec<String>) -> Self { 245 let filters = hashtag 246 .iter() 247 .filter(|tag| !tag.is_empty()) 248 .map(|tag| { 249 Filter::new() 250 .kinds([1]) 251 .limit(filter::default_limit()) 252 .tags([tag.as_str()], 't') 253 .build() 254 }) 255 .collect::<Vec<_>>(); 256 257 Timeline::new( 258 TimelineKind::Hashtag(hashtag), 259 FilterState::ready(filters), 260 TimelineTab::only_notes_and_replies(), 261 ) 262 } 263 264 pub fn make_view_id(id: &TimelineKind, col: usize, selected_view: usize) -> egui::Id { 265 egui::Id::new((id, selected_view, col)) 266 } 267 268 pub fn view_id(&self, col: usize) -> egui::Id { 269 Timeline::make_view_id(&self.kind, col, self.selected_view) 270 } 271 272 pub fn new(kind: TimelineKind, filter_state: FilterState, views: Vec<TimelineTab>) -> Self { 273 let filter = FilterStates::new(filter_state); 274 let subscription = TimelineSub::default(); 275 let selected_view = 0; 276 277 Timeline { 278 kind, 279 filter, 280 views, 281 subscription, 282 selected_view, 283 } 284 } 285 286 pub fn current_view(&self) -> &TimelineTab { 287 &self.views[self.selected_view] 288 } 289 290 pub fn current_view_mut(&mut self) -> &mut TimelineTab { 291 &mut self.views[self.selected_view] 292 } 293 294 /// Get the note refs for NotesAndReplies. If we only have Notes, then 295 /// just return that instead 296 pub fn all_or_any_notes(&self) -> &[NoteRef] { 297 self.notes(ViewFilter::NotesAndReplies).unwrap_or_else(|| { 298 self.notes(ViewFilter::Notes) 299 .expect("should have at least notes") 300 }) 301 } 302 303 pub fn notes(&self, view: ViewFilter) -> Option<&[NoteRef]> { 304 self.view(view).map(|v| &*v.notes) 305 } 306 307 pub fn view(&self, view: ViewFilter) -> Option<&TimelineTab> { 308 self.views.iter().find(|tab| tab.filter == view) 309 } 310 311 pub fn view_mut(&mut self, view: ViewFilter) -> Option<&mut TimelineTab> { 312 self.views.iter_mut().find(|tab| tab.filter == view) 313 } 314 315 /// Initial insert of notes into a timeline. Subsequent inserts should 316 /// just use the insert function 317 pub fn insert_new( 318 &mut self, 319 txn: &Transaction, 320 ndb: &Ndb, 321 note_cache: &mut NoteCache, 322 notes: &[NoteRef], 323 ) { 324 let filters = { 325 let views = &self.views; 326 let filters: Vec<fn(&CachedNote, &Note) -> bool> = 327 views.iter().map(|v| v.filter.filter()).collect(); 328 filters 329 }; 330 331 for note_ref in notes { 332 for (view, filter) in filters.iter().enumerate() { 333 if let Ok(note) = ndb.get_note_by_key(txn, note_ref.key) { 334 if filter( 335 note_cache.cached_note_or_insert_mut(note_ref.key, ¬e), 336 ¬e, 337 ) { 338 self.views[view].notes.push(*note_ref) 339 } 340 } 341 } 342 } 343 } 344 345 /// The main function used for inserting notes into timelines. Handles 346 /// inserting into multiple views if we have them. All timeline note 347 /// insertions should use this function. 348 pub fn insert( 349 &mut self, 350 new_note_ids: &[NoteKey], 351 ndb: &Ndb, 352 txn: &Transaction, 353 unknown_ids: &mut UnknownIds, 354 note_cache: &mut NoteCache, 355 reversed: bool, 356 ) -> Result<()> { 357 let mut new_refs: Vec<(Note, NoteRef)> = Vec::with_capacity(new_note_ids.len()); 358 359 for key in new_note_ids { 360 let note = if let Ok(note) = ndb.get_note_by_key(txn, *key) { 361 note 362 } else { 363 error!( 364 "hit race condition in poll_notes_into_view: https://github.com/damus-io/nostrdb/issues/35 note {:?} was not added to timeline", 365 key 366 ); 367 continue; 368 }; 369 370 // Ensure that unknown ids are captured when inserting notes 371 // into the timeline 372 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, ¬e); 373 374 let created_at = note.created_at(); 375 new_refs.push(( 376 note, 377 NoteRef { 378 key: *key, 379 created_at, 380 }, 381 )); 382 } 383 384 for view in &mut self.views { 385 match view.filter { 386 ViewFilter::NotesAndReplies => { 387 let refs: Vec<NoteRef> = new_refs.iter().map(|(_note, nr)| *nr).collect(); 388 389 view.insert(&refs, reversed); 390 } 391 392 ViewFilter::Notes => { 393 let mut filtered_refs = Vec::with_capacity(new_refs.len()); 394 for (note, nr) in &new_refs { 395 let cached_note = note_cache.cached_note_or_insert(nr.key, note); 396 397 if ViewFilter::filter_notes(cached_note, note) { 398 filtered_refs.push(*nr); 399 } 400 } 401 402 view.insert(&filtered_refs, reversed); 403 } 404 } 405 } 406 407 Ok(()) 408 } 409 410 pub fn poll_notes_into_view( 411 &mut self, 412 ndb: &Ndb, 413 txn: &Transaction, 414 unknown_ids: &mut UnknownIds, 415 note_cache: &mut NoteCache, 416 reversed: bool, 417 ) -> Result<()> { 418 if !self.kind.should_subscribe_locally() { 419 // don't need to poll for timelines that don't have local subscriptions 420 return Ok(()); 421 } 422 423 let sub = self 424 .subscription 425 .get_local() 426 .ok_or(Error::App(notedeck::Error::no_active_sub()))?; 427 428 let new_note_ids = ndb.poll_for_notes(sub, 500); 429 if new_note_ids.is_empty() { 430 return Ok(()); 431 } else { 432 debug!("{} new notes! {:?}", new_note_ids.len(), new_note_ids); 433 } 434 435 self.insert(&new_note_ids, ndb, txn, unknown_ids, note_cache, reversed) 436 } 437 } 438 439 pub enum MergeKind { 440 FrontInsert, 441 Spliced, 442 } 443 444 pub fn merge_sorted_vecs<T: Ord + Copy>(vec1: &[T], vec2: &[T]) -> (Vec<T>, MergeKind) { 445 let mut merged = Vec::with_capacity(vec1.len() + vec2.len()); 446 let mut i = 0; 447 let mut j = 0; 448 let mut result: Option<MergeKind> = None; 449 450 while i < vec1.len() && j < vec2.len() { 451 if vec1[i] <= vec2[j] { 452 if result.is_none() && j < vec2.len() { 453 // if we're pushing from our large list and still have 454 // some left in vec2, then this is a splice 455 result = Some(MergeKind::Spliced); 456 } 457 merged.push(vec1[i]); 458 i += 1; 459 } else { 460 merged.push(vec2[j]); 461 j += 1; 462 } 463 } 464 465 // Append any remaining elements from either vector 466 if i < vec1.len() { 467 merged.extend_from_slice(&vec1[i..]); 468 } 469 if j < vec2.len() { 470 merged.extend_from_slice(&vec2[j..]); 471 } 472 473 (merged, result.unwrap_or(MergeKind::FrontInsert)) 474 } 475 476 /// When adding a new timeline, we may have a situation where the 477 /// FilterState is NeedsRemote. This can happen if we don't yet have the 478 /// contact list, etc. For these situations, we query all of the relays 479 /// with the same sub_id. We keep track of this sub_id and update the 480 /// filter with the latest version of the returned filter (ie contact 481 /// list) when they arrive. 482 /// 483 /// We do this by maintaining this sub_id in the filter state, even when 484 /// in the ready state. See: [`FilterReady`] 485 #[allow(clippy::too_many_arguments)] 486 pub fn setup_new_timeline( 487 timeline: &mut Timeline, 488 ndb: &Ndb, 489 txn: &Transaction, 490 subs: &mut Subscriptions, 491 pool: &mut RelayPool, 492 note_cache: &mut NoteCache, 493 since_optimize: bool, 494 accounts: &Accounts, 495 ) { 496 // if we're ready, setup local subs 497 if is_timeline_ready(ndb, pool, note_cache, timeline, accounts) { 498 if let Err(err) = setup_timeline_nostrdb_sub(ndb, txn, note_cache, timeline) { 499 error!("setup_new_timeline: {err}"); 500 } 501 } 502 503 for relay in &mut pool.relays { 504 send_initial_timeline_filter(since_optimize, subs, relay, timeline, accounts); 505 } 506 timeline.subscription.increment(); 507 } 508 509 /// Send initial filters for a specific relay. This typically gets called 510 /// when we first connect to a new relay for the first time. For 511 /// situations where you are adding a new timeline, use 512 /// setup_new_timeline. 513 pub fn send_initial_timeline_filters( 514 since_optimize: bool, 515 timeline_cache: &mut TimelineCache, 516 subs: &mut Subscriptions, 517 pool: &mut RelayPool, 518 relay_id: &str, 519 accounts: &Accounts, 520 ) -> Option<()> { 521 info!("Sending initial filters to {}", relay_id); 522 let relay = &mut pool.relays.iter_mut().find(|r| r.url() == relay_id)?; 523 524 for (_kind, timeline) in timeline_cache { 525 send_initial_timeline_filter(since_optimize, subs, relay, timeline, accounts); 526 } 527 528 Some(()) 529 } 530 531 pub fn send_initial_timeline_filter( 532 can_since_optimize: bool, 533 subs: &mut Subscriptions, 534 relay: &mut PoolRelay, 535 timeline: &mut Timeline, 536 accounts: &Accounts, 537 ) { 538 let filter_state = timeline.filter.get_mut(relay.url()); 539 540 match filter_state { 541 FilterState::Broken(err) => { 542 error!( 543 "FetchingRemote state in broken state when sending initial timeline filter? {err}" 544 ); 545 } 546 547 FilterState::FetchingRemote(_unisub) => { 548 error!("FetchingRemote state when sending initial timeline filter?"); 549 } 550 551 FilterState::GotRemote(_sub) => { 552 error!("GotRemote state when sending initial timeline filter?"); 553 } 554 555 FilterState::Ready(filter) => { 556 let filter = filter.to_owned(); 557 let new_filters: Vec<Filter> = filter.remote().to_owned().into_iter().map(|f| { 558 // limit the size of remote filters 559 let default_limit = filter::default_remote_limit(); 560 let mut lim = f.limit().unwrap_or(default_limit); 561 let mut filter = f; 562 if lim > default_limit { 563 lim = default_limit; 564 filter = filter.limit_mut(lim); 565 } 566 567 let notes = timeline.all_or_any_notes(); 568 569 // Should we since optimize? Not always. For example 570 // if we only have a few notes locally. One way to 571 // determine this is by looking at the current filter 572 // and seeing what its limit is. If we have less 573 // notes than the limit, we might want to backfill 574 // older notes 575 if can_since_optimize && filter::should_since_optimize(lim, notes.len()) { 576 filter = filter::since_optimize_filter(filter, notes); 577 } else { 578 warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", &timeline.kind); 579 } 580 581 filter 582 }).collect(); 583 584 //let sub_id = damus.gen_subid(&SubKind::Initial); 585 let sub_id = subscriptions::new_sub_id(); 586 subs.subs.insert(sub_id.clone(), SubKind::Initial); 587 588 if let Err(err) = relay.subscribe(sub_id.clone(), new_filters.clone()) { 589 error!("error subscribing: {err}"); 590 } else { 591 timeline.subscription.force_add_remote(sub_id); 592 } 593 } 594 595 // we need some data first 596 FilterState::NeedsRemote => fetch_contact_list(subs, relay, timeline, accounts), 597 } 598 } 599 600 pub fn fetch_contact_list( 601 subs: &mut Subscriptions, 602 relay: &mut PoolRelay, 603 timeline: &mut Timeline, 604 accounts: &Accounts, 605 ) { 606 let sub_kind = SubKind::FetchingContactList(timeline.kind.clone()); 607 let sub = &accounts.get_subs().contacts; 608 609 let new_filter_state = match accounts.get_selected_account().data.contacts.get_state() { 610 ContactState::Unreceived => { 611 FilterState::FetchingRemote(filter::FetchingRemoteType::Contact) 612 } 613 ContactState::Received { 614 contacts: _, 615 note_key: _, 616 timestamp: _, 617 } => FilterState::GotRemote(filter::GotRemoteType::Contact), 618 }; 619 620 timeline 621 .filter 622 .set_relay_state(relay.url().to_string(), new_filter_state); 623 624 subs.subs.insert(sub.remote.clone(), sub_kind); 625 } 626 627 fn setup_initial_timeline( 628 ndb: &Ndb, 629 txn: &Transaction, 630 timeline: &mut Timeline, 631 note_cache: &mut NoteCache, 632 filters: &HybridFilter, 633 ) -> Result<()> { 634 // some timelines are one-shot and a refreshed, like last_per_pubkey algo feed 635 if timeline.kind.should_subscribe_locally() { 636 timeline.subscription.try_add_local(ndb, filters); 637 } 638 639 debug!( 640 "querying nostrdb sub {:?} {:?}", 641 timeline.subscription, timeline.filter 642 ); 643 644 let mut lim = 0i32; 645 for filter in filters.local() { 646 lim += filter.limit().unwrap_or(1) as i32; 647 } 648 649 debug!("setup_initial_timeline: limit for local filter is {}", lim); 650 651 let notes: Vec<NoteRef> = ndb 652 .query(txn, filters.local(), lim)? 653 .into_iter() 654 .map(NoteRef::from_query_result) 655 .collect(); 656 657 timeline.insert_new(txn, ndb, note_cache, ¬es); 658 659 Ok(()) 660 } 661 662 pub fn setup_initial_nostrdb_subs( 663 ndb: &Ndb, 664 note_cache: &mut NoteCache, 665 timeline_cache: &mut TimelineCache, 666 ) -> Result<()> { 667 for (_kind, timeline) in timeline_cache { 668 let txn = Transaction::new(ndb).expect("txn"); 669 if let Err(err) = setup_timeline_nostrdb_sub(ndb, &txn, note_cache, timeline) { 670 error!("setup_initial_nostrdb_subs: {err}"); 671 } 672 } 673 674 Ok(()) 675 } 676 677 fn setup_timeline_nostrdb_sub( 678 ndb: &Ndb, 679 txn: &Transaction, 680 note_cache: &mut NoteCache, 681 timeline: &mut Timeline, 682 ) -> Result<()> { 683 let filter_state = timeline 684 .filter 685 .get_any_ready() 686 .ok_or(Error::App(notedeck::Error::empty_contact_list()))? 687 .to_owned(); 688 689 setup_initial_timeline(ndb, txn, timeline, note_cache, &filter_state)?; 690 691 Ok(()) 692 } 693 694 /// Check our timeline filter and see if we have any filter data ready. 695 /// Our timelines may require additional data before it is functional. For 696 /// example, when we have to fetch a contact list before we do the actual 697 /// following list query. 698 pub fn is_timeline_ready( 699 ndb: &Ndb, 700 pool: &mut RelayPool, 701 note_cache: &mut NoteCache, 702 timeline: &mut Timeline, 703 accounts: &Accounts, 704 ) -> bool { 705 // TODO: we should debounce the filter states a bit to make sure we have 706 // seen all of the different contact lists from each relay 707 if let Some(_f) = timeline.filter.get_any_ready() { 708 return true; 709 } 710 711 let Some(res) = timeline.filter.get_any_gotremote() else { 712 return false; 713 }; 714 715 let (relay_id, note_key) = match res { 716 filter::GotRemoteResult::Normal { relay_id, sub_id } => { 717 // We got at least one eose for our filter request. Let's see 718 // if nostrdb is done processing it yet. 719 let res = ndb.poll_for_notes(sub_id, 1); 720 if res.is_empty() { 721 debug!( 722 "check_timeline_filter_state: no notes found (yet?) for timeline {:?}", 723 timeline 724 ); 725 return false; 726 } 727 728 info!("notes found for contact timeline after GotRemote!"); 729 730 (relay_id, res[0]) 731 } 732 filter::GotRemoteResult::Contact { relay_id } => { 733 let ContactState::Received { 734 contacts: _, 735 note_key, 736 timestamp: _, 737 } = accounts.get_selected_account().data.contacts.get_state() 738 else { 739 return false; 740 }; 741 742 (relay_id, *note_key) 743 } 744 }; 745 746 let with_hashtags = false; 747 748 let filter = { 749 let txn = Transaction::new(ndb).expect("txn"); 750 let note = ndb.get_note_by_key(&txn, note_key).expect("note"); 751 let add_pk = timeline.kind.pubkey().map(|pk| pk.bytes()); 752 753 hybrid_contacts_filter(¬e, add_pk, with_hashtags) 754 }; 755 756 // TODO: into_follow_filter is hardcoded to contact lists, let's generalize 757 match filter { 758 Err(notedeck::Error::Filter(e)) => { 759 error!("got broken when building filter {e}"); 760 timeline 761 .filter 762 .set_relay_state(relay_id, FilterState::broken(e)); 763 false 764 } 765 Err(err) => { 766 error!("got broken when building filter {err}"); 767 timeline 768 .filter 769 .set_relay_state(relay_id, FilterState::broken(FilterError::EmptyContactList)); 770 false 771 } 772 Ok(filter) => { 773 // we just switched to the ready state, we should send initial 774 // queries and setup the local subscription 775 info!("Found contact list! Setting up local and remote contact list query"); 776 let txn = Transaction::new(ndb).expect("txn"); 777 setup_initial_timeline(ndb, &txn, timeline, note_cache, &filter).expect("setup init"); 778 timeline 779 .filter 780 .set_relay_state(relay_id, FilterState::ready_hybrid(filter.clone())); 781 782 //let ck = &timeline.kind; 783 //let subid = damus.gen_subid(&SubKind::Column(ck.clone())); 784 timeline.subscription.try_add_remote(pool, &filter); 785 true 786 } 787 } 788 } 789 790 #[derive(Debug)] 791 pub struct NotesFreshness { 792 debouncer: Debouncer, 793 state: NotesFreshnessState, 794 } 795 796 #[derive(Debug)] 797 enum NotesFreshnessState { 798 Fresh { 799 timestamp_viewed: u64, 800 }, 801 Stale { 802 have_unseen: bool, 803 timestamp_last_viewed: u64, 804 }, 805 } 806 807 impl Default for NotesFreshness { 808 fn default() -> Self { 809 Self { 810 debouncer: Debouncer::new(Duration::from_secs(2)), 811 state: NotesFreshnessState::Stale { 812 have_unseen: true, 813 timestamp_last_viewed: 0, 814 }, 815 } 816 } 817 } 818 819 impl NotesFreshness { 820 pub fn set_fresh(&mut self) { 821 if !self.debouncer.should_act() { 822 return; 823 } 824 self.state = NotesFreshnessState::Fresh { 825 timestamp_viewed: timestamp_now(), 826 }; 827 self.debouncer.bounce(); 828 } 829 830 pub fn update(&mut self, check_have_unseen: impl FnOnce(u64) -> bool) { 831 if !self.debouncer.should_act() { 832 return; 833 } 834 835 match &self.state { 836 NotesFreshnessState::Fresh { timestamp_viewed } => { 837 let Ok(dur) = SystemTime::now() 838 .duration_since(UNIX_EPOCH + Duration::from_secs(*timestamp_viewed)) 839 else { 840 return; 841 }; 842 843 if dur > Duration::from_secs(2) { 844 self.state = NotesFreshnessState::Stale { 845 have_unseen: check_have_unseen(*timestamp_viewed), 846 timestamp_last_viewed: *timestamp_viewed, 847 }; 848 } 849 } 850 NotesFreshnessState::Stale { 851 have_unseen, 852 timestamp_last_viewed, 853 } => { 854 if *have_unseen { 855 return; 856 } 857 858 self.state = NotesFreshnessState::Stale { 859 have_unseen: check_have_unseen(*timestamp_last_viewed), 860 timestamp_last_viewed: *timestamp_last_viewed, 861 }; 862 } 863 } 864 865 self.debouncer.bounce(); 866 } 867 868 pub fn has_unseen(&self) -> bool { 869 match &self.state { 870 NotesFreshnessState::Fresh { 871 timestamp_viewed: _, 872 } => false, 873 NotesFreshnessState::Stale { 874 have_unseen, 875 timestamp_last_viewed: _, 876 } => *have_unseen, 877 } 878 } 879 } 880 881 fn timestamp_now() -> u64 { 882 std::time::SystemTime::now() 883 .duration_since(std::time::UNIX_EPOCH) 884 .unwrap_or(Duration::ZERO) 885 .as_secs() 886 }