notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

thread.rs (5554B)


      1 use crate::note::NoteRef;
      2 use crate::timeline::{TimelineTab, ViewFilter};
      3 use crate::Error;
      4 use nostrdb::{Filter, FilterBuilder, Ndb, Subscription, Transaction};
      5 use std::cmp::Ordering;
      6 use std::collections::HashMap;
      7 use tracing::{debug, warn};
      8 
      9 #[derive(Default)]
     10 pub struct Thread {
     11     pub view: TimelineTab,
     12     sub: Option<Subscription>,
     13     remote_sub: Option<String>,
     14     pub subscribers: i32,
     15 }
     16 
     17 #[derive(Debug, Eq, PartialEq, Copy, Clone)]
     18 pub enum DecrementResult {
     19     LastSubscriber(Subscription),
     20     ActiveSubscribers,
     21 }
     22 
     23 impl Thread {
     24     pub fn new(notes: Vec<NoteRef>) -> Self {
     25         let mut cap = ((notes.len() as f32) * 1.5) as usize;
     26         if cap == 0 {
     27             cap = 25;
     28         }
     29         let mut view = TimelineTab::new_with_capacity(ViewFilter::NotesAndReplies, cap);
     30         view.notes = notes;
     31         let sub: Option<Subscription> = None;
     32         let remote_sub: Option<String> = None;
     33         let subscribers: i32 = 0;
     34 
     35         Thread {
     36             view,
     37             sub,
     38             remote_sub,
     39             subscribers,
     40         }
     41     }
     42 
     43     /// Look for new thread notes since our last fetch
     44     pub fn new_notes(
     45         notes: &[NoteRef],
     46         root_id: &[u8; 32],
     47         txn: &Transaction,
     48         ndb: &Ndb,
     49     ) -> Vec<NoteRef> {
     50         if notes.is_empty() {
     51             return vec![];
     52         }
     53 
     54         let last_note = notes[0];
     55         let filters = Thread::filters_since(root_id, last_note.created_at + 1);
     56 
     57         if let Ok(results) = ndb.query(txn, &filters, 1000) {
     58             debug!("got {} results from thread update", results.len());
     59             results
     60                 .into_iter()
     61                 .map(NoteRef::from_query_result)
     62                 .collect()
     63         } else {
     64             debug!("got no results from thread update",);
     65             vec![]
     66         }
     67     }
     68 
     69     pub fn decrement_sub(&mut self) -> Result<DecrementResult, Error> {
     70         self.subscribers -= 1;
     71 
     72         match self.subscribers.cmp(&0) {
     73             Ordering::Equal => {
     74                 if let Some(sub) = self.subscription() {
     75                     Ok(DecrementResult::LastSubscriber(sub))
     76                 } else {
     77                     Err(Error::no_active_sub())
     78                 }
     79             }
     80             Ordering::Less => Err(Error::unexpected_sub_count(self.subscribers)),
     81             Ordering::Greater => Ok(DecrementResult::ActiveSubscribers),
     82         }
     83     }
     84 
     85     pub fn subscription(&self) -> Option<Subscription> {
     86         self.sub
     87     }
     88 
     89     pub fn remote_subscription(&self) -> &Option<String> {
     90         &self.remote_sub
     91     }
     92 
     93     pub fn remote_subscription_mut(&mut self) -> &mut Option<String> {
     94         &mut self.remote_sub
     95     }
     96 
     97     pub fn subscription_mut(&mut self) -> &mut Option<Subscription> {
     98         &mut self.sub
     99     }
    100 
    101     fn filters_raw(root: &[u8; 32]) -> Vec<FilterBuilder> {
    102         vec![
    103             nostrdb::Filter::new().kinds([1]).event(root),
    104             nostrdb::Filter::new().ids([root]).limit(1),
    105         ]
    106     }
    107 
    108     pub fn filters_since(root: &[u8; 32], since: u64) -> Vec<Filter> {
    109         Self::filters_raw(root)
    110             .into_iter()
    111             .map(|fb| fb.since(since).build())
    112             .collect()
    113     }
    114 
    115     pub fn filters(root: &[u8; 32]) -> Vec<Filter> {
    116         Self::filters_raw(root)
    117             .into_iter()
    118             .map(|mut fb| fb.build())
    119             .collect()
    120     }
    121 }
    122 
    123 #[derive(Default)]
    124 pub struct Threads {
    125     /// root id to thread
    126     pub root_id_to_thread: HashMap<[u8; 32], Thread>,
    127 }
    128 
    129 pub enum ThreadResult<'a> {
    130     Fresh(&'a mut Thread),
    131     Stale(&'a mut Thread),
    132 }
    133 
    134 impl<'a> ThreadResult<'a> {
    135     pub fn get_ptr(self) -> &'a mut Thread {
    136         match self {
    137             Self::Fresh(ptr) => ptr,
    138             Self::Stale(ptr) => ptr,
    139         }
    140     }
    141 
    142     pub fn is_stale(&self) -> bool {
    143         match self {
    144             Self::Fresh(_ptr) => false,
    145             Self::Stale(_ptr) => true,
    146         }
    147     }
    148 }
    149 
    150 impl Threads {
    151     pub fn thread_expected_mut(&mut self, root_id: &[u8; 32]) -> &mut Thread {
    152         self.root_id_to_thread
    153             .get_mut(root_id)
    154             .expect("thread_expected_mut used but there was no thread")
    155     }
    156 
    157     pub fn thread_mut<'a>(
    158         &'a mut self,
    159         ndb: &Ndb,
    160         txn: &Transaction,
    161         root_id: &[u8; 32],
    162     ) -> ThreadResult<'a> {
    163         // we can't use the naive hashmap entry API here because lookups
    164         // require a copy, wait until we have a raw entry api. We could
    165         // also use hashbrown?
    166 
    167         if self.root_id_to_thread.contains_key(root_id) {
    168             return ThreadResult::Stale(self.root_id_to_thread.get_mut(root_id).unwrap());
    169         }
    170 
    171         // we don't have the thread, query for it!
    172         let filters = Thread::filters(root_id);
    173 
    174         let notes = if let Ok(results) = ndb.query(txn, &filters, 1000) {
    175             results
    176                 .into_iter()
    177                 .map(NoteRef::from_query_result)
    178                 .collect()
    179         } else {
    180             debug!(
    181                 "got no results from thread lookup for {}",
    182                 hex::encode(root_id)
    183             );
    184             vec![]
    185         };
    186 
    187         if notes.is_empty() {
    188             warn!("thread query returned 0 notes? ")
    189         } else {
    190             debug!("found thread with {} notes", notes.len());
    191         }
    192 
    193         self.root_id_to_thread
    194             .insert(root_id.to_owned(), Thread::new(notes));
    195         ThreadResult::Fresh(self.root_id_to_thread.get_mut(root_id).unwrap())
    196     }
    197 
    198     //fn thread_by_id(&self, ndb: &Ndb, id: &[u8; 32]) -> &mut Thread {
    199     //}
    200 }