multi_subscriber.rs (4511B)
1 use enostr::{Filter, RelayPool}; 2 use nostrdb::{Ndb, Subscription}; 3 use tracing::{error, info}; 4 use uuid::Uuid; 5 6 #[derive(Debug)] 7 pub struct MultiSubscriber { 8 pub filters: Vec<Filter>, 9 pub local_subid: Option<Subscription>, 10 pub remote_subid: Option<String>, 11 local_subscribers: u32, 12 remote_subscribers: u32, 13 } 14 15 impl MultiSubscriber { 16 /// Create a MultiSubscriber with an initial local subscription. 17 pub fn with_initial_local_sub(sub: Subscription, filters: Vec<Filter>) -> Self { 18 let mut msub = MultiSubscriber::new(filters); 19 msub.local_subid = Some(sub); 20 msub.local_subscribers = 1; 21 msub 22 } 23 24 pub fn new(filters: Vec<Filter>) -> Self { 25 Self { 26 filters, 27 local_subid: None, 28 remote_subid: None, 29 local_subscribers: 0, 30 remote_subscribers: 0, 31 } 32 } 33 34 fn unsubscribe_remote(&mut self, ndb: &Ndb, pool: &mut RelayPool) { 35 let remote_subid = if let Some(remote_subid) = &self.remote_subid { 36 remote_subid 37 } else { 38 self.err_log(ndb, "unsubscribe_remote: nothing to unsubscribe from?"); 39 return; 40 }; 41 42 pool.unsubscribe(remote_subid.clone()); 43 44 self.remote_subid = None; 45 } 46 47 /// Locally unsubscribe if we have one 48 fn unsubscribe_local(&mut self, ndb: &mut Ndb) { 49 let local_sub = if let Some(local_sub) = self.local_subid { 50 local_sub 51 } else { 52 self.err_log(ndb, "unsubscribe_local: nothing to unsubscribe from?"); 53 return; 54 }; 55 56 match ndb.unsubscribe(local_sub) { 57 Err(e) => { 58 self.err_log(ndb, &format!("Failed to unsubscribe: {e}")); 59 } 60 Ok(_) => { 61 self.local_subid = None; 62 } 63 } 64 } 65 66 pub fn unsubscribe(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) -> bool { 67 if self.local_subscribers == 0 && self.remote_subscribers == 0 { 68 self.err_log( 69 ndb, 70 "Called multi_subscriber unsubscribe when both sub counts are 0", 71 ); 72 return false; 73 } 74 75 self.local_subscribers = self.local_subscribers.saturating_sub(1); 76 self.remote_subscribers = self.remote_subscribers.saturating_sub(1); 77 78 if self.local_subscribers == 0 && self.remote_subscribers == 0 { 79 self.info_log(ndb, "Locally unsubscribing"); 80 self.unsubscribe_local(ndb); 81 self.unsubscribe_remote(ndb, pool); 82 self.local_subscribers = 0; 83 self.remote_subscribers = 0; 84 true 85 } else { 86 false 87 } 88 } 89 90 fn info_log(&self, ndb: &Ndb, msg: &str) { 91 info!( 92 "{msg}. {}/{}/{} active ndb/local/remote subscriptions.", 93 ndb.subscription_count(), 94 self.local_subscribers, 95 self.remote_subscribers, 96 ); 97 } 98 99 fn err_log(&self, ndb: &Ndb, msg: &str) { 100 error!( 101 "{msg}. {}/{}/{} active ndb/local/remote subscriptions.", 102 ndb.subscription_count(), 103 self.local_subscribers, 104 self.remote_subscribers, 105 ); 106 } 107 108 pub fn subscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) { 109 self.local_subscribers += 1; 110 self.remote_subscribers += 1; 111 112 if self.remote_subscribers == 1 { 113 if self.remote_subid.is_some() { 114 self.err_log( 115 ndb, 116 "Object is first subscriber, but it already had a subscription", 117 ); 118 return; 119 } else { 120 let subid = Uuid::new_v4().to_string(); 121 pool.subscribe(subid.clone(), self.filters.clone()); 122 self.info_log(ndb, "First remote subscription"); 123 self.remote_subid = Some(subid); 124 } 125 } 126 127 if self.local_subscribers == 1 { 128 if self.local_subid.is_some() { 129 self.err_log(ndb, "Should not have a local subscription already"); 130 return; 131 } 132 133 match ndb.subscribe(&self.filters) { 134 Ok(sub) => { 135 self.info_log(ndb, "First local subscription"); 136 self.local_subid = Some(sub); 137 } 138 139 Err(err) => { 140 error!("multi_subscriber: error subscribing locally: '{err}'") 141 } 142 } 143 } 144 } 145 }