notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

multi_subscriber.rs (4229B)


      1 use enostr::{Filter, RelayPool};
      2 use nostrdb::{Ndb, Note, Transaction};
      3 use tracing::{debug, error, info};
      4 use uuid::Uuid;
      5 
      6 use crate::{filter::UnifiedSubscription, muted::MuteFun, note::NoteRef, Error};
      7 
      8 pub struct MultiSubscriber {
      9     filters: Vec<Filter>,
     10     sub: Option<UnifiedSubscription>,
     11     subscribers: u32,
     12 }
     13 
     14 impl MultiSubscriber {
     15     pub fn new(filters: Vec<Filter>) -> Self {
     16         Self {
     17             filters,
     18             sub: None,
     19             subscribers: 0,
     20         }
     21     }
     22 
     23     fn real_subscribe(
     24         ndb: &Ndb,
     25         pool: &mut RelayPool,
     26         filters: Vec<Filter>,
     27     ) -> Option<UnifiedSubscription> {
     28         let subid = Uuid::new_v4().to_string();
     29         let sub = ndb.subscribe(&filters).ok()?;
     30 
     31         pool.subscribe(subid.clone(), filters);
     32 
     33         Some(UnifiedSubscription {
     34             local: sub,
     35             remote: subid,
     36         })
     37     }
     38 
     39     pub fn unsubscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) {
     40         if self.subscribers == 0 {
     41             error!("No subscribers to unsubscribe from");
     42             return;
     43         }
     44 
     45         self.subscribers -= 1;
     46         if self.subscribers == 0 {
     47             let sub = match self.sub {
     48                 Some(ref sub) => sub,
     49                 None => {
     50                     error!("No remote subscription to unsubscribe from");
     51                     return;
     52                 }
     53             };
     54             let local_sub = &sub.local;
     55             if let Err(e) = ndb.unsubscribe(*local_sub) {
     56                 error!(
     57                     "failed to unsubscribe from object: {e}, subid:{}, {} active subscriptions",
     58                     local_sub.id(),
     59                     ndb.subscription_count()
     60                 );
     61             } else {
     62                 info!(
     63                     "Unsubscribed from object subid:{}. {} active subscriptions",
     64                     local_sub.id(),
     65                     ndb.subscription_count()
     66                 );
     67             }
     68 
     69             // unsub from remote
     70             pool.unsubscribe(sub.remote.clone());
     71             self.sub = None;
     72         } else {
     73             info!(
     74                 "Locally unsubscribing. {} active ndb subscriptions. {} active subscriptions for this object",
     75                 ndb.subscription_count(),
     76                 self.subscribers,
     77             );
     78         }
     79     }
     80 
     81     pub fn subscribe(&mut self, ndb: &Ndb, pool: &mut RelayPool) {
     82         self.subscribers += 1;
     83         if self.subscribers == 1 {
     84             if self.sub.is_some() {
     85                 error!("Object is first subscriber, but it already had remote subscription");
     86                 return;
     87             }
     88 
     89             self.sub = Self::real_subscribe(ndb, pool, self.filters.clone());
     90             info!(
     91                 "Remotely subscribing to object. {} total active subscriptions, {} on this object",
     92                 ndb.subscription_count(),
     93                 self.subscribers,
     94             );
     95 
     96             if self.sub.is_none() {
     97                 error!("Error subscribing remotely to object");
     98             }
     99         } else {
    100             info!(
    101                 "Locally subscribing. {} total active subscriptions, {} for this object",
    102                 ndb.subscription_count(),
    103                 self.subscribers,
    104             )
    105         }
    106     }
    107 
    108     pub fn poll_for_notes(
    109         &mut self,
    110         ndb: &Ndb,
    111         txn: &Transaction,
    112         is_muted: &MuteFun,
    113     ) -> Result<Vec<NoteRef>, Error> {
    114         let sub = self.sub.as_ref().ok_or(Error::no_active_sub())?;
    115         let new_note_keys = ndb.poll_for_notes(sub.local, 500);
    116 
    117         if new_note_keys.is_empty() {
    118             return Ok(vec![]);
    119         } else {
    120             debug!("{} new notes! {:?}", new_note_keys.len(), new_note_keys);
    121         }
    122 
    123         let mut notes: Vec<Note<'_>> = Vec::with_capacity(new_note_keys.len());
    124         for key in new_note_keys {
    125             let note = if let Ok(note) = ndb.get_note_by_key(txn, key) {
    126                 note
    127             } else {
    128                 continue;
    129             };
    130 
    131             if is_muted(&note) {
    132                 continue;
    133             }
    134 
    135             notes.push(note);
    136         }
    137 
    138         let note_refs: Vec<NoteRef> = notes.iter().map(|n| NoteRef::from_note(n)).collect();
    139 
    140         Ok(note_refs)
    141     }
    142 }