thread.rs (5554B)
1 use crate::note::NoteRef; 2 use crate::timeline::{TimelineTab, ViewFilter}; 3 use crate::Error; 4 use nostrdb::{Filter, FilterBuilder, Ndb, Subscription, Transaction}; 5 use std::cmp::Ordering; 6 use std::collections::HashMap; 7 use tracing::{debug, warn}; 8 9 #[derive(Default)] 10 pub struct Thread { 11 pub view: TimelineTab, 12 sub: Option<Subscription>, 13 remote_sub: Option<String>, 14 pub subscribers: i32, 15 } 16 17 #[derive(Debug, Eq, PartialEq, Copy, Clone)] 18 pub enum DecrementResult { 19 LastSubscriber(Subscription), 20 ActiveSubscribers, 21 } 22 23 impl Thread { 24 pub fn new(notes: Vec<NoteRef>) -> Self { 25 let mut cap = ((notes.len() as f32) * 1.5) as usize; 26 if cap == 0 { 27 cap = 25; 28 } 29 let mut view = TimelineTab::new_with_capacity(ViewFilter::NotesAndReplies, cap); 30 view.notes = notes; 31 let sub: Option<Subscription> = None; 32 let remote_sub: Option<String> = None; 33 let subscribers: i32 = 0; 34 35 Thread { 36 view, 37 sub, 38 remote_sub, 39 subscribers, 40 } 41 } 42 43 /// Look for new thread notes since our last fetch 44 pub fn new_notes( 45 notes: &[NoteRef], 46 root_id: &[u8; 32], 47 txn: &Transaction, 48 ndb: &Ndb, 49 ) -> Vec<NoteRef> { 50 if notes.is_empty() { 51 return vec![]; 52 } 53 54 let last_note = notes[0]; 55 let filters = Thread::filters_since(root_id, last_note.created_at + 1); 56 57 if let Ok(results) = ndb.query(txn, &filters, 1000) { 58 debug!("got {} results from thread update", results.len()); 59 results 60 .into_iter() 61 .map(NoteRef::from_query_result) 62 .collect() 63 } else { 64 debug!("got no results from thread update",); 65 vec![] 66 } 67 } 68 69 pub fn decrement_sub(&mut self) -> Result<DecrementResult, Error> { 70 self.subscribers -= 1; 71 72 match self.subscribers.cmp(&0) { 73 Ordering::Equal => { 74 if let Some(sub) = self.subscription() { 75 Ok(DecrementResult::LastSubscriber(sub)) 76 } else { 77 Err(Error::no_active_sub()) 78 } 79 } 80 Ordering::Less => Err(Error::unexpected_sub_count(self.subscribers)), 81 Ordering::Greater => Ok(DecrementResult::ActiveSubscribers), 82 } 83 } 84 85 pub fn subscription(&self) -> Option<Subscription> { 86 self.sub 87 } 88 89 pub fn remote_subscription(&self) -> &Option<String> { 90 &self.remote_sub 91 } 92 93 pub fn remote_subscription_mut(&mut self) -> &mut Option<String> { 94 &mut self.remote_sub 95 } 96 97 pub fn subscription_mut(&mut self) -> &mut Option<Subscription> { 98 &mut self.sub 99 } 100 101 fn filters_raw(root: &[u8; 32]) -> Vec<FilterBuilder> { 102 vec![ 103 nostrdb::Filter::new().kinds([1]).event(root), 104 nostrdb::Filter::new().ids([root]).limit(1), 105 ] 106 } 107 108 pub fn filters_since(root: &[u8; 32], since: u64) -> Vec<Filter> { 109 Self::filters_raw(root) 110 .into_iter() 111 .map(|fb| fb.since(since).build()) 112 .collect() 113 } 114 115 pub fn filters(root: &[u8; 32]) -> Vec<Filter> { 116 Self::filters_raw(root) 117 .into_iter() 118 .map(|mut fb| fb.build()) 119 .collect() 120 } 121 } 122 123 #[derive(Default)] 124 pub struct Threads { 125 /// root id to thread 126 pub root_id_to_thread: HashMap<[u8; 32], Thread>, 127 } 128 129 pub enum ThreadResult<'a> { 130 Fresh(&'a mut Thread), 131 Stale(&'a mut Thread), 132 } 133 134 impl<'a> ThreadResult<'a> { 135 pub fn get_ptr(self) -> &'a mut Thread { 136 match self { 137 Self::Fresh(ptr) => ptr, 138 Self::Stale(ptr) => ptr, 139 } 140 } 141 142 pub fn is_stale(&self) -> bool { 143 match self { 144 Self::Fresh(_ptr) => false, 145 Self::Stale(_ptr) => true, 146 } 147 } 148 } 149 150 impl Threads { 151 pub fn thread_expected_mut(&mut self, root_id: &[u8; 32]) -> &mut Thread { 152 self.root_id_to_thread 153 .get_mut(root_id) 154 .expect("thread_expected_mut used but there was no thread") 155 } 156 157 pub fn thread_mut<'a>( 158 &'a mut self, 159 ndb: &Ndb, 160 txn: &Transaction, 161 root_id: &[u8; 32], 162 ) -> ThreadResult<'a> { 163 // we can't use the naive hashmap entry API here because lookups 164 // require a copy, wait until we have a raw entry api. We could 165 // also use hashbrown? 166 167 if self.root_id_to_thread.contains_key(root_id) { 168 return ThreadResult::Stale(self.root_id_to_thread.get_mut(root_id).unwrap()); 169 } 170 171 // we don't have the thread, query for it! 172 let filters = Thread::filters(root_id); 173 174 let notes = if let Ok(results) = ndb.query(txn, &filters, 1000) { 175 results 176 .into_iter() 177 .map(NoteRef::from_query_result) 178 .collect() 179 } else { 180 debug!( 181 "got no results from thread lookup for {}", 182 hex::encode(root_id) 183 ); 184 vec![] 185 }; 186 187 if notes.is_empty() { 188 warn!("thread query returned 0 notes? ") 189 } else { 190 debug!("found thread with {} notes", notes.len()); 191 } 192 193 self.root_id_to_thread 194 .insert(root_id.to_owned(), Thread::new(notes)); 195 ThreadResult::Fresh(self.root_id_to_thread.get_mut(root_id).unwrap()) 196 } 197 198 //fn thread_by_id(&self, ndb: &Ndb, id: &[u8; 32]) -> &mut Thread { 199 //} 200 }