mod.rs (21092B)
1 use crate::{ 2 column::Columns, 3 error::{Error, FilterError}, 4 filter::{self, FilterState, FilterStates}, 5 note::NoteRef, 6 notecache::{CachedNote, NoteCache}, 7 subscriptions::{self, SubKind, Subscriptions}, 8 unknowns::UnknownIds, 9 Result, 10 }; 11 12 use std::fmt; 13 use std::sync::atomic::{AtomicU32, Ordering}; 14 15 use egui_virtual_list::VirtualList; 16 use enostr::{Relay, RelayPool}; 17 use nostrdb::{Filter, Ndb, Note, Subscription, Transaction}; 18 use serde::{Deserialize, Serialize}; 19 use std::cell::RefCell; 20 use std::hash::Hash; 21 use std::rc::Rc; 22 23 use tracing::{debug, error, info, warn}; 24 25 pub mod kind; 26 pub mod route; 27 28 pub use kind::{PubkeySource, TimelineKind}; 29 pub use route::TimelineRoute; 30 31 #[derive(Debug, Hash, Copy, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)] 32 pub struct TimelineId(u32); 33 34 impl TimelineId { 35 pub fn new(id: u32) -> Self { 36 TimelineId(id) 37 } 38 } 39 40 impl fmt::Display for TimelineId { 41 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 42 write!(f, "TimelineId({})", self.0) 43 } 44 } 45 46 #[derive(Copy, Clone, Eq, PartialEq, Debug, Default)] 47 pub enum ViewFilter { 48 Notes, 49 50 #[default] 51 NotesAndReplies, 52 } 53 54 impl ViewFilter { 55 pub fn name(&self) -> &'static str { 56 match self { 57 ViewFilter::Notes => "Notes", 58 ViewFilter::NotesAndReplies => "Notes & Replies", 59 } 60 } 61 62 pub fn index(&self) -> usize { 63 match self { 64 ViewFilter::Notes => 0, 65 ViewFilter::NotesAndReplies => 1, 66 } 67 } 68 69 pub fn filter_notes(cache: &CachedNote, note: &Note) -> bool { 70 !cache.reply.borrow(note.tags()).is_reply() 71 } 72 73 fn identity(_cache: &CachedNote, _note: &Note) -> bool { 74 true 75 } 76 77 pub fn filter(&self) -> fn(&CachedNote, &Note) -> bool { 78 match self { 79 ViewFilter::Notes => ViewFilter::filter_notes, 80 ViewFilter::NotesAndReplies => ViewFilter::identity, 81 } 82 } 83 } 84 85 /// A timeline view is a filtered view of notes in a timeline. Two standard views 86 /// are "Notes" and "Notes & Replies". A timeline is associated with a Filter, 87 /// but a TimelineTab is a further filtered view of this Filter that can't 88 /// be captured by a Filter itself. 89 #[derive(Default, Debug)] 90 pub struct TimelineTab { 91 pub notes: Vec<NoteRef>, 92 pub selection: i32, 93 pub filter: ViewFilter, 94 pub list: Rc<RefCell<VirtualList>>, 95 } 96 97 impl TimelineTab { 98 pub fn new(filter: ViewFilter) -> Self { 99 TimelineTab::new_with_capacity(filter, 1000) 100 } 101 102 pub fn new_with_capacity(filter: ViewFilter, cap: usize) -> Self { 103 let selection = 0i32; 104 let mut list = VirtualList::new(); 105 list.hide_on_resize(None); 106 list.over_scan(1000.0); 107 let list = Rc::new(RefCell::new(list)); 108 let notes: Vec<NoteRef> = Vec::with_capacity(cap); 109 110 TimelineTab { 111 notes, 112 selection, 113 filter, 114 list, 115 } 116 } 117 118 pub fn insert(&mut self, new_refs: &[NoteRef], reversed: bool) { 119 if new_refs.is_empty() { 120 return; 121 } 122 let num_prev_items = self.notes.len(); 123 let (notes, merge_kind) = crate::timeline::merge_sorted_vecs(&self.notes, new_refs); 124 125 self.notes = notes; 126 let new_items = self.notes.len() - num_prev_items; 127 128 // TODO: technically items could have been added inbetween 129 if new_items > 0 { 130 let mut list = self.list.borrow_mut(); 131 132 match merge_kind { 133 // TODO: update egui_virtual_list to support spliced inserts 134 MergeKind::Spliced => { 135 debug!( 136 "spliced when inserting {} new notes, resetting virtual list", 137 new_refs.len() 138 ); 139 list.reset(); 140 } 141 MergeKind::FrontInsert => { 142 // only run this logic if we're reverse-chronological 143 // reversed in this case means chronological, since the 144 // default is reverse-chronological. yeah it's confusing. 145 if !reversed { 146 list.items_inserted_at_start(new_items); 147 } 148 } 149 } 150 } 151 } 152 153 pub fn select_down(&mut self) { 154 debug!("select_down {}", self.selection + 1); 155 if self.selection + 1 > self.notes.len() as i32 { 156 return; 157 } 158 159 self.selection += 1; 160 } 161 162 pub fn select_up(&mut self) { 163 debug!("select_up {}", self.selection - 1); 164 if self.selection - 1 < 0 { 165 return; 166 } 167 168 self.selection -= 1; 169 } 170 } 171 172 /// A column in a deck. Holds navigation state, loaded notes, column kind, etc. 173 #[derive(Debug)] 174 pub struct Timeline { 175 pub id: TimelineId, 176 pub kind: TimelineKind, 177 // We may not have the filter loaded yet, so let's make it an option so 178 // that codepaths have to explicitly handle it 179 pub filter: FilterStates, 180 pub views: Vec<TimelineTab>, 181 pub selected_view: i32, 182 183 /// Our nostrdb subscription 184 pub subscription: Option<Subscription>, 185 } 186 187 #[derive(Serialize, Deserialize, Clone, Debug)] 188 pub struct SerializableTimeline { 189 pub id: TimelineId, 190 pub kind: TimelineKind, 191 } 192 193 impl SerializableTimeline { 194 pub fn into_timeline(self, ndb: &Ndb, deck_user_pubkey: Option<&[u8; 32]>) -> Option<Timeline> { 195 self.kind.into_timeline(ndb, deck_user_pubkey) 196 } 197 } 198 199 impl Timeline { 200 /// Create a timeline from a contact list 201 pub fn contact_list(contact_list: &Note, pk_src: PubkeySource) -> Result<Self> { 202 let filter = filter::filter_from_tags(contact_list)?.into_follow_filter(); 203 204 Ok(Timeline::new( 205 TimelineKind::contact_list(pk_src), 206 FilterState::ready(filter), 207 )) 208 } 209 210 pub fn make_view_id(id: TimelineId, selected_view: i32) -> egui::Id { 211 egui::Id::new((id, selected_view)) 212 } 213 214 pub fn view_id(&self) -> egui::Id { 215 Timeline::make_view_id(self.id, self.selected_view) 216 } 217 218 pub fn new(kind: TimelineKind, filter_state: FilterState) -> Self { 219 // global unique id for all new timelines 220 static UIDS: AtomicU32 = AtomicU32::new(0); 221 222 let filter = FilterStates::new(filter_state); 223 let subscription: Option<Subscription> = None; 224 let notes = TimelineTab::new(ViewFilter::Notes); 225 let replies = TimelineTab::new(ViewFilter::NotesAndReplies); 226 let views = vec![notes, replies]; 227 let selected_view = 0; 228 let id = TimelineId::new(UIDS.fetch_add(1, Ordering::Relaxed)); 229 230 Timeline { 231 id, 232 kind, 233 filter, 234 views, 235 subscription, 236 selected_view, 237 } 238 } 239 240 pub fn current_view(&self) -> &TimelineTab { 241 &self.views[self.selected_view as usize] 242 } 243 244 pub fn current_view_mut(&mut self) -> &mut TimelineTab { 245 &mut self.views[self.selected_view as usize] 246 } 247 248 pub fn notes(&self, view: ViewFilter) -> &[NoteRef] { 249 &self.views[view.index()].notes 250 } 251 252 pub fn view(&self, view: ViewFilter) -> &TimelineTab { 253 &self.views[view.index()] 254 } 255 256 pub fn view_mut(&mut self, view: ViewFilter) -> &mut TimelineTab { 257 &mut self.views[view.index()] 258 } 259 260 pub fn poll_notes_into_view( 261 timeline_idx: usize, 262 mut timelines: Vec<&mut Timeline>, 263 ndb: &Ndb, 264 txn: &Transaction, 265 unknown_ids: &mut UnknownIds, 266 note_cache: &mut NoteCache, 267 ) -> Result<()> { 268 let timeline = timelines 269 .get_mut(timeline_idx) 270 .ok_or(Error::TimelineNotFound)?; 271 let sub = timeline.subscription.ok_or(Error::no_active_sub())?; 272 273 let new_note_ids = ndb.poll_for_notes(sub, 500); 274 if new_note_ids.is_empty() { 275 return Ok(()); 276 } else { 277 debug!("{} new notes! {:?}", new_note_ids.len(), new_note_ids); 278 } 279 280 let mut new_refs: Vec<(Note, NoteRef)> = Vec::with_capacity(new_note_ids.len()); 281 282 for key in new_note_ids { 283 let note = if let Ok(note) = ndb.get_note_by_key(txn, key) { 284 note 285 } else { 286 error!("hit race condition in poll_notes_into_view: https://github.com/damus-io/nostrdb/issues/35 note {:?} was not added to timeline", key); 287 continue; 288 }; 289 290 UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, ¬e); 291 292 let created_at = note.created_at(); 293 new_refs.push((note, NoteRef { key, created_at })); 294 } 295 296 // We're assuming reverse-chronological here (timelines). This 297 // flag ensures we trigger the items_inserted_at_start 298 // optimization in VirtualList. We need this flag because we can 299 // insert notes into chronological order sometimes, and this 300 // optimization doesn't make sense in those situations. 301 let reversed = false; 302 303 // ViewFilter::NotesAndReplies 304 { 305 let refs: Vec<NoteRef> = new_refs.iter().map(|(_note, nr)| *nr).collect(); 306 307 let reversed = false; 308 timeline 309 .view_mut(ViewFilter::NotesAndReplies) 310 .insert(&refs, reversed); 311 } 312 313 // 314 // handle the filtered case (ViewFilter::Notes, no replies) 315 // 316 // TODO(jb55): this is mostly just copied from above, let's just use a loop 317 // I initially tried this but ran into borrow checker issues 318 { 319 let mut filtered_refs = Vec::with_capacity(new_refs.len()); 320 for (note, nr) in &new_refs { 321 let cached_note = note_cache.cached_note_or_insert(nr.key, note); 322 323 if ViewFilter::filter_notes(cached_note, note) { 324 filtered_refs.push(*nr); 325 } 326 } 327 328 timeline 329 .view_mut(ViewFilter::Notes) 330 .insert(&filtered_refs, reversed); 331 } 332 333 Ok(()) 334 } 335 336 pub fn as_serializable_timeline(&self) -> SerializableTimeline { 337 SerializableTimeline { 338 id: self.id, 339 kind: self.kind.clone(), 340 } 341 } 342 } 343 344 pub enum MergeKind { 345 FrontInsert, 346 Spliced, 347 } 348 349 pub fn merge_sorted_vecs<T: Ord + Copy>(vec1: &[T], vec2: &[T]) -> (Vec<T>, MergeKind) { 350 let mut merged = Vec::with_capacity(vec1.len() + vec2.len()); 351 let mut i = 0; 352 let mut j = 0; 353 let mut result: Option<MergeKind> = None; 354 355 while i < vec1.len() && j < vec2.len() { 356 if vec1[i] <= vec2[j] { 357 if result.is_none() && j < vec2.len() { 358 // if we're pushing from our large list and still have 359 // some left in vec2, then this is a splice 360 result = Some(MergeKind::Spliced); 361 } 362 merged.push(vec1[i]); 363 i += 1; 364 } else { 365 merged.push(vec2[j]); 366 j += 1; 367 } 368 } 369 370 // Append any remaining elements from either vector 371 if i < vec1.len() { 372 merged.extend_from_slice(&vec1[i..]); 373 } 374 if j < vec2.len() { 375 merged.extend_from_slice(&vec2[j..]); 376 } 377 378 (merged, result.unwrap_or(MergeKind::FrontInsert)) 379 } 380 381 /// When adding a new timeline, we may have a situation where the 382 /// FilterState is NeedsRemote. This can happen if we don't yet have the 383 /// contact list, etc. For these situations, we query all of the relays 384 /// with the same sub_id. We keep track of this sub_id and update the 385 /// filter with the latest version of the returned filter (ie contact 386 /// list) when they arrive. 387 /// 388 /// We do this by maintaining this sub_id in the filter state, even when 389 /// in the ready state. See: [`FilterReady`] 390 pub fn setup_new_timeline( 391 timeline: &mut Timeline, 392 ndb: &Ndb, 393 subs: &mut Subscriptions, 394 pool: &mut RelayPool, 395 note_cache: &mut NoteCache, 396 since_optimize: bool, 397 ) { 398 // if we're ready, setup local subs 399 if is_timeline_ready(ndb, pool, note_cache, timeline) { 400 if let Err(err) = setup_timeline_nostrdb_sub(ndb, note_cache, timeline) { 401 error!("setup_new_timeline: {err}"); 402 } 403 } 404 405 for relay in &mut pool.relays { 406 send_initial_timeline_filter(ndb, since_optimize, subs, &mut relay.relay, timeline); 407 } 408 } 409 410 /// Send initial filters for a specific relay. This typically gets called 411 /// when we first connect to a new relay for the first time. For 412 /// situations where you are adding a new timeline, use 413 /// setup_new_timeline. 414 pub fn send_initial_timeline_filters( 415 ndb: &Ndb, 416 since_optimize: bool, 417 columns: &mut Columns, 418 subs: &mut Subscriptions, 419 pool: &mut RelayPool, 420 relay_id: &str, 421 ) -> Option<()> { 422 info!("Sending initial filters to {}", relay_id); 423 let relay = &mut pool 424 .relays 425 .iter_mut() 426 .find(|r| r.relay.url == relay_id)? 427 .relay; 428 429 for timeline in columns.timelines_mut() { 430 send_initial_timeline_filter(ndb, since_optimize, subs, relay, timeline); 431 } 432 433 Some(()) 434 } 435 436 pub fn send_initial_timeline_filter( 437 ndb: &Ndb, 438 can_since_optimize: bool, 439 subs: &mut Subscriptions, 440 relay: &mut Relay, 441 timeline: &mut Timeline, 442 ) { 443 let filter_state = timeline.filter.get(&relay.url); 444 445 match filter_state { 446 FilterState::Broken(err) => { 447 error!( 448 "FetchingRemote state in broken state when sending initial timeline filter? {err}" 449 ); 450 } 451 452 FilterState::FetchingRemote(_unisub) => { 453 error!("FetchingRemote state when sending initial timeline filter?"); 454 } 455 456 FilterState::GotRemote(_sub) => { 457 error!("GotRemote state when sending initial timeline filter?"); 458 } 459 460 FilterState::Ready(filter) => { 461 let filter = filter.to_owned(); 462 let new_filters = filter.into_iter().map(|f| { 463 // limit the size of remote filters 464 let default_limit = filter::default_remote_limit(); 465 let mut lim = f.limit().unwrap_or(default_limit); 466 let mut filter = f; 467 if lim > default_limit { 468 lim = default_limit; 469 filter = filter.limit_mut(lim); 470 } 471 472 let notes = timeline.notes(ViewFilter::NotesAndReplies); 473 474 // Should we since optimize? Not always. For example 475 // if we only have a few notes locally. One way to 476 // determine this is by looking at the current filter 477 // and seeing what its limit is. If we have less 478 // notes than the limit, we might want to backfill 479 // older notes 480 if can_since_optimize && filter::should_since_optimize(lim, notes.len()) { 481 filter = filter::since_optimize_filter(filter, notes); 482 } else { 483 warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", filter); 484 } 485 486 filter 487 }).collect(); 488 489 //let sub_id = damus.gen_subid(&SubKind::Initial); 490 let sub_id = subscriptions::new_sub_id(); 491 subs.subs.insert(sub_id.clone(), SubKind::Initial); 492 493 relay.subscribe(sub_id, new_filters); 494 } 495 496 // we need some data first 497 FilterState::NeedsRemote(filter) => { 498 fetch_contact_list(filter.to_owned(), ndb, subs, relay, timeline) 499 } 500 } 501 } 502 503 fn fetch_contact_list( 504 filter: Vec<Filter>, 505 ndb: &Ndb, 506 subs: &mut Subscriptions, 507 relay: &mut Relay, 508 timeline: &mut Timeline, 509 ) { 510 let sub_kind = SubKind::FetchingContactList(timeline.id); 511 let sub_id = subscriptions::new_sub_id(); 512 let local_sub = ndb.subscribe(&filter).expect("sub"); 513 514 timeline.filter.set_relay_state( 515 relay.url.clone(), 516 FilterState::fetching_remote(sub_id.clone(), local_sub), 517 ); 518 519 subs.subs.insert(sub_id.clone(), sub_kind); 520 521 info!("fetching contact list from {}", &relay.url); 522 relay.subscribe(sub_id, filter); 523 } 524 525 fn setup_initial_timeline( 526 ndb: &Ndb, 527 timeline: &mut Timeline, 528 note_cache: &mut NoteCache, 529 filters: &[Filter], 530 ) -> Result<()> { 531 timeline.subscription = Some(ndb.subscribe(filters)?); 532 let txn = Transaction::new(ndb)?; 533 debug!( 534 "querying nostrdb sub {:?} {:?}", 535 timeline.subscription, timeline.filter 536 ); 537 let lim = filters[0].limit().unwrap_or(crate::filter::default_limit()) as i32; 538 let notes = ndb 539 .query(&txn, filters, lim)? 540 .into_iter() 541 .map(NoteRef::from_query_result) 542 .collect(); 543 544 copy_notes_into_timeline(timeline, &txn, ndb, note_cache, notes); 545 546 Ok(()) 547 } 548 549 pub fn copy_notes_into_timeline( 550 timeline: &mut Timeline, 551 txn: &Transaction, 552 ndb: &Ndb, 553 note_cache: &mut NoteCache, 554 notes: Vec<NoteRef>, 555 ) { 556 let filters = { 557 let views = &timeline.views; 558 let filters: Vec<fn(&CachedNote, &Note) -> bool> = 559 views.iter().map(|v| v.filter.filter()).collect(); 560 filters 561 }; 562 563 for note_ref in notes { 564 for (view, filter) in filters.iter().enumerate() { 565 if let Ok(note) = ndb.get_note_by_key(txn, note_ref.key) { 566 if filter( 567 note_cache.cached_note_or_insert_mut(note_ref.key, ¬e), 568 ¬e, 569 ) { 570 timeline.views[view].notes.push(note_ref) 571 } 572 } 573 } 574 } 575 } 576 577 pub fn setup_initial_nostrdb_subs( 578 ndb: &Ndb, 579 note_cache: &mut NoteCache, 580 columns: &mut Columns, 581 ) -> Result<()> { 582 for timeline in columns.timelines_mut() { 583 if let Err(err) = setup_timeline_nostrdb_sub(ndb, note_cache, timeline) { 584 error!("setup_initial_nostrdb_subs: {err}"); 585 } 586 } 587 588 Ok(()) 589 } 590 591 fn setup_timeline_nostrdb_sub( 592 ndb: &Ndb, 593 note_cache: &mut NoteCache, 594 timeline: &mut Timeline, 595 ) -> Result<()> { 596 let filter_state = timeline 597 .filter 598 .get_any_ready() 599 .ok_or(Error::empty_contact_list())? 600 .to_owned(); 601 602 setup_initial_timeline(ndb, timeline, note_cache, &filter_state)?; 603 604 Ok(()) 605 } 606 607 /// Check our timeline filter and see if we have any filter data ready. 608 /// Our timelines may require additional data before it is functional. For 609 /// example, when we have to fetch a contact list before we do the actual 610 /// following list query. 611 pub fn is_timeline_ready( 612 ndb: &Ndb, 613 pool: &mut RelayPool, 614 note_cache: &mut NoteCache, 615 timeline: &mut Timeline, 616 ) -> bool { 617 // TODO: we should debounce the filter states a bit to make sure we have 618 // seen all of the different contact lists from each relay 619 if let Some(_f) = timeline.filter.get_any_ready() { 620 return true; 621 } 622 623 let (relay_id, sub) = if let Some((relay_id, sub)) = timeline.filter.get_any_gotremote() { 624 (relay_id.to_string(), sub) 625 } else { 626 return false; 627 }; 628 629 // We got at least one eose for our filter request. Let's see 630 // if nostrdb is done processing it yet. 631 let res = ndb.poll_for_notes(sub, 1); 632 if res.is_empty() { 633 debug!( 634 "check_timeline_filter_state: no notes found (yet?) for timeline {:?}", 635 timeline 636 ); 637 return false; 638 } 639 640 info!("notes found for contact timeline after GotRemote!"); 641 642 let note_key = res[0]; 643 644 let filter = { 645 let txn = Transaction::new(ndb).expect("txn"); 646 let note = ndb.get_note_by_key(&txn, note_key).expect("note"); 647 filter::filter_from_tags(¬e).map(|f| f.into_follow_filter()) 648 }; 649 650 // TODO: into_follow_filter is hardcoded to contact lists, let's generalize 651 match filter { 652 Err(Error::Filter(e)) => { 653 error!("got broken when building filter {e}"); 654 timeline 655 .filter 656 .set_relay_state(relay_id, FilterState::broken(e)); 657 false 658 } 659 Err(err) => { 660 error!("got broken when building filter {err}"); 661 timeline 662 .filter 663 .set_relay_state(relay_id, FilterState::broken(FilterError::EmptyContactList)); 664 false 665 } 666 Ok(filter) => { 667 // we just switched to the ready state, we should send initial 668 // queries and setup the local subscription 669 info!("Found contact list! Setting up local and remote contact list query"); 670 setup_initial_timeline(ndb, timeline, note_cache, &filter).expect("setup init"); 671 timeline 672 .filter 673 .set_relay_state(relay_id, FilterState::ready(filter.clone())); 674 675 //let ck = &timeline.kind; 676 //let subid = damus.gen_subid(&SubKind::Column(ck.clone())); 677 let subid = subscriptions::new_sub_id(); 678 pool.subscribe(subid, filter); 679 true 680 } 681 } 682 }