notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

multi_subscriber.rs (4236B)


      1 use enostr::{Filter, RelayPool};
      2 use nostrdb::{Ndb, Note, Transaction};
      3 use tracing::{debug, error, info};
      4 use uuid::Uuid;
      5 
      6 use crate::Error;
      7 use notedeck::{MuteFun, NoteRef, UnifiedSubscription};
      8 
      9 pub struct MultiSubscriber {
     10     filters: Vec<Filter>,
     11     sub: Option<UnifiedSubscription>,
     12     subscribers: u32,
     13 }
     14 
     15 impl MultiSubscriber {
     16     pub fn new(filters: Vec<Filter>) -> Self {
     17         Self {
     18             filters,
     19             sub: None,
     20             subscribers: 0,
     21         }
     22     }
     23 
     24     fn real_subscribe(
     25         ndb: &Ndb,
     26         pool: &mut RelayPool,
     27         filters: Vec<Filter>,
     28     ) -> Option<UnifiedSubscription> {
     29         let subid = Uuid::new_v4().to_string();
     30         let sub = ndb.subscribe(&filters).ok()?;
     31 
     32         pool.subscribe(subid.clone(), filters);
     33 
     34         Some(UnifiedSubscription {
     35             local: sub,
     36             remote: subid,
     37         })
     38     }
     39 
     40     pub fn unsubscribe(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) {
     41         if self.subscribers == 0 {
     42             error!("No subscribers to unsubscribe from");
     43             return;
     44         }
     45 
     46         self.subscribers -= 1;
     47         if self.subscribers == 0 {
     48             let sub = match self.sub {
     49                 Some(ref sub) => sub,
     50                 None => {
     51                     error!("No remote subscription to unsubscribe from");
     52                     return;
     53                 }
     54             };
     55             let local_sub = &sub.local;
     56             if let Err(e) = ndb.unsubscribe(*local_sub) {
     57                 error!(
     58                     "failed to unsubscribe from object: {e}, subid:{}, {} active subscriptions",
     59                     local_sub.id(),
     60                     ndb.subscription_count()
     61                 );
     62             } else {
     63                 info!(
     64                     "Unsubscribed from object subid:{}. {} active subscriptions",
     65                     local_sub.id(),
     66                     ndb.subscription_count()
     67                 );
     68             }
     69 
     70             // unsub from remote
     71             pool.unsubscribe(sub.remote.clone());
     72             self.sub = None;
     73         } else {
     74             info!(
     75                 "Locally unsubscribing. {} active ndb subscriptions. {} active subscriptions for this object",
     76                 ndb.subscription_count(),
     77                 self.subscribers,
     78             );
     79         }
     80     }
     81 
     82     pub fn subscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) {
     83         self.subscribers += 1;
     84         if self.subscribers == 1 {
     85             if self.sub.is_some() {
     86                 error!("Object is first subscriber, but it already had remote subscription");
     87                 return;
     88             }
     89 
     90             self.sub = Self::real_subscribe(ndb, pool, self.filters.clone());
     91             info!(
     92                 "Remotely subscribing to object. {} total active subscriptions, {} on this object",
     93                 ndb.subscription_count(),
     94                 self.subscribers,
     95             );
     96 
     97             if self.sub.is_none() {
     98                 error!("Error subscribing remotely to object");
     99             }
    100         } else {
    101             info!(
    102                 "Locally subscribing. {} total active subscriptions, {} for this object",
    103                 ndb.subscription_count(),
    104                 self.subscribers,
    105             )
    106         }
    107     }
    108 
    109     pub fn poll_for_notes(
    110         &mut self,
    111         ndb: &Ndb,
    112         txn: &Transaction,
    113         is_muted: &MuteFun,
    114     ) -> Result<Vec<NoteRef>, Error> {
    115         let sub = self.sub.as_ref().ok_or(notedeck::Error::no_active_sub())?;
    116         let new_note_keys = ndb.poll_for_notes(sub.local, 500);
    117 
    118         if new_note_keys.is_empty() {
    119             return Ok(vec![]);
    120         } else {
    121             debug!("{} new notes! {:?}", new_note_keys.len(), new_note_keys);
    122         }
    123 
    124         let mut notes: Vec<Note<'_>> = Vec::with_capacity(new_note_keys.len());
    125         for key in new_note_keys {
    126             let note = if let Ok(note) = ndb.get_note_by_key(txn, key) {
    127                 note
    128             } else {
    129                 continue;
    130             };
    131 
    132             if is_muted(&note) {
    133                 continue;
    134             }
    135 
    136             notes.push(note);
    137         }
    138 
    139         let note_refs: Vec<NoteRef> = notes.iter().map(|n| NoteRef::from_note(n)).collect();
    140 
    141         Ok(note_refs)
    142     }
    143 }