mod.rs (22461B)
1 use crate::{ 2 column::Columns, 3 decks::DecksCache, 4 error::Error, 5 subscriptions::{self, SubKind, Subscriptions}, 6 Result, 7 }; 8 9 use notedeck::{ 10 filter, CachedNote, FilterError, FilterState, FilterStates, MuteFun, NoteCache, NoteRef, 11 UnknownIds, 12 }; 13 14 use std::fmt; 15 use std::sync::atomic::{AtomicU32, Ordering}; 16 17 use egui_virtual_list::VirtualList; 18 use enostr::{Pubkey, Relay, RelayPool}; 19 use nostrdb::{Filter, Ndb, Note, Subscription, Transaction}; 20 use std::cell::RefCell; 21 use std::hash::Hash; 22 use std::rc::Rc; 23 24 use tracing::{debug, error, info, warn}; 25 26 pub mod kind; 27 pub mod route; 28 29 pub use kind::{ColumnTitle, PubkeySource, TimelineKind}; 30 pub use route::TimelineRoute; 31 32 #[derive(Debug, Hash, Copy, Clone, Eq, PartialEq)] 33 pub struct TimelineId(u32); 34 35 impl TimelineId { 36 pub fn new(id: u32) -> Self { 37 TimelineId(id) 38 } 39 } 40 41 impl fmt::Display for TimelineId { 42 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 43 write!(f, "TimelineId({})", self.0) 44 } 45 } 46 47 #[derive(Copy, Clone, Eq, PartialEq, Debug, Default)] 48 pub enum ViewFilter { 49 Notes, 50 51 #[default] 52 NotesAndReplies, 53 } 54 55 impl ViewFilter { 56 pub fn name(&self) -> &'static str { 57 match self { 58 ViewFilter::Notes => "Notes", 59 ViewFilter::NotesAndReplies => "Notes & Replies", 60 } 61 } 62 63 pub fn filter_notes(cache: &CachedNote, note: &Note) -> bool { 64 !cache.reply.borrow(note.tags()).is_reply() 65 } 66 67 fn identity(_cache: &CachedNote, _note: &Note) -> bool { 68 true 69 } 70 71 pub fn filter(&self) -> fn(&CachedNote, &Note) -> bool { 72 match self { 73 ViewFilter::Notes => ViewFilter::filter_notes, 74 ViewFilter::NotesAndReplies => ViewFilter::identity, 75 } 76 } 77 } 78 79 /// A timeline view is a filtered view of notes in a timeline. Two standard views 80 /// are "Notes" and "Notes & Replies". A timeline is associated with a Filter, 81 /// but a TimelineTab is a further filtered view of this Filter that can't 82 /// be captured by a Filter itself. 83 #[derive(Default, Debug)] 84 pub struct TimelineTab { 85 pub notes: Vec<NoteRef>, 86 pub selection: i32, 87 pub filter: ViewFilter, 88 pub list: Rc<RefCell<VirtualList>>, 89 } 90 91 impl TimelineTab { 92 pub fn new(filter: ViewFilter) -> Self { 93 TimelineTab::new_with_capacity(filter, 1000) 94 } 95 96 pub fn only_notes_and_replies() -> Vec<Self> { 97 vec![TimelineTab::new(ViewFilter::NotesAndReplies)] 98 } 99 100 pub fn no_replies() -> Vec<Self> { 101 vec![TimelineTab::new(ViewFilter::Notes)] 102 } 103 104 pub fn full_tabs() -> Vec<Self> { 105 vec![ 106 TimelineTab::new(ViewFilter::Notes), 107 TimelineTab::new(ViewFilter::NotesAndReplies), 108 ] 109 } 110 111 pub fn new_with_capacity(filter: ViewFilter, cap: usize) -> Self { 112 let selection = 0i32; 113 let mut list = VirtualList::new(); 114 list.hide_on_resize(None); 115 list.over_scan(1000.0); 116 let list = Rc::new(RefCell::new(list)); 117 let notes: Vec<NoteRef> = Vec::with_capacity(cap); 118 119 TimelineTab { 120 notes, 121 selection, 122 filter, 123 list, 124 } 125 } 126 127 pub fn insert(&mut self, new_refs: &[NoteRef], reversed: bool) { 128 if new_refs.is_empty() { 129 return; 130 } 131 let num_prev_items = self.notes.len(); 132 let (notes, merge_kind) = crate::timeline::merge_sorted_vecs(&self.notes, new_refs); 133 134 self.notes = notes; 135 let new_items = self.notes.len() - num_prev_items; 136 137 // TODO: technically items could have been added inbetween 138 if new_items > 0 { 139 let mut list = self.list.borrow_mut(); 140 141 match merge_kind { 142 // TODO: update egui_virtual_list to support spliced inserts 143 MergeKind::Spliced => { 144 debug!( 145 "spliced when inserting {} new notes, resetting virtual list", 146 new_refs.len() 147 ); 148 list.reset(); 149 } 150 MergeKind::FrontInsert => { 151 // only run this logic if we're reverse-chronological 152 // reversed in this case means chronological, since the 153 // default is reverse-chronological. yeah it's confusing. 154 if !reversed { 155 list.items_inserted_at_start(new_items); 156 } 157 } 158 } 159 } 160 } 161 162 pub fn select_down(&mut self) { 163 debug!("select_down {}", self.selection + 1); 164 if self.selection + 1 > self.notes.len() as i32 { 165 return; 166 } 167 168 self.selection += 1; 169 } 170 171 pub fn select_up(&mut self) { 172 debug!("select_up {}", self.selection - 1); 173 if self.selection - 1 < 0 { 174 return; 175 } 176 177 self.selection -= 1; 178 } 179 } 180 181 /// A column in a deck. Holds navigation state, loaded notes, column kind, etc. 182 #[derive(Debug)] 183 pub struct Timeline { 184 pub id: TimelineId, 185 pub kind: TimelineKind, 186 // We may not have the filter loaded yet, so let's make it an option so 187 // that codepaths have to explicitly handle it 188 pub filter: FilterStates, 189 pub views: Vec<TimelineTab>, 190 pub selected_view: usize, 191 192 /// Our nostrdb subscription 193 pub subscription: Option<Subscription>, 194 } 195 196 impl Timeline { 197 /// Create a timeline from a contact list 198 pub fn contact_list( 199 contact_list: &Note, 200 pk_src: PubkeySource, 201 deck_author: Option<&[u8; 32]>, 202 ) -> Result<Self> { 203 let our_pubkey = deck_author.map(|da| pk_src.to_pubkey_bytes(da)); 204 let filter = filter::filter_from_tags(contact_list, our_pubkey)?.into_follow_filter(); 205 206 Ok(Timeline::new( 207 TimelineKind::contact_list(pk_src), 208 FilterState::ready(filter), 209 TimelineTab::full_tabs(), 210 )) 211 } 212 213 pub fn hashtag(hashtag: String) -> Self { 214 let filter = Filter::new() 215 .kinds([1]) 216 .limit(filter::default_limit()) 217 .tags([hashtag.clone()], 't') 218 .build(); 219 220 Timeline::new( 221 TimelineKind::Hashtag(hashtag), 222 FilterState::ready(vec![filter]), 223 TimelineTab::full_tabs(), 224 ) 225 } 226 227 pub fn make_view_id(id: TimelineId, selected_view: usize) -> egui::Id { 228 egui::Id::new((id, selected_view)) 229 } 230 231 pub fn view_id(&self) -> egui::Id { 232 Timeline::make_view_id(self.id, self.selected_view) 233 } 234 235 pub fn new(kind: TimelineKind, filter_state: FilterState, views: Vec<TimelineTab>) -> Self { 236 // global unique id for all new timelines 237 static UIDS: AtomicU32 = AtomicU32::new(0); 238 239 let filter = FilterStates::new(filter_state); 240 let subscription: Option<Subscription> = None; 241 let selected_view = 0; 242 let id = TimelineId::new(UIDS.fetch_add(1, Ordering::Relaxed)); 243 244 Timeline { 245 id, 246 kind, 247 filter, 248 views, 249 subscription, 250 selected_view, 251 } 252 } 253 254 pub fn current_view(&self) -> &TimelineTab { 255 &self.views[self.selected_view] 256 } 257 258 pub fn current_view_mut(&mut self) -> &mut TimelineTab { 259 &mut self.views[self.selected_view] 260 } 261 262 /// Get the note refs for NotesAndReplies. If we only have Notes, then 263 /// just return that instead 264 pub fn all_or_any_notes(&self) -> &[NoteRef] { 265 self.notes(ViewFilter::NotesAndReplies).unwrap_or_else(|| { 266 self.notes(ViewFilter::Notes) 267 .expect("should have at least notes") 268 }) 269 } 270 271 pub fn notes(&self, view: ViewFilter) -> Option<&[NoteRef]> { 272 self.view(view).map(|v| &*v.notes) 273 } 274 275 pub fn view(&self, view: ViewFilter) -> Option<&TimelineTab> { 276 self.views.iter().find(|tab| tab.filter == view) 277 } 278 279 pub fn view_mut(&mut self, view: ViewFilter) -> Option<&mut TimelineTab> { 280 self.views.iter_mut().find(|tab| tab.filter == view) 281 } 282 283 pub fn poll_notes_into_view( 284 timeline_idx: usize, 285 mut timelines: Vec<&mut Timeline>, 286 ndb: &Ndb, 287 txn: &Transaction, 288 unknown_ids: &mut UnknownIds, 289 note_cache: &mut NoteCache, 290 is_muted: &MuteFun, 291 ) -> Result<()> { 292 let timeline = timelines 293 .get_mut(timeline_idx) 294 .ok_or(Error::TimelineNotFound)?; 295 let sub = timeline 296 .subscription 297 .ok_or(Error::App(notedeck::Error::no_active_sub()))?; 298 299 let new_note_ids = ndb.poll_for_notes(sub, 500); 300 if new_note_ids.is_empty() { 301 return Ok(()); 302 } else { 303 debug!("{} new notes! {:?}", new_note_ids.len(), new_note_ids); 304 } 305 306 let mut new_refs: Vec<(Note, NoteRef)> = Vec::with_capacity(new_note_ids.len()); 307 308 for key in new_note_ids { 309 let note = if let Ok(note) = ndb.get_note_by_key(txn, key) { 310 note 311 } else { 312 error!("hit race condition in poll_notes_into_view: https://github.com/damus-io/nostrdb/issues/35 note {:?} was not added to timeline", key); 313 continue; 314 }; 315 if is_muted(¬e) { 316 continue; 317 } 318 319 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, ¬e); 320 321 let created_at = note.created_at(); 322 new_refs.push((note, NoteRef { key, created_at })); 323 } 324 325 // We're assuming reverse-chronological here (timelines). This 326 // flag ensures we trigger the items_inserted_at_start 327 // optimization in VirtualList. We need this flag because we can 328 // insert notes into chronological order sometimes, and this 329 // optimization doesn't make sense in those situations. 330 let reversed = false; 331 332 // ViewFilter::NotesAndReplies 333 if let Some(view) = timeline.view_mut(ViewFilter::NotesAndReplies) { 334 let refs: Vec<NoteRef> = new_refs.iter().map(|(_note, nr)| *nr).collect(); 335 336 view.insert(&refs, reversed); 337 } 338 339 // 340 // handle the filtered case (ViewFilter::Notes, no replies) 341 // 342 // TODO(jb55): this is mostly just copied from above, let's just use a loop 343 // I initially tried this but ran into borrow checker issues 344 if let Some(view) = timeline.view_mut(ViewFilter::Notes) { 345 let mut filtered_refs = Vec::with_capacity(new_refs.len()); 346 for (note, nr) in &new_refs { 347 let cached_note = note_cache.cached_note_or_insert(nr.key, note); 348 349 if ViewFilter::filter_notes(cached_note, note) { 350 filtered_refs.push(*nr); 351 } 352 } 353 354 view.insert(&filtered_refs, reversed); 355 } 356 357 Ok(()) 358 } 359 } 360 361 pub enum MergeKind { 362 FrontInsert, 363 Spliced, 364 } 365 366 pub fn merge_sorted_vecs<T: Ord + Copy>(vec1: &[T], vec2: &[T]) -> (Vec<T>, MergeKind) { 367 let mut merged = Vec::with_capacity(vec1.len() + vec2.len()); 368 let mut i = 0; 369 let mut j = 0; 370 let mut result: Option<MergeKind> = None; 371 372 while i < vec1.len() && j < vec2.len() { 373 if vec1[i] <= vec2[j] { 374 if result.is_none() && j < vec2.len() { 375 // if we're pushing from our large list and still have 376 // some left in vec2, then this is a splice 377 result = Some(MergeKind::Spliced); 378 } 379 merged.push(vec1[i]); 380 i += 1; 381 } else { 382 merged.push(vec2[j]); 383 j += 1; 384 } 385 } 386 387 // Append any remaining elements from either vector 388 if i < vec1.len() { 389 merged.extend_from_slice(&vec1[i..]); 390 } 391 if j < vec2.len() { 392 merged.extend_from_slice(&vec2[j..]); 393 } 394 395 (merged, result.unwrap_or(MergeKind::FrontInsert)) 396 } 397 398 /// When adding a new timeline, we may have a situation where the 399 /// FilterState is NeedsRemote. This can happen if we don't yet have the 400 /// contact list, etc. For these situations, we query all of the relays 401 /// with the same sub_id. We keep track of this sub_id and update the 402 /// filter with the latest version of the returned filter (ie contact 403 /// list) when they arrive. 404 /// 405 /// We do this by maintaining this sub_id in the filter state, even when 406 /// in the ready state. See: [`FilterReady`] 407 #[allow(clippy::too_many_arguments)] 408 pub fn setup_new_timeline( 409 timeline: &mut Timeline, 410 ndb: &Ndb, 411 subs: &mut Subscriptions, 412 pool: &mut RelayPool, 413 note_cache: &mut NoteCache, 414 since_optimize: bool, 415 is_muted: &MuteFun, 416 our_pk: Option<&Pubkey>, 417 ) { 418 // if we're ready, setup local subs 419 if is_timeline_ready(ndb, pool, note_cache, timeline, is_muted, our_pk) { 420 if let Err(err) = setup_timeline_nostrdb_sub(ndb, note_cache, timeline, is_muted) { 421 error!("setup_new_timeline: {err}"); 422 } 423 } 424 425 for relay in &mut pool.relays { 426 send_initial_timeline_filter(ndb, since_optimize, subs, &mut relay.relay, timeline); 427 } 428 } 429 430 /// Send initial filters for a specific relay. This typically gets called 431 /// when we first connect to a new relay for the first time. For 432 /// situations where you are adding a new timeline, use 433 /// setup_new_timeline. 434 pub fn send_initial_timeline_filters( 435 ndb: &Ndb, 436 since_optimize: bool, 437 columns: &mut Columns, 438 subs: &mut Subscriptions, 439 pool: &mut RelayPool, 440 relay_id: &str, 441 ) -> Option<()> { 442 info!("Sending initial filters to {}", relay_id); 443 let relay = &mut pool 444 .relays 445 .iter_mut() 446 .find(|r| r.relay.url == relay_id)? 447 .relay; 448 449 for timeline in columns.timelines_mut() { 450 send_initial_timeline_filter(ndb, since_optimize, subs, relay, timeline); 451 } 452 453 Some(()) 454 } 455 456 pub fn send_initial_timeline_filter( 457 ndb: &Ndb, 458 can_since_optimize: bool, 459 subs: &mut Subscriptions, 460 relay: &mut Relay, 461 timeline: &mut Timeline, 462 ) { 463 let filter_state = timeline.filter.get(&relay.url); 464 465 match filter_state { 466 FilterState::Broken(err) => { 467 error!( 468 "FetchingRemote state in broken state when sending initial timeline filter? {err}" 469 ); 470 } 471 472 FilterState::FetchingRemote(_unisub) => { 473 error!("FetchingRemote state when sending initial timeline filter?"); 474 } 475 476 FilterState::GotRemote(_sub) => { 477 error!("GotRemote state when sending initial timeline filter?"); 478 } 479 480 FilterState::Ready(filter) => { 481 let filter = filter.to_owned(); 482 let new_filters = filter.into_iter().map(|f| { 483 // limit the size of remote filters 484 let default_limit = filter::default_remote_limit(); 485 let mut lim = f.limit().unwrap_or(default_limit); 486 let mut filter = f; 487 if lim > default_limit { 488 lim = default_limit; 489 filter = filter.limit_mut(lim); 490 } 491 492 let notes = timeline.all_or_any_notes(); 493 494 // Should we since optimize? Not always. For example 495 // if we only have a few notes locally. One way to 496 // determine this is by looking at the current filter 497 // and seeing what its limit is. If we have less 498 // notes than the limit, we might want to backfill 499 // older notes 500 if can_since_optimize && filter::should_since_optimize(lim, notes.len()) { 501 filter = filter::since_optimize_filter(filter, notes); 502 } else { 503 warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", filter); 504 } 505 506 filter 507 }).collect(); 508 509 //let sub_id = damus.gen_subid(&SubKind::Initial); 510 let sub_id = subscriptions::new_sub_id(); 511 subs.subs.insert(sub_id.clone(), SubKind::Initial); 512 513 relay.subscribe(sub_id, new_filters); 514 } 515 516 // we need some data first 517 FilterState::NeedsRemote(filter) => { 518 fetch_contact_list(filter.to_owned(), ndb, subs, relay, timeline) 519 } 520 } 521 } 522 523 fn fetch_contact_list( 524 filter: Vec<Filter>, 525 ndb: &Ndb, 526 subs: &mut Subscriptions, 527 relay: &mut Relay, 528 timeline: &mut Timeline, 529 ) { 530 let sub_kind = SubKind::FetchingContactList(timeline.id); 531 let sub_id = subscriptions::new_sub_id(); 532 let local_sub = ndb.subscribe(&filter).expect("sub"); 533 534 timeline.filter.set_relay_state( 535 relay.url.clone(), 536 FilterState::fetching_remote(sub_id.clone(), local_sub), 537 ); 538 539 subs.subs.insert(sub_id.clone(), sub_kind); 540 541 info!("fetching contact list from {}", &relay.url); 542 relay.subscribe(sub_id, filter); 543 } 544 545 fn setup_initial_timeline( 546 ndb: &Ndb, 547 timeline: &mut Timeline, 548 note_cache: &mut NoteCache, 549 filters: &[Filter], 550 is_muted: &MuteFun, 551 ) -> Result<()> { 552 timeline.subscription = Some(ndb.subscribe(filters)?); 553 let txn = Transaction::new(ndb)?; 554 debug!( 555 "querying nostrdb sub {:?} {:?}", 556 timeline.subscription, timeline.filter 557 ); 558 let lim = filters[0].limit().unwrap_or(filter::default_limit()) as i32; 559 let notes = ndb 560 .query(&txn, filters, lim)? 561 .into_iter() 562 .map(NoteRef::from_query_result) 563 .collect(); 564 565 copy_notes_into_timeline(timeline, &txn, ndb, note_cache, notes, is_muted); 566 567 Ok(()) 568 } 569 570 pub fn copy_notes_into_timeline( 571 timeline: &mut Timeline, 572 txn: &Transaction, 573 ndb: &Ndb, 574 note_cache: &mut NoteCache, 575 notes: Vec<NoteRef>, 576 is_muted: &MuteFun, 577 ) { 578 let filters = { 579 let views = &timeline.views; 580 let filters: Vec<fn(&CachedNote, &Note) -> bool> = 581 views.iter().map(|v| v.filter.filter()).collect(); 582 filters 583 }; 584 585 for note_ref in notes { 586 for (view, filter) in filters.iter().enumerate() { 587 if let Ok(note) = ndb.get_note_by_key(txn, note_ref.key) { 588 if is_muted(¬e) { 589 continue; 590 } 591 if filter( 592 note_cache.cached_note_or_insert_mut(note_ref.key, ¬e), 593 ¬e, 594 ) { 595 timeline.views[view].notes.push(note_ref) 596 } 597 } 598 } 599 } 600 } 601 602 pub fn setup_initial_nostrdb_subs( 603 ndb: &Ndb, 604 note_cache: &mut NoteCache, 605 decks_cache: &mut DecksCache, 606 is_muted: &MuteFun, 607 ) -> Result<()> { 608 for decks in decks_cache.get_all_decks_mut() { 609 for deck in decks.decks_mut() { 610 for timeline in deck.columns_mut().timelines_mut() { 611 if let Err(err) = setup_timeline_nostrdb_sub(ndb, note_cache, timeline, is_muted) { 612 error!("setup_initial_nostrdb_subs: {err}"); 613 } 614 } 615 } 616 } 617 618 Ok(()) 619 } 620 621 fn setup_timeline_nostrdb_sub( 622 ndb: &Ndb, 623 note_cache: &mut NoteCache, 624 timeline: &mut Timeline, 625 is_muted: &MuteFun, 626 ) -> Result<()> { 627 let filter_state = timeline 628 .filter 629 .get_any_ready() 630 .ok_or(Error::App(notedeck::Error::empty_contact_list()))? 631 .to_owned(); 632 633 setup_initial_timeline(ndb, timeline, note_cache, &filter_state, is_muted)?; 634 635 Ok(()) 636 } 637 638 /// Check our timeline filter and see if we have any filter data ready. 639 /// Our timelines may require additional data before it is functional. For 640 /// example, when we have to fetch a contact list before we do the actual 641 /// following list query. 642 pub fn is_timeline_ready( 643 ndb: &Ndb, 644 pool: &mut RelayPool, 645 note_cache: &mut NoteCache, 646 timeline: &mut Timeline, 647 is_muted: &MuteFun, 648 our_pk: Option<&Pubkey>, 649 ) -> bool { 650 // TODO: we should debounce the filter states a bit to make sure we have 651 // seen all of the different contact lists from each relay 652 if let Some(_f) = timeline.filter.get_any_ready() { 653 return true; 654 } 655 656 let (relay_id, sub) = if let Some((relay_id, sub)) = timeline.filter.get_any_gotremote() { 657 (relay_id.to_string(), sub) 658 } else { 659 return false; 660 }; 661 662 // We got at least one eose for our filter request. Let's see 663 // if nostrdb is done processing it yet. 664 let res = ndb.poll_for_notes(sub, 1); 665 if res.is_empty() { 666 debug!( 667 "check_timeline_filter_state: no notes found (yet?) for timeline {:?}", 668 timeline 669 ); 670 return false; 671 } 672 673 info!("notes found for contact timeline after GotRemote!"); 674 675 let note_key = res[0]; 676 677 let filter = { 678 let txn = Transaction::new(ndb).expect("txn"); 679 let note = ndb.get_note_by_key(&txn, note_key).expect("note"); 680 let add_pk = timeline 681 .kind 682 .pubkey_source() 683 .as_ref() 684 .and_then(|pk_src| our_pk.map(|pk| pk_src.to_pubkey_bytes(pk))); 685 filter::filter_from_tags(¬e, add_pk).map(|f| f.into_follow_filter()) 686 }; 687 688 // TODO: into_follow_filter is hardcoded to contact lists, let's generalize 689 match filter { 690 Err(notedeck::Error::Filter(e)) => { 691 error!("got broken when building filter {e}"); 692 timeline 693 .filter 694 .set_relay_state(relay_id, FilterState::broken(e)); 695 false 696 } 697 Err(err) => { 698 error!("got broken when building filter {err}"); 699 timeline 700 .filter 701 .set_relay_state(relay_id, FilterState::broken(FilterError::EmptyContactList)); 702 false 703 } 704 Ok(filter) => { 705 // we just switched to the ready state, we should send initial 706 // queries and setup the local subscription 707 info!("Found contact list! Setting up local and remote contact list query"); 708 setup_initial_timeline(ndb, timeline, note_cache, &filter, is_muted) 709 .expect("setup init"); 710 timeline 711 .filter 712 .set_relay_state(relay_id, FilterState::ready(filter.clone())); 713 714 //let ck = &timeline.kind; 715 //let subid = damus.gen_subid(&SubKind::Column(ck.clone())); 716 let subid = subscriptions::new_sub_id(); 717 pool.subscribe(subid, filter); 718 true 719 } 720 } 721 }