multi_subscriber.rs (4236B)
1 use enostr::{Filter, RelayPool}; 2 use nostrdb::{Ndb, Note, Transaction}; 3 use tracing::{debug, error, info}; 4 use uuid::Uuid; 5 6 use crate::Error; 7 use notedeck::{MuteFun, NoteRef, UnifiedSubscription}; 8 9 pub struct MultiSubscriber { 10 filters: Vec<Filter>, 11 sub: Option<UnifiedSubscription>, 12 subscribers: u32, 13 } 14 15 impl MultiSubscriber { 16 pub fn new(filters: Vec<Filter>) -> Self { 17 Self { 18 filters, 19 sub: None, 20 subscribers: 0, 21 } 22 } 23 24 fn real_subscribe( 25 ndb: &Ndb, 26 pool: &mut RelayPool, 27 filters: Vec<Filter>, 28 ) -> Option<UnifiedSubscription> { 29 let subid = Uuid::new_v4().to_string(); 30 let sub = ndb.subscribe(&filters).ok()?; 31 32 pool.subscribe(subid.clone(), filters); 33 34 Some(UnifiedSubscription { 35 local: sub, 36 remote: subid, 37 }) 38 } 39 40 pub fn unsubscribe(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) { 41 if self.subscribers == 0 { 42 error!("No subscribers to unsubscribe from"); 43 return; 44 } 45 46 self.subscribers -= 1; 47 if self.subscribers == 0 { 48 let sub = match self.sub { 49 Some(ref sub) => sub, 50 None => { 51 error!("No remote subscription to unsubscribe from"); 52 return; 53 } 54 }; 55 let local_sub = &sub.local; 56 if let Err(e) = ndb.unsubscribe(*local_sub) { 57 error!( 58 "failed to unsubscribe from object: {e}, subid:{}, {} active subscriptions", 59 local_sub.id(), 60 ndb.subscription_count() 61 ); 62 } else { 63 info!( 64 "Unsubscribed from object subid:{}. {} active subscriptions", 65 local_sub.id(), 66 ndb.subscription_count() 67 ); 68 } 69 70 // unsub from remote 71 pool.unsubscribe(sub.remote.clone()); 72 self.sub = None; 73 } else { 74 info!( 75 "Locally unsubscribing. {} active ndb subscriptions. {} active subscriptions for this object", 76 ndb.subscription_count(), 77 self.subscribers, 78 ); 79 } 80 } 81 82 pub fn subscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) { 83 self.subscribers += 1; 84 if self.subscribers == 1 { 85 if self.sub.is_some() { 86 error!("Object is first subscriber, but it already had remote subscription"); 87 return; 88 } 89 90 self.sub = Self::real_subscribe(ndb, pool, self.filters.clone()); 91 info!( 92 "Remotely subscribing to object. {} total active subscriptions, {} on this object", 93 ndb.subscription_count(), 94 self.subscribers, 95 ); 96 97 if self.sub.is_none() { 98 error!("Error subscribing remotely to object"); 99 } 100 } else { 101 info!( 102 "Locally subscribing. {} total active subscriptions, {} for this object", 103 ndb.subscription_count(), 104 self.subscribers, 105 ) 106 } 107 } 108 109 pub fn poll_for_notes( 110 &mut self, 111 ndb: &Ndb, 112 txn: &Transaction, 113 is_muted: &MuteFun, 114 ) -> Result<Vec<NoteRef>, Error> { 115 let sub = self.sub.as_ref().ok_or(notedeck::Error::no_active_sub())?; 116 let new_note_keys = ndb.poll_for_notes(sub.local, 500); 117 118 if new_note_keys.is_empty() { 119 return Ok(vec![]); 120 } else { 121 debug!("{} new notes! {:?}", new_note_keys.len(), new_note_keys); 122 } 123 124 let mut notes: Vec<Note<'_>> = Vec::with_capacity(new_note_keys.len()); 125 for key in new_note_keys { 126 let note = if let Ok(note) = ndb.get_note_by_key(txn, key) { 127 note 128 } else { 129 continue; 130 }; 131 132 if is_muted(¬e) { 133 continue; 134 } 135 136 notes.push(note); 137 } 138 139 let note_refs: Vec<NoteRef> = notes.iter().map(|n| NoteRef::from_note(n)).collect(); 140 141 Ok(note_refs) 142 } 143 }