notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

multi_subscriber.rs (4511B)


      1 use enostr::{Filter, RelayPool};
      2 use nostrdb::{Ndb, Subscription};
      3 use tracing::{error, info};
      4 use uuid::Uuid;
      5 
      6 #[derive(Debug)]
      7 pub struct MultiSubscriber {
      8     pub filters: Vec<Filter>,
      9     pub local_subid: Option<Subscription>,
     10     pub remote_subid: Option<String>,
     11     local_subscribers: u32,
     12     remote_subscribers: u32,
     13 }
     14 
     15 impl MultiSubscriber {
     16     /// Create a MultiSubscriber with an initial local subscription.
     17     pub fn with_initial_local_sub(sub: Subscription, filters: Vec<Filter>) -> Self {
     18         let mut msub = MultiSubscriber::new(filters);
     19         msub.local_subid = Some(sub);
     20         msub.local_subscribers = 1;
     21         msub
     22     }
     23 
     24     pub fn new(filters: Vec<Filter>) -> Self {
     25         Self {
     26             filters,
     27             local_subid: None,
     28             remote_subid: None,
     29             local_subscribers: 0,
     30             remote_subscribers: 0,
     31         }
     32     }
     33 
     34     fn unsubscribe_remote(&mut self, ndb: &Ndb, pool: &mut RelayPool) {
     35         let remote_subid = if let Some(remote_subid) = &self.remote_subid {
     36             remote_subid
     37         } else {
     38             self.err_log(ndb, "unsubscribe_remote: nothing to unsubscribe from?");
     39             return;
     40         };
     41 
     42         pool.unsubscribe(remote_subid.clone());
     43 
     44         self.remote_subid = None;
     45     }
     46 
     47     /// Locally unsubscribe if we have one
     48     fn unsubscribe_local(&mut self, ndb: &mut Ndb) {
     49         let local_sub = if let Some(local_sub) = self.local_subid {
     50             local_sub
     51         } else {
     52             self.err_log(ndb, "unsubscribe_local: nothing to unsubscribe from?");
     53             return;
     54         };
     55 
     56         match ndb.unsubscribe(local_sub) {
     57             Err(e) => {
     58                 self.err_log(ndb, &format!("Failed to unsubscribe: {e}"));
     59             }
     60             Ok(_) => {
     61                 self.local_subid = None;
     62             }
     63         }
     64     }
     65 
     66     pub fn unsubscribe(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) -> bool {
     67         if self.local_subscribers == 0 && self.remote_subscribers == 0 {
     68             self.err_log(
     69                 ndb,
     70                 "Called multi_subscriber unsubscribe when both sub counts are 0",
     71             );
     72             return false;
     73         }
     74 
     75         self.local_subscribers = self.local_subscribers.saturating_sub(1);
     76         self.remote_subscribers = self.remote_subscribers.saturating_sub(1);
     77 
     78         if self.local_subscribers == 0 && self.remote_subscribers == 0 {
     79             self.info_log(ndb, "Locally unsubscribing");
     80             self.unsubscribe_local(ndb);
     81             self.unsubscribe_remote(ndb, pool);
     82             self.local_subscribers = 0;
     83             self.remote_subscribers = 0;
     84             true
     85         } else {
     86             false
     87         }
     88     }
     89 
     90     fn info_log(&self, ndb: &Ndb, msg: &str) {
     91         info!(
     92             "{msg}. {}/{}/{} active ndb/local/remote subscriptions.",
     93             ndb.subscription_count(),
     94             self.local_subscribers,
     95             self.remote_subscribers,
     96         );
     97     }
     98 
     99     fn err_log(&self, ndb: &Ndb, msg: &str) {
    100         error!(
    101             "{msg}. {}/{}/{} active ndb/local/remote subscriptions.",
    102             ndb.subscription_count(),
    103             self.local_subscribers,
    104             self.remote_subscribers,
    105         );
    106     }
    107 
    108     pub fn subscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) {
    109         self.local_subscribers += 1;
    110         self.remote_subscribers += 1;
    111 
    112         if self.remote_subscribers == 1 {
    113             if self.remote_subid.is_some() {
    114                 self.err_log(
    115                     ndb,
    116                     "Object is first subscriber, but it already had a subscription",
    117                 );
    118                 return;
    119             } else {
    120                 let subid = Uuid::new_v4().to_string();
    121                 pool.subscribe(subid.clone(), self.filters.clone());
    122                 self.info_log(ndb, "First remote subscription");
    123                 self.remote_subid = Some(subid);
    124             }
    125         }
    126 
    127         if self.local_subscribers == 1 {
    128             if self.local_subid.is_some() {
    129                 self.err_log(ndb, "Should not have a local subscription already");
    130                 return;
    131             }
    132 
    133             match ndb.subscribe(&self.filters) {
    134                 Ok(sub) => {
    135                     self.info_log(ndb, "First local subscription");
    136                     self.local_subid = Some(sub);
    137                 }
    138 
    139                 Err(err) => {
    140                     error!("multi_subscriber: error subscribing locally: '{err}'")
    141                 }
    142             }
    143         }
    144     }
    145 }