multi_subscriber.rs (4229B)
1 use enostr::{Filter, RelayPool}; 2 use nostrdb::{Ndb, Note, Transaction}; 3 use tracing::{debug, error, info}; 4 use uuid::Uuid; 5 6 use crate::{filter::UnifiedSubscription, muted::MuteFun, note::NoteRef, Error}; 7 8 pub struct MultiSubscriber { 9 filters: Vec<Filter>, 10 sub: Option<UnifiedSubscription>, 11 subscribers: u32, 12 } 13 14 impl MultiSubscriber { 15 pub fn new(filters: Vec<Filter>) -> Self { 16 Self { 17 filters, 18 sub: None, 19 subscribers: 0, 20 } 21 } 22 23 fn real_subscribe( 24 ndb: &Ndb, 25 pool: &mut RelayPool, 26 filters: Vec<Filter>, 27 ) -> Option<UnifiedSubscription> { 28 let subid = Uuid::new_v4().to_string(); 29 let sub = ndb.subscribe(&filters).ok()?; 30 31 pool.subscribe(subid.clone(), filters); 32 33 Some(UnifiedSubscription { 34 local: sub, 35 remote: subid, 36 }) 37 } 38 39 pub fn unsubscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) { 40 if self.subscribers == 0 { 41 error!("No subscribers to unsubscribe from"); 42 return; 43 } 44 45 self.subscribers -= 1; 46 if self.subscribers == 0 { 47 let sub = match self.sub { 48 Some(ref sub) => sub, 49 None => { 50 error!("No remote subscription to unsubscribe from"); 51 return; 52 } 53 }; 54 let local_sub = &sub.local; 55 if let Err(e) = ndb.unsubscribe(*local_sub) { 56 error!( 57 "failed to unsubscribe from object: {e}, subid:{}, {} active subscriptions", 58 local_sub.id(), 59 ndb.subscription_count() 60 ); 61 } else { 62 info!( 63 "Unsubscribed from object subid:{}. {} active subscriptions", 64 local_sub.id(), 65 ndb.subscription_count() 66 ); 67 } 68 69 // unsub from remote 70 pool.unsubscribe(sub.remote.clone()); 71 self.sub = None; 72 } else { 73 info!( 74 "Locally unsubscribing. {} active ndb subscriptions. {} active subscriptions for this object", 75 ndb.subscription_count(), 76 self.subscribers, 77 ); 78 } 79 } 80 81 pub fn subscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) { 82 self.subscribers += 1; 83 if self.subscribers == 1 { 84 if self.sub.is_some() { 85 error!("Object is first subscriber, but it already had remote subscription"); 86 return; 87 } 88 89 self.sub = Self::real_subscribe(ndb, pool, self.filters.clone()); 90 info!( 91 "Remotely subscribing to object. {} total active subscriptions, {} on this object", 92 ndb.subscription_count(), 93 self.subscribers, 94 ); 95 96 if self.sub.is_none() { 97 error!("Error subscribing remotely to object"); 98 } 99 } else { 100 info!( 101 "Locally subscribing. {} total active subscriptions, {} for this object", 102 ndb.subscription_count(), 103 self.subscribers, 104 ) 105 } 106 } 107 108 pub fn poll_for_notes( 109 &mut self, 110 ndb: &Ndb, 111 txn: &Transaction, 112 is_muted: &MuteFun, 113 ) -> Result<Vec<NoteRef>, Error> { 114 let sub = self.sub.as_ref().ok_or(Error::no_active_sub())?; 115 let new_note_keys = ndb.poll_for_notes(sub.local, 500); 116 117 if new_note_keys.is_empty() { 118 return Ok(vec![]); 119 } else { 120 debug!("{} new notes! {:?}", new_note_keys.len(), new_note_keys); 121 } 122 123 let mut notes: Vec<Note<'_>> = Vec::with_capacity(new_note_keys.len()); 124 for key in new_note_keys { 125 let note = if let Ok(note) = ndb.get_note_by_key(txn, key) { 126 note 127 } else { 128 continue; 129 }; 130 131 if is_muted(¬e) { 132 continue; 133 } 134 135 notes.push(note); 136 } 137 138 let note_refs: Vec<NoteRef> = notes.iter().map(|n| NoteRef::from_note(n)).collect(); 139 140 Ok(note_refs) 141 } 142 }