mod.rs (21863B)
1 use crate::{ 2 column::Columns, 3 error::{Error, FilterError}, 4 filter::{self, FilterState, FilterStates}, 5 muted::MuteFun, 6 note::NoteRef, 7 notecache::{CachedNote, NoteCache}, 8 subscriptions::{self, SubKind, Subscriptions}, 9 unknowns::UnknownIds, 10 Result, 11 }; 12 13 use std::fmt; 14 use std::sync::atomic::{AtomicU32, Ordering}; 15 16 use egui_virtual_list::VirtualList; 17 use enostr::{Relay, RelayPool}; 18 use nostrdb::{Filter, Ndb, Note, Subscription, Transaction}; 19 use serde::{Deserialize, Serialize}; 20 use std::cell::RefCell; 21 use std::hash::Hash; 22 use std::rc::Rc; 23 24 use tracing::{debug, error, info, warn}; 25 26 pub mod kind; 27 pub mod route; 28 29 pub use kind::{PubkeySource, TimelineKind}; 30 pub use route::TimelineRoute; 31 32 #[derive(Debug, Hash, Copy, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] 33 pub struct TimelineId(u32); 34 35 impl TimelineId { 36 pub fn new(id: u32) -> Self { 37 TimelineId(id) 38 } 39 } 40 41 impl fmt::Display for TimelineId { 42 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 43 write!(f, "TimelineId({})", self.0) 44 } 45 } 46 47 #[derive(Copy, Clone, Eq, PartialEq, Debug, Default)] 48 pub enum ViewFilter { 49 Notes, 50 51 #[default] 52 NotesAndReplies, 53 } 54 55 impl ViewFilter { 56 pub fn name(&self) -> &'static str { 57 match self { 58 ViewFilter::Notes => "Notes", 59 ViewFilter::NotesAndReplies => "Notes & Replies", 60 } 61 } 62 63 pub fn index(&self) -> usize { 64 match self { 65 ViewFilter::Notes => 0, 66 ViewFilter::NotesAndReplies => 1, 67 } 68 } 69 70 pub fn filter_notes(cache: &CachedNote, note: &Note) -> bool { 71 !cache.reply.borrow(note.tags()).is_reply() 72 } 73 74 fn identity(_cache: &CachedNote, _note: &Note) -> bool { 75 true 76 } 77 78 pub fn filter(&self) -> fn(&CachedNote, &Note) -> bool { 79 match self { 80 ViewFilter::Notes => ViewFilter::filter_notes, 81 ViewFilter::NotesAndReplies => ViewFilter::identity, 82 } 83 } 84 } 85 86 /// A timeline view is a filtered view of notes in a timeline. Two standard views 87 /// are "Notes" and "Notes & Replies". A timeline is associated with a Filter, 88 /// but a TimelineTab is a further filtered view of this Filter that can't 89 /// be captured by a Filter itself. 90 #[derive(Default, Debug)] 91 pub struct TimelineTab { 92 pub notes: Vec<NoteRef>, 93 pub selection: i32, 94 pub filter: ViewFilter, 95 pub list: Rc<RefCell<VirtualList>>, 96 } 97 98 impl TimelineTab { 99 pub fn new(filter: ViewFilter) -> Self { 100 TimelineTab::new_with_capacity(filter, 1000) 101 } 102 103 pub fn new_with_capacity(filter: ViewFilter, cap: usize) -> Self { 104 let selection = 0i32; 105 let mut list = VirtualList::new(); 106 list.hide_on_resize(None); 107 list.over_scan(1000.0); 108 let list = Rc::new(RefCell::new(list)); 109 let notes: Vec<NoteRef> = Vec::with_capacity(cap); 110 111 TimelineTab { 112 notes, 113 selection, 114 filter, 115 list, 116 } 117 } 118 119 pub fn insert(&mut self, new_refs: &[NoteRef], reversed: bool) { 120 if new_refs.is_empty() { 121 return; 122 } 123 let num_prev_items = self.notes.len(); 124 let (notes, merge_kind) = crate::timeline::merge_sorted_vecs(&self.notes, new_refs); 125 126 self.notes = notes; 127 let new_items = self.notes.len() - num_prev_items; 128 129 // TODO: technically items could have been added inbetween 130 if new_items > 0 { 131 let mut list = self.list.borrow_mut(); 132 133 match merge_kind { 134 // TODO: update egui_virtual_list to support spliced inserts 135 MergeKind::Spliced => { 136 debug!( 137 "spliced when inserting {} new notes, resetting virtual list", 138 new_refs.len() 139 ); 140 list.reset(); 141 } 142 MergeKind::FrontInsert => { 143 // only run this logic if we're reverse-chronological 144 // reversed in this case means chronological, since the 145 // default is reverse-chronological. yeah it's confusing. 146 if !reversed { 147 list.items_inserted_at_start(new_items); 148 } 149 } 150 } 151 } 152 } 153 154 pub fn select_down(&mut self) { 155 debug!("select_down {}", self.selection + 1); 156 if self.selection + 1 > self.notes.len() as i32 { 157 return; 158 } 159 160 self.selection += 1; 161 } 162 163 pub fn select_up(&mut self) { 164 debug!("select_up {}", self.selection - 1); 165 if self.selection - 1 < 0 { 166 return; 167 } 168 169 self.selection -= 1; 170 } 171 } 172 173 /// A column in a deck. Holds navigation state, loaded notes, column kind, etc. 174 #[derive(Debug)] 175 pub struct Timeline { 176 pub id: TimelineId, 177 pub kind: TimelineKind, 178 // We may not have the filter loaded yet, so let's make it an option so 179 // that codepaths have to explicitly handle it 180 pub filter: FilterStates, 181 pub views: Vec<TimelineTab>, 182 pub selected_view: i32, 183 184 /// Our nostrdb subscription 185 pub subscription: Option<Subscription>, 186 } 187 188 #[derive(Serialize, Deserialize, Clone, Debug)] 189 pub struct SerializableTimeline { 190 pub id: TimelineId, 191 pub kind: TimelineKind, 192 } 193 194 impl SerializableTimeline { 195 pub fn into_timeline(self, ndb: &Ndb, deck_user_pubkey: Option<&[u8; 32]>) -> Option<Timeline> { 196 self.kind.into_timeline(ndb, deck_user_pubkey) 197 } 198 } 199 200 impl Timeline { 201 /// Create a timeline from a contact list 202 pub fn contact_list(contact_list: &Note, pk_src: PubkeySource) -> Result<Self> { 203 let filter = filter::filter_from_tags(contact_list)?.into_follow_filter(); 204 205 Ok(Timeline::new( 206 TimelineKind::contact_list(pk_src), 207 FilterState::ready(filter), 208 )) 209 } 210 211 pub fn hashtag(hashtag: String) -> Self { 212 let filter = Filter::new() 213 .kinds([1]) 214 .limit(filter::default_limit()) 215 .tags([hashtag.clone()], 't') 216 .build(); 217 218 Timeline::new( 219 TimelineKind::Hashtag(hashtag), 220 FilterState::ready(vec![filter]), 221 ) 222 } 223 224 pub fn make_view_id(id: TimelineId, selected_view: i32) -> egui::Id { 225 egui::Id::new((id, selected_view)) 226 } 227 228 pub fn view_id(&self) -> egui::Id { 229 Timeline::make_view_id(self.id, self.selected_view) 230 } 231 232 pub fn new(kind: TimelineKind, filter_state: FilterState) -> Self { 233 // global unique id for all new timelines 234 static UIDS: AtomicU32 = AtomicU32::new(0); 235 236 let filter = FilterStates::new(filter_state); 237 let subscription: Option<Subscription> = None; 238 let notes = TimelineTab::new(ViewFilter::Notes); 239 let replies = TimelineTab::new(ViewFilter::NotesAndReplies); 240 let views = vec![notes, replies]; 241 let selected_view = 0; 242 let id = TimelineId::new(UIDS.fetch_add(1, Ordering::Relaxed)); 243 244 Timeline { 245 id, 246 kind, 247 filter, 248 views, 249 subscription, 250 selected_view, 251 } 252 } 253 254 pub fn current_view(&self) -> &TimelineTab { 255 &self.views[self.selected_view as usize] 256 } 257 258 pub fn current_view_mut(&mut self) -> &mut TimelineTab { 259 &mut self.views[self.selected_view as usize] 260 } 261 262 pub fn notes(&self, view: ViewFilter) -> &[NoteRef] { 263 &self.views[view.index()].notes 264 } 265 266 pub fn view(&self, view: ViewFilter) -> &TimelineTab { 267 &self.views[view.index()] 268 } 269 270 pub fn view_mut(&mut self, view: ViewFilter) -> &mut TimelineTab { 271 &mut self.views[view.index()] 272 } 273 274 pub fn poll_notes_into_view( 275 timeline_idx: usize, 276 mut timelines: Vec<&mut Timeline>, 277 ndb: &Ndb, 278 txn: &Transaction, 279 unknown_ids: &mut UnknownIds, 280 note_cache: &mut NoteCache, 281 is_muted: &MuteFun, 282 ) -> Result<()> { 283 let timeline = timelines 284 .get_mut(timeline_idx) 285 .ok_or(Error::TimelineNotFound)?; 286 let sub = timeline.subscription.ok_or(Error::no_active_sub())?; 287 288 let new_note_ids = ndb.poll_for_notes(sub, 500); 289 if new_note_ids.is_empty() { 290 return Ok(()); 291 } else { 292 debug!("{} new notes! {:?}", new_note_ids.len(), new_note_ids); 293 } 294 295 let mut new_refs: Vec<(Note, NoteRef)> = Vec::with_capacity(new_note_ids.len()); 296 297 for key in new_note_ids { 298 let note = if let Ok(note) = ndb.get_note_by_key(txn, key) { 299 note 300 } else { 301 error!("hit race condition in poll_notes_into_view: https://github.com/damus-io/nostrdb/issues/35 note {:?} was not added to timeline", key); 302 continue; 303 }; 304 if is_muted(¬e) { 305 continue; 306 } 307 308 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, ¬e); 309 310 let created_at = note.created_at(); 311 new_refs.push((note, NoteRef { key, created_at })); 312 } 313 314 // We're assuming reverse-chronological here (timelines). This 315 // flag ensures we trigger the items_inserted_at_start 316 // optimization in VirtualList. We need this flag because we can 317 // insert notes into chronological order sometimes, and this 318 // optimization doesn't make sense in those situations. 319 let reversed = false; 320 321 // ViewFilter::NotesAndReplies 322 { 323 let refs: Vec<NoteRef> = new_refs.iter().map(|(_note, nr)| *nr).collect(); 324 325 let reversed = false; 326 timeline 327 .view_mut(ViewFilter::NotesAndReplies) 328 .insert(&refs, reversed); 329 } 330 331 // 332 // handle the filtered case (ViewFilter::Notes, no replies) 333 // 334 // TODO(jb55): this is mostly just copied from above, let's just use a loop 335 // I initially tried this but ran into borrow checker issues 336 { 337 let mut filtered_refs = Vec::with_capacity(new_refs.len()); 338 for (note, nr) in &new_refs { 339 let cached_note = note_cache.cached_note_or_insert(nr.key, note); 340 341 if ViewFilter::filter_notes(cached_note, note) { 342 filtered_refs.push(*nr); 343 } 344 } 345 346 timeline 347 .view_mut(ViewFilter::Notes) 348 .insert(&filtered_refs, reversed); 349 } 350 351 Ok(()) 352 } 353 354 pub fn as_serializable_timeline(&self) -> SerializableTimeline { 355 SerializableTimeline { 356 id: self.id, 357 kind: self.kind.clone(), 358 } 359 } 360 } 361 362 pub enum MergeKind { 363 FrontInsert, 364 Spliced, 365 } 366 367 pub fn merge_sorted_vecs<T: Ord + Copy>(vec1: &[T], vec2: &[T]) -> (Vec<T>, MergeKind) { 368 let mut merged = Vec::with_capacity(vec1.len() + vec2.len()); 369 let mut i = 0; 370 let mut j = 0; 371 let mut result: Option<MergeKind> = None; 372 373 while i < vec1.len() && j < vec2.len() { 374 if vec1[i] <= vec2[j] { 375 if result.is_none() && j < vec2.len() { 376 // if we're pushing from our large list and still have 377 // some left in vec2, then this is a splice 378 result = Some(MergeKind::Spliced); 379 } 380 merged.push(vec1[i]); 381 i += 1; 382 } else { 383 merged.push(vec2[j]); 384 j += 1; 385 } 386 } 387 388 // Append any remaining elements from either vector 389 if i < vec1.len() { 390 merged.extend_from_slice(&vec1[i..]); 391 } 392 if j < vec2.len() { 393 merged.extend_from_slice(&vec2[j..]); 394 } 395 396 (merged, result.unwrap_or(MergeKind::FrontInsert)) 397 } 398 399 /// When adding a new timeline, we may have a situation where the 400 /// FilterState is NeedsRemote. This can happen if we don't yet have the 401 /// contact list, etc. For these situations, we query all of the relays 402 /// with the same sub_id. We keep track of this sub_id and update the 403 /// filter with the latest version of the returned filter (ie contact 404 /// list) when they arrive. 405 /// 406 /// We do this by maintaining this sub_id in the filter state, even when 407 /// in the ready state. See: [`FilterReady`] 408 pub fn setup_new_timeline( 409 timeline: &mut Timeline, 410 ndb: &Ndb, 411 subs: &mut Subscriptions, 412 pool: &mut RelayPool, 413 note_cache: &mut NoteCache, 414 since_optimize: bool, 415 is_muted: &MuteFun, 416 ) { 417 // if we're ready, setup local subs 418 if is_timeline_ready(ndb, pool, note_cache, timeline, is_muted) { 419 if let Err(err) = setup_timeline_nostrdb_sub(ndb, note_cache, timeline, is_muted) { 420 error!("setup_new_timeline: {err}"); 421 } 422 } 423 424 for relay in &mut pool.relays { 425 send_initial_timeline_filter(ndb, since_optimize, subs, &mut relay.relay, timeline); 426 } 427 } 428 429 /// Send initial filters for a specific relay. This typically gets called 430 /// when we first connect to a new relay for the first time. For 431 /// situations where you are adding a new timeline, use 432 /// setup_new_timeline. 433 pub fn send_initial_timeline_filters( 434 ndb: &Ndb, 435 since_optimize: bool, 436 columns: &mut Columns, 437 subs: &mut Subscriptions, 438 pool: &mut RelayPool, 439 relay_id: &str, 440 ) -> Option<()> { 441 info!("Sending initial filters to {}", relay_id); 442 let relay = &mut pool 443 .relays 444 .iter_mut() 445 .find(|r| r.relay.url == relay_id)? 446 .relay; 447 448 for timeline in columns.timelines_mut() { 449 send_initial_timeline_filter(ndb, since_optimize, subs, relay, timeline); 450 } 451 452 Some(()) 453 } 454 455 pub fn send_initial_timeline_filter( 456 ndb: &Ndb, 457 can_since_optimize: bool, 458 subs: &mut Subscriptions, 459 relay: &mut Relay, 460 timeline: &mut Timeline, 461 ) { 462 let filter_state = timeline.filter.get(&relay.url); 463 464 match filter_state { 465 FilterState::Broken(err) => { 466 error!( 467 "FetchingRemote state in broken state when sending initial timeline filter? {err}" 468 ); 469 } 470 471 FilterState::FetchingRemote(_unisub) => { 472 error!("FetchingRemote state when sending initial timeline filter?"); 473 } 474 475 FilterState::GotRemote(_sub) => { 476 error!("GotRemote state when sending initial timeline filter?"); 477 } 478 479 FilterState::Ready(filter) => { 480 let filter = filter.to_owned(); 481 let new_filters = filter.into_iter().map(|f| { 482 // limit the size of remote filters 483 let default_limit = filter::default_remote_limit(); 484 let mut lim = f.limit().unwrap_or(default_limit); 485 let mut filter = f; 486 if lim > default_limit { 487 lim = default_limit; 488 filter = filter.limit_mut(lim); 489 } 490 491 let notes = timeline.notes(ViewFilter::NotesAndReplies); 492 493 // Should we since optimize? Not always. For example 494 // if we only have a few notes locally. One way to 495 // determine this is by looking at the current filter 496 // and seeing what its limit is. If we have less 497 // notes than the limit, we might want to backfill 498 // older notes 499 if can_since_optimize && filter::should_since_optimize(lim, notes.len()) { 500 filter = filter::since_optimize_filter(filter, notes); 501 } else { 502 warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", filter); 503 } 504 505 filter 506 }).collect(); 507 508 //let sub_id = damus.gen_subid(&SubKind::Initial); 509 let sub_id = subscriptions::new_sub_id(); 510 subs.subs.insert(sub_id.clone(), SubKind::Initial); 511 512 relay.subscribe(sub_id, new_filters); 513 } 514 515 // we need some data first 516 FilterState::NeedsRemote(filter) => { 517 fetch_contact_list(filter.to_owned(), ndb, subs, relay, timeline) 518 } 519 } 520 } 521 522 fn fetch_contact_list( 523 filter: Vec<Filter>, 524 ndb: &Ndb, 525 subs: &mut Subscriptions, 526 relay: &mut Relay, 527 timeline: &mut Timeline, 528 ) { 529 let sub_kind = SubKind::FetchingContactList(timeline.id); 530 let sub_id = subscriptions::new_sub_id(); 531 let local_sub = ndb.subscribe(&filter).expect("sub"); 532 533 timeline.filter.set_relay_state( 534 relay.url.clone(), 535 FilterState::fetching_remote(sub_id.clone(), local_sub), 536 ); 537 538 subs.subs.insert(sub_id.clone(), sub_kind); 539 540 info!("fetching contact list from {}", &relay.url); 541 relay.subscribe(sub_id, filter); 542 } 543 544 fn setup_initial_timeline( 545 ndb: &Ndb, 546 timeline: &mut Timeline, 547 note_cache: &mut NoteCache, 548 filters: &[Filter], 549 is_muted: &MuteFun, 550 ) -> Result<()> { 551 timeline.subscription = Some(ndb.subscribe(filters)?); 552 let txn = Transaction::new(ndb)?; 553 debug!( 554 "querying nostrdb sub {:?} {:?}", 555 timeline.subscription, timeline.filter 556 ); 557 let lim = filters[0].limit().unwrap_or(crate::filter::default_limit()) as i32; 558 let notes = ndb 559 .query(&txn, filters, lim)? 560 .into_iter() 561 .map(NoteRef::from_query_result) 562 .collect(); 563 564 copy_notes_into_timeline(timeline, &txn, ndb, note_cache, notes, is_muted); 565 566 Ok(()) 567 } 568 569 pub fn copy_notes_into_timeline( 570 timeline: &mut Timeline, 571 txn: &Transaction, 572 ndb: &Ndb, 573 note_cache: &mut NoteCache, 574 notes: Vec<NoteRef>, 575 is_muted: &MuteFun, 576 ) { 577 let filters = { 578 let views = &timeline.views; 579 let filters: Vec<fn(&CachedNote, &Note) -> bool> = 580 views.iter().map(|v| v.filter.filter()).collect(); 581 filters 582 }; 583 584 for note_ref in notes { 585 for (view, filter) in filters.iter().enumerate() { 586 if let Ok(note) = ndb.get_note_by_key(txn, note_ref.key) { 587 if is_muted(¬e) { 588 continue; 589 } 590 if filter( 591 note_cache.cached_note_or_insert_mut(note_ref.key, ¬e), 592 ¬e, 593 ) { 594 timeline.views[view].notes.push(note_ref) 595 } 596 } 597 } 598 } 599 } 600 601 pub fn setup_initial_nostrdb_subs( 602 ndb: &Ndb, 603 note_cache: &mut NoteCache, 604 columns: &mut Columns, 605 is_muted: &MuteFun, 606 ) -> Result<()> { 607 for timeline in columns.timelines_mut() { 608 if let Err(err) = setup_timeline_nostrdb_sub(ndb, note_cache, timeline, is_muted) { 609 error!("setup_initial_nostrdb_subs: {err}"); 610 } 611 } 612 613 Ok(()) 614 } 615 616 fn setup_timeline_nostrdb_sub( 617 ndb: &Ndb, 618 note_cache: &mut NoteCache, 619 timeline: &mut Timeline, 620 is_muted: &MuteFun, 621 ) -> Result<()> { 622 let filter_state = timeline 623 .filter 624 .get_any_ready() 625 .ok_or(Error::empty_contact_list())? 626 .to_owned(); 627 628 setup_initial_timeline(ndb, timeline, note_cache, &filter_state, is_muted)?; 629 630 Ok(()) 631 } 632 633 /// Check our timeline filter and see if we have any filter data ready. 634 /// Our timelines may require additional data before it is functional. For 635 /// example, when we have to fetch a contact list before we do the actual 636 /// following list query. 637 pub fn is_timeline_ready( 638 ndb: &Ndb, 639 pool: &mut RelayPool, 640 note_cache: &mut NoteCache, 641 timeline: &mut Timeline, 642 is_muted: &MuteFun, 643 ) -> bool { 644 // TODO: we should debounce the filter states a bit to make sure we have 645 // seen all of the different contact lists from each relay 646 if let Some(_f) = timeline.filter.get_any_ready() { 647 return true; 648 } 649 650 let (relay_id, sub) = if let Some((relay_id, sub)) = timeline.filter.get_any_gotremote() { 651 (relay_id.to_string(), sub) 652 } else { 653 return false; 654 }; 655 656 // We got at least one eose for our filter request. Let's see 657 // if nostrdb is done processing it yet. 658 let res = ndb.poll_for_notes(sub, 1); 659 if res.is_empty() { 660 debug!( 661 "check_timeline_filter_state: no notes found (yet?) for timeline {:?}", 662 timeline 663 ); 664 return false; 665 } 666 667 info!("notes found for contact timeline after GotRemote!"); 668 669 let note_key = res[0]; 670 671 let filter = { 672 let txn = Transaction::new(ndb).expect("txn"); 673 let note = ndb.get_note_by_key(&txn, note_key).expect("note"); 674 filter::filter_from_tags(¬e).map(|f| f.into_follow_filter()) 675 }; 676 677 // TODO: into_follow_filter is hardcoded to contact lists, let's generalize 678 match filter { 679 Err(Error::Filter(e)) => { 680 error!("got broken when building filter {e}"); 681 timeline 682 .filter 683 .set_relay_state(relay_id, FilterState::broken(e)); 684 false 685 } 686 Err(err) => { 687 error!("got broken when building filter {err}"); 688 timeline 689 .filter 690 .set_relay_state(relay_id, FilterState::broken(FilterError::EmptyContactList)); 691 false 692 } 693 Ok(filter) => { 694 // we just switched to the ready state, we should send initial 695 // queries and setup the local subscription 696 info!("Found contact list! Setting up local and remote contact list query"); 697 setup_initial_timeline(ndb, timeline, note_cache, &filter, is_muted) 698 .expect("setup init"); 699 timeline 700 .filter 701 .set_relay_state(relay_id, FilterState::ready(filter.clone())); 702 703 //let ck = &timeline.kind; 704 //let subid = damus.gen_subid(&SubKind::Column(ck.clone())); 705 let subid = subscriptions::new_sub_id(); 706 pool.subscribe(subid, filter); 707 true 708 } 709 } 710 }