notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

mod.rs (21092B)


      1 use crate::{
      2     column::Columns,
      3     error::{Error, FilterError},
      4     filter::{self, FilterState, FilterStates},
      5     note::NoteRef,
      6     notecache::{CachedNote, NoteCache},
      7     subscriptions::{self, SubKind, Subscriptions},
      8     unknowns::UnknownIds,
      9     Result,
     10 };
     11 
     12 use std::fmt;
     13 use std::sync::atomic::{AtomicU32, Ordering};
     14 
     15 use egui_virtual_list::VirtualList;
     16 use enostr::{Relay, RelayPool};
     17 use nostrdb::{Filter, Ndb, Note, Subscription, Transaction};
     18 use serde::{Deserialize, Serialize};
     19 use std::cell::RefCell;
     20 use std::hash::Hash;
     21 use std::rc::Rc;
     22 
     23 use tracing::{debug, error, info, warn};
     24 
     25 pub mod kind;
     26 pub mod route;
     27 
     28 pub use kind::{PubkeySource, TimelineKind};
     29 pub use route::TimelineRoute;
     30 
     31 #[derive(Debug, Hash, Copy, Clone, Eq, PartialEq, serde::Serialize, serde::Deserialize)]
     32 pub struct TimelineId(u32);
     33 
     34 impl TimelineId {
     35     pub fn new(id: u32) -> Self {
     36         TimelineId(id)
     37     }
     38 }
     39 
     40 impl fmt::Display for TimelineId {
     41     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
     42         write!(f, "TimelineId({})", self.0)
     43     }
     44 }
     45 
     46 #[derive(Copy, Clone, Eq, PartialEq, Debug, Default)]
     47 pub enum ViewFilter {
     48     Notes,
     49 
     50     #[default]
     51     NotesAndReplies,
     52 }
     53 
     54 impl ViewFilter {
     55     pub fn name(&self) -> &'static str {
     56         match self {
     57             ViewFilter::Notes => "Notes",
     58             ViewFilter::NotesAndReplies => "Notes & Replies",
     59         }
     60     }
     61 
     62     pub fn index(&self) -> usize {
     63         match self {
     64             ViewFilter::Notes => 0,
     65             ViewFilter::NotesAndReplies => 1,
     66         }
     67     }
     68 
     69     pub fn filter_notes(cache: &CachedNote, note: &Note) -> bool {
     70         !cache.reply.borrow(note.tags()).is_reply()
     71     }
     72 
     73     fn identity(_cache: &CachedNote, _note: &Note) -> bool {
     74         true
     75     }
     76 
     77     pub fn filter(&self) -> fn(&CachedNote, &Note) -> bool {
     78         match self {
     79             ViewFilter::Notes => ViewFilter::filter_notes,
     80             ViewFilter::NotesAndReplies => ViewFilter::identity,
     81         }
     82     }
     83 }
     84 
     85 /// A timeline view is a filtered view of notes in a timeline. Two standard views
     86 /// are "Notes" and "Notes & Replies". A timeline is associated with a Filter,
     87 /// but a TimelineTab is a further filtered view of this Filter that can't
     88 /// be captured by a Filter itself.
     89 #[derive(Default, Debug)]
     90 pub struct TimelineTab {
     91     pub notes: Vec<NoteRef>,
     92     pub selection: i32,
     93     pub filter: ViewFilter,
     94     pub list: Rc<RefCell<VirtualList>>,
     95 }
     96 
     97 impl TimelineTab {
     98     pub fn new(filter: ViewFilter) -> Self {
     99         TimelineTab::new_with_capacity(filter, 1000)
    100     }
    101 
    102     pub fn new_with_capacity(filter: ViewFilter, cap: usize) -> Self {
    103         let selection = 0i32;
    104         let mut list = VirtualList::new();
    105         list.hide_on_resize(None);
    106         list.over_scan(1000.0);
    107         let list = Rc::new(RefCell::new(list));
    108         let notes: Vec<NoteRef> = Vec::with_capacity(cap);
    109 
    110         TimelineTab {
    111             notes,
    112             selection,
    113             filter,
    114             list,
    115         }
    116     }
    117 
    118     pub fn insert(&mut self, new_refs: &[NoteRef], reversed: bool) {
    119         if new_refs.is_empty() {
    120             return;
    121         }
    122         let num_prev_items = self.notes.len();
    123         let (notes, merge_kind) = crate::timeline::merge_sorted_vecs(&self.notes, new_refs);
    124 
    125         self.notes = notes;
    126         let new_items = self.notes.len() - num_prev_items;
    127 
    128         // TODO: technically items could have been added inbetween
    129         if new_items > 0 {
    130             let mut list = self.list.borrow_mut();
    131 
    132             match merge_kind {
    133                 // TODO: update egui_virtual_list to support spliced inserts
    134                 MergeKind::Spliced => {
    135                     debug!(
    136                         "spliced when inserting {} new notes, resetting virtual list",
    137                         new_refs.len()
    138                     );
    139                     list.reset();
    140                 }
    141                 MergeKind::FrontInsert => {
    142                     // only run this logic if we're reverse-chronological
    143                     // reversed in this case means chronological, since the
    144                     // default is reverse-chronological. yeah it's confusing.
    145                     if !reversed {
    146                         list.items_inserted_at_start(new_items);
    147                     }
    148                 }
    149             }
    150         }
    151     }
    152 
    153     pub fn select_down(&mut self) {
    154         debug!("select_down {}", self.selection + 1);
    155         if self.selection + 1 > self.notes.len() as i32 {
    156             return;
    157         }
    158 
    159         self.selection += 1;
    160     }
    161 
    162     pub fn select_up(&mut self) {
    163         debug!("select_up {}", self.selection - 1);
    164         if self.selection - 1 < 0 {
    165             return;
    166         }
    167 
    168         self.selection -= 1;
    169     }
    170 }
    171 
/// A column in a deck. Holds navigation state, loaded notes, column kind, etc.
#[derive(Debug)]
pub struct Timeline {
    /// Identifier of this timeline; `Timeline::new` assigns it from a
    /// global counter.
    pub id: TimelineId,
    /// What kind of timeline this is.
    pub kind: TimelineKind,
    // We may not have the filter loaded yet, so its state is tracked in
    // `FilterStates` (set per relay via `set_relay_state`) so that codepaths
    // have to explicitly handle it
    pub filter: FilterStates,
    /// The tabs of this timeline, indexed by `ViewFilter::index`.
    pub views: Vec<TimelineTab>,
    /// Index into `views` of the currently selected tab.
    pub selected_view: i32,

    /// Our nostrdb subscription
    pub subscription: Option<Subscription>,
}
    186 
    187 #[derive(Serialize, Deserialize, Clone, Debug)]
    188 pub struct SerializableTimeline {
    189     pub id: TimelineId,
    190     pub kind: TimelineKind,
    191 }
    192 
    193 impl SerializableTimeline {
    194     pub fn into_timeline(self, ndb: &Ndb, deck_user_pubkey: Option<&[u8; 32]>) -> Option<Timeline> {
    195         self.kind.into_timeline(ndb, deck_user_pubkey)
    196     }
    197 }
    198 
    199 impl Timeline {
    200     /// Create a timeline from a contact list
    201     pub fn contact_list(contact_list: &Note, pk_src: PubkeySource) -> Result<Self> {
    202         let filter = filter::filter_from_tags(contact_list)?.into_follow_filter();
    203 
    204         Ok(Timeline::new(
    205             TimelineKind::contact_list(pk_src),
    206             FilterState::ready(filter),
    207         ))
    208     }
    209 
    210     pub fn make_view_id(id: TimelineId, selected_view: i32) -> egui::Id {
    211         egui::Id::new((id, selected_view))
    212     }
    213 
    214     pub fn view_id(&self) -> egui::Id {
    215         Timeline::make_view_id(self.id, self.selected_view)
    216     }
    217 
    218     pub fn new(kind: TimelineKind, filter_state: FilterState) -> Self {
    219         // global unique id for all new timelines
    220         static UIDS: AtomicU32 = AtomicU32::new(0);
    221 
    222         let filter = FilterStates::new(filter_state);
    223         let subscription: Option<Subscription> = None;
    224         let notes = TimelineTab::new(ViewFilter::Notes);
    225         let replies = TimelineTab::new(ViewFilter::NotesAndReplies);
    226         let views = vec![notes, replies];
    227         let selected_view = 0;
    228         let id = TimelineId::new(UIDS.fetch_add(1, Ordering::Relaxed));
    229 
    230         Timeline {
    231             id,
    232             kind,
    233             filter,
    234             views,
    235             subscription,
    236             selected_view,
    237         }
    238     }
    239 
    240     pub fn current_view(&self) -> &TimelineTab {
    241         &self.views[self.selected_view as usize]
    242     }
    243 
    244     pub fn current_view_mut(&mut self) -> &mut TimelineTab {
    245         &mut self.views[self.selected_view as usize]
    246     }
    247 
    248     pub fn notes(&self, view: ViewFilter) -> &[NoteRef] {
    249         &self.views[view.index()].notes
    250     }
    251 
    252     pub fn view(&self, view: ViewFilter) -> &TimelineTab {
    253         &self.views[view.index()]
    254     }
    255 
    256     pub fn view_mut(&mut self, view: ViewFilter) -> &mut TimelineTab {
    257         &mut self.views[view.index()]
    258     }
    259 
    260     pub fn poll_notes_into_view(
    261         timeline_idx: usize,
    262         mut timelines: Vec<&mut Timeline>,
    263         ndb: &Ndb,
    264         txn: &Transaction,
    265         unknown_ids: &mut UnknownIds,
    266         note_cache: &mut NoteCache,
    267     ) -> Result<()> {
    268         let timeline = timelines
    269             .get_mut(timeline_idx)
    270             .ok_or(Error::TimelineNotFound)?;
    271         let sub = timeline.subscription.ok_or(Error::no_active_sub())?;
    272 
    273         let new_note_ids = ndb.poll_for_notes(sub, 500);
    274         if new_note_ids.is_empty() {
    275             return Ok(());
    276         } else {
    277             debug!("{} new notes! {:?}", new_note_ids.len(), new_note_ids);
    278         }
    279 
    280         let mut new_refs: Vec<(Note, NoteRef)> = Vec::with_capacity(new_note_ids.len());
    281 
    282         for key in new_note_ids {
    283             let note = if let Ok(note) = ndb.get_note_by_key(txn, key) {
    284                 note
    285             } else {
    286                 error!("hit race condition in poll_notes_into_view: https://github.com/damus-io/nostrdb/issues/35 note {:?} was not added to timeline", key);
    287                 continue;
    288             };
    289 
    290             UnknownIds::update_from_note(txn, ndb, unknown_ids, note_cache, &note);
    291 
    292             let created_at = note.created_at();
    293             new_refs.push((note, NoteRef { key, created_at }));
    294         }
    295 
    296         // We're assuming reverse-chronological here (timelines). This
    297         // flag ensures we trigger the items_inserted_at_start
    298         // optimization in VirtualList. We need this flag because we can
    299         // insert notes into chronological order sometimes, and this
    300         // optimization doesn't make sense in those situations.
    301         let reversed = false;
    302 
    303         // ViewFilter::NotesAndReplies
    304         {
    305             let refs: Vec<NoteRef> = new_refs.iter().map(|(_note, nr)| *nr).collect();
    306 
    307             let reversed = false;
    308             timeline
    309                 .view_mut(ViewFilter::NotesAndReplies)
    310                 .insert(&refs, reversed);
    311         }
    312 
    313         //
    314         // handle the filtered case (ViewFilter::Notes, no replies)
    315         //
    316         // TODO(jb55): this is mostly just copied from above, let's just use a loop
    317         //             I initially tried this but ran into borrow checker issues
    318         {
    319             let mut filtered_refs = Vec::with_capacity(new_refs.len());
    320             for (note, nr) in &new_refs {
    321                 let cached_note = note_cache.cached_note_or_insert(nr.key, note);
    322 
    323                 if ViewFilter::filter_notes(cached_note, note) {
    324                     filtered_refs.push(*nr);
    325                 }
    326             }
    327 
    328             timeline
    329                 .view_mut(ViewFilter::Notes)
    330                 .insert(&filtered_refs, reversed);
    331         }
    332 
    333         Ok(())
    334     }
    335 
    336     pub fn as_serializable_timeline(&self) -> SerializableTimeline {
    337         SerializableTimeline {
    338             id: self.id,
    339             kind: self.kind.clone(),
    340         }
    341     }
    342 }
    343 
    344 pub enum MergeKind {
    345     FrontInsert,
    346     Spliced,
    347 }
    348 
    349 pub fn merge_sorted_vecs<T: Ord + Copy>(vec1: &[T], vec2: &[T]) -> (Vec<T>, MergeKind) {
    350     let mut merged = Vec::with_capacity(vec1.len() + vec2.len());
    351     let mut i = 0;
    352     let mut j = 0;
    353     let mut result: Option<MergeKind> = None;
    354 
    355     while i < vec1.len() && j < vec2.len() {
    356         if vec1[i] <= vec2[j] {
    357             if result.is_none() && j < vec2.len() {
    358                 // if we're pushing from our large list and still have
    359                 // some left in vec2, then this is a splice
    360                 result = Some(MergeKind::Spliced);
    361             }
    362             merged.push(vec1[i]);
    363             i += 1;
    364         } else {
    365             merged.push(vec2[j]);
    366             j += 1;
    367         }
    368     }
    369 
    370     // Append any remaining elements from either vector
    371     if i < vec1.len() {
    372         merged.extend_from_slice(&vec1[i..]);
    373     }
    374     if j < vec2.len() {
    375         merged.extend_from_slice(&vec2[j..]);
    376     }
    377 
    378     (merged, result.unwrap_or(MergeKind::FrontInsert))
    379 }
    380 
    381 /// When adding a new timeline, we may have a situation where the
    382 /// FilterState is NeedsRemote. This can happen if we don't yet have the
    383 /// contact list, etc. For these situations, we query all of the relays
    384 /// with the same sub_id. We keep track of this sub_id and update the
    385 /// filter with the latest version of the returned filter (ie contact
    386 /// list) when they arrive.
    387 ///
    388 /// We do this by maintaining this sub_id in the filter state, even when
    389 /// in the ready state. See: [`FilterReady`]
    390 pub fn setup_new_timeline(
    391     timeline: &mut Timeline,
    392     ndb: &Ndb,
    393     subs: &mut Subscriptions,
    394     pool: &mut RelayPool,
    395     note_cache: &mut NoteCache,
    396     since_optimize: bool,
    397 ) {
    398     // if we're ready, setup local subs
    399     if is_timeline_ready(ndb, pool, note_cache, timeline) {
    400         if let Err(err) = setup_timeline_nostrdb_sub(ndb, note_cache, timeline) {
    401             error!("setup_new_timeline: {err}");
    402         }
    403     }
    404 
    405     for relay in &mut pool.relays {
    406         send_initial_timeline_filter(ndb, since_optimize, subs, &mut relay.relay, timeline);
    407     }
    408 }
    409 
    410 /// Send initial filters for a specific relay. This typically gets called
    411 /// when we first connect to a new relay for the first time. For
    412 /// situations where you are adding a new timeline, use
    413 /// setup_new_timeline.
    414 pub fn send_initial_timeline_filters(
    415     ndb: &Ndb,
    416     since_optimize: bool,
    417     columns: &mut Columns,
    418     subs: &mut Subscriptions,
    419     pool: &mut RelayPool,
    420     relay_id: &str,
    421 ) -> Option<()> {
    422     info!("Sending initial filters to {}", relay_id);
    423     let relay = &mut pool
    424         .relays
    425         .iter_mut()
    426         .find(|r| r.relay.url == relay_id)?
    427         .relay;
    428 
    429     for timeline in columns.timelines_mut() {
    430         send_initial_timeline_filter(ndb, since_optimize, subs, relay, timeline);
    431     }
    432 
    433     Some(())
    434 }
    435 
    436 pub fn send_initial_timeline_filter(
    437     ndb: &Ndb,
    438     can_since_optimize: bool,
    439     subs: &mut Subscriptions,
    440     relay: &mut Relay,
    441     timeline: &mut Timeline,
    442 ) {
    443     let filter_state = timeline.filter.get(&relay.url);
    444 
    445     match filter_state {
    446         FilterState::Broken(err) => {
    447             error!(
    448                 "FetchingRemote state in broken state when sending initial timeline filter? {err}"
    449             );
    450         }
    451 
    452         FilterState::FetchingRemote(_unisub) => {
    453             error!("FetchingRemote state when sending initial timeline filter?");
    454         }
    455 
    456         FilterState::GotRemote(_sub) => {
    457             error!("GotRemote state when sending initial timeline filter?");
    458         }
    459 
    460         FilterState::Ready(filter) => {
    461             let filter = filter.to_owned();
    462             let new_filters = filter.into_iter().map(|f| {
    463                 // limit the size of remote filters
    464                 let default_limit = filter::default_remote_limit();
    465                 let mut lim = f.limit().unwrap_or(default_limit);
    466                 let mut filter = f;
    467                 if lim > default_limit {
    468                     lim = default_limit;
    469                     filter = filter.limit_mut(lim);
    470                 }
    471 
    472                 let notes = timeline.notes(ViewFilter::NotesAndReplies);
    473 
    474                 // Should we since optimize? Not always. For example
    475                 // if we only have a few notes locally. One way to
    476                 // determine this is by looking at the current filter
    477                 // and seeing what its limit is. If we have less
    478                 // notes than the limit, we might want to backfill
    479                 // older notes
    480                 if can_since_optimize && filter::should_since_optimize(lim, notes.len()) {
    481                     filter = filter::since_optimize_filter(filter, notes);
    482                 } else {
    483                     warn!("Skipping since optimization for {:?}: number of local notes is less than limit, attempting to backfill.", filter);
    484                 }
    485 
    486                 filter
    487             }).collect();
    488 
    489             //let sub_id = damus.gen_subid(&SubKind::Initial);
    490             let sub_id = subscriptions::new_sub_id();
    491             subs.subs.insert(sub_id.clone(), SubKind::Initial);
    492 
    493             relay.subscribe(sub_id, new_filters);
    494         }
    495 
    496         // we need some data first
    497         FilterState::NeedsRemote(filter) => {
    498             fetch_contact_list(filter.to_owned(), ndb, subs, relay, timeline)
    499         }
    500     }
    501 }
    502 
    503 fn fetch_contact_list(
    504     filter: Vec<Filter>,
    505     ndb: &Ndb,
    506     subs: &mut Subscriptions,
    507     relay: &mut Relay,
    508     timeline: &mut Timeline,
    509 ) {
    510     let sub_kind = SubKind::FetchingContactList(timeline.id);
    511     let sub_id = subscriptions::new_sub_id();
    512     let local_sub = ndb.subscribe(&filter).expect("sub");
    513 
    514     timeline.filter.set_relay_state(
    515         relay.url.clone(),
    516         FilterState::fetching_remote(sub_id.clone(), local_sub),
    517     );
    518 
    519     subs.subs.insert(sub_id.clone(), sub_kind);
    520 
    521     info!("fetching contact list from {}", &relay.url);
    522     relay.subscribe(sub_id, filter);
    523 }
    524 
    525 fn setup_initial_timeline(
    526     ndb: &Ndb,
    527     timeline: &mut Timeline,
    528     note_cache: &mut NoteCache,
    529     filters: &[Filter],
    530 ) -> Result<()> {
    531     timeline.subscription = Some(ndb.subscribe(filters)?);
    532     let txn = Transaction::new(ndb)?;
    533     debug!(
    534         "querying nostrdb sub {:?} {:?}",
    535         timeline.subscription, timeline.filter
    536     );
    537     let lim = filters[0].limit().unwrap_or(crate::filter::default_limit()) as i32;
    538     let notes = ndb
    539         .query(&txn, filters, lim)?
    540         .into_iter()
    541         .map(NoteRef::from_query_result)
    542         .collect();
    543 
    544     copy_notes_into_timeline(timeline, &txn, ndb, note_cache, notes);
    545 
    546     Ok(())
    547 }
    548 
    549 pub fn copy_notes_into_timeline(
    550     timeline: &mut Timeline,
    551     txn: &Transaction,
    552     ndb: &Ndb,
    553     note_cache: &mut NoteCache,
    554     notes: Vec<NoteRef>,
    555 ) {
    556     let filters = {
    557         let views = &timeline.views;
    558         let filters: Vec<fn(&CachedNote, &Note) -> bool> =
    559             views.iter().map(|v| v.filter.filter()).collect();
    560         filters
    561     };
    562 
    563     for note_ref in notes {
    564         for (view, filter) in filters.iter().enumerate() {
    565             if let Ok(note) = ndb.get_note_by_key(txn, note_ref.key) {
    566                 if filter(
    567                     note_cache.cached_note_or_insert_mut(note_ref.key, &note),
    568                     &note,
    569                 ) {
    570                     timeline.views[view].notes.push(note_ref)
    571                 }
    572             }
    573         }
    574     }
    575 }
    576 
    577 pub fn setup_initial_nostrdb_subs(
    578     ndb: &Ndb,
    579     note_cache: &mut NoteCache,
    580     columns: &mut Columns,
    581 ) -> Result<()> {
    582     for timeline in columns.timelines_mut() {
    583         if let Err(err) = setup_timeline_nostrdb_sub(ndb, note_cache, timeline) {
    584             error!("setup_initial_nostrdb_subs: {err}");
    585         }
    586     }
    587 
    588     Ok(())
    589 }
    590 
    591 fn setup_timeline_nostrdb_sub(
    592     ndb: &Ndb,
    593     note_cache: &mut NoteCache,
    594     timeline: &mut Timeline,
    595 ) -> Result<()> {
    596     let filter_state = timeline
    597         .filter
    598         .get_any_ready()
    599         .ok_or(Error::empty_contact_list())?
    600         .to_owned();
    601 
    602     setup_initial_timeline(ndb, timeline, note_cache, &filter_state)?;
    603 
    604     Ok(())
    605 }
    606 
    607 /// Check our timeline filter and see if we have any filter data ready.
    608 /// Our timelines may require additional data before it is functional. For
    609 /// example, when we have to fetch a contact list before we do the actual
    610 /// following list query.
    611 pub fn is_timeline_ready(
    612     ndb: &Ndb,
    613     pool: &mut RelayPool,
    614     note_cache: &mut NoteCache,
    615     timeline: &mut Timeline,
    616 ) -> bool {
    617     // TODO: we should debounce the filter states a bit to make sure we have
    618     // seen all of the different contact lists from each relay
    619     if let Some(_f) = timeline.filter.get_any_ready() {
    620         return true;
    621     }
    622 
    623     let (relay_id, sub) = if let Some((relay_id, sub)) = timeline.filter.get_any_gotremote() {
    624         (relay_id.to_string(), sub)
    625     } else {
    626         return false;
    627     };
    628 
    629     // We got at least one eose for our filter request. Let's see
    630     // if nostrdb is done processing it yet.
    631     let res = ndb.poll_for_notes(sub, 1);
    632     if res.is_empty() {
    633         debug!(
    634             "check_timeline_filter_state: no notes found (yet?) for timeline {:?}",
    635             timeline
    636         );
    637         return false;
    638     }
    639 
    640     info!("notes found for contact timeline after GotRemote!");
    641 
    642     let note_key = res[0];
    643 
    644     let filter = {
    645         let txn = Transaction::new(ndb).expect("txn");
    646         let note = ndb.get_note_by_key(&txn, note_key).expect("note");
    647         filter::filter_from_tags(&note).map(|f| f.into_follow_filter())
    648     };
    649 
    650     // TODO: into_follow_filter is hardcoded to contact lists, let's generalize
    651     match filter {
    652         Err(Error::Filter(e)) => {
    653             error!("got broken when building filter {e}");
    654             timeline
    655                 .filter
    656                 .set_relay_state(relay_id, FilterState::broken(e));
    657             false
    658         }
    659         Err(err) => {
    660             error!("got broken when building filter {err}");
    661             timeline
    662                 .filter
    663                 .set_relay_state(relay_id, FilterState::broken(FilterError::EmptyContactList));
    664             false
    665         }
    666         Ok(filter) => {
    667             // we just switched to the ready state, we should send initial
    668             // queries and setup the local subscription
    669             info!("Found contact list! Setting up local and remote contact list query");
    670             setup_initial_timeline(ndb, timeline, note_cache, &filter).expect("setup init");
    671             timeline
    672                 .filter
    673                 .set_relay_state(relay_id, FilterState::ready(filter.clone()));
    674 
    675             //let ck = &timeline.kind;
    676             //let subid = damus.gen_subid(&SubKind::Column(ck.clone()));
    677             let subid = subscriptions::new_sub_id();
    678             pool.subscribe(subid, filter);
    679             true
    680         }
    681     }
    682 }