notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

pool.rs (6322B)


      1 use crate::relay::{Relay, RelayStatus};
      2 use crate::{ClientMessage, Result};
      3 use nostrdb::Filter;
      4 
      5 use std::time::{Duration, Instant};
      6 
      7 #[cfg(not(target_arch = "wasm32"))]
      8 use ewebsock::{WsEvent, WsMessage};
      9 
     10 #[cfg(not(target_arch = "wasm32"))]
     11 use tracing::{debug, error};
     12 
     13 #[derive(Debug)]
     14 pub struct PoolEvent<'a> {
     15     pub relay: &'a str,
     16     pub event: ewebsock::WsEvent,
     17 }
     18 
     19 pub struct PoolRelay {
     20     pub relay: Relay,
     21     pub last_ping: Instant,
     22     pub last_connect_attempt: Instant,
     23     pub retry_connect_after: Duration,
     24 }
     25 
     26 impl PoolRelay {
     27     pub fn new(relay: Relay) -> PoolRelay {
     28         PoolRelay {
     29             relay,
     30             last_ping: Instant::now(),
     31             last_connect_attempt: Instant::now(),
     32             retry_connect_after: Self::initial_reconnect_duration(),
     33         }
     34     }
     35 
     36     pub fn initial_reconnect_duration() -> Duration {
     37         Duration::from_secs(5)
     38     }
     39 }
     40 
     41 pub struct RelayPool {
     42     pub relays: Vec<PoolRelay>,
     43     pub ping_rate: Duration,
     44 }
     45 
     46 impl Default for RelayPool {
     47     fn default() -> Self {
     48         RelayPool::new()
     49     }
     50 }
     51 
     52 impl RelayPool {
     53     // Constructs a new, empty RelayPool.
     54     pub fn new() -> RelayPool {
     55         RelayPool {
     56             relays: vec![],
     57             ping_rate: Duration::from_secs(25),
     58         }
     59     }
     60 
     61     pub fn ping_rate(&mut self, duration: Duration) -> &mut Self {
     62         self.ping_rate = duration;
     63         self
     64     }
     65 
     66     pub fn has(&self, url: &str) -> bool {
     67         for relay in &self.relays {
     68             if relay.relay.url == url {
     69                 return true;
     70             }
     71         }
     72 
     73         false
     74     }
     75 
     76     pub fn send(&mut self, cmd: &ClientMessage) {
     77         for relay in &mut self.relays {
     78             relay.relay.send(cmd);
     79         }
     80     }
     81 
     82     pub fn unsubscribe(&mut self, subid: String) {
     83         for relay in &mut self.relays {
     84             relay.relay.send(&ClientMessage::close(subid.clone()));
     85         }
     86     }
     87 
     88     pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) {
     89         for relay in &mut self.relays {
     90             relay.relay.subscribe(subid.clone(), filter.clone());
     91         }
     92     }
     93 
     94     /// Keep relay connectiongs alive by pinging relays that haven't been
     95     /// pinged in awhile. Adjust ping rate with [`ping_rate`].
     96     pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) {
     97         for relay in &mut self.relays {
     98             let now = std::time::Instant::now();
     99 
    100             match relay.relay.status {
    101                 RelayStatus::Disconnected => {
    102                     let reconnect_at = relay.last_connect_attempt + relay.retry_connect_after;
    103                     if now > reconnect_at {
    104                         relay.last_connect_attempt = now;
    105                         let next_duration = Duration::from_millis(
    106                             ((relay.retry_connect_after.as_millis() as f64) * 1.5) as u64,
    107                         );
    108                         debug!(
    109                             "bumping reconnect duration from {:?} to {:?} and retrying connect",
    110                             relay.retry_connect_after, next_duration
    111                         );
    112                         relay.retry_connect_after = next_duration;
    113                         if let Err(err) = relay.relay.connect(wakeup.clone()) {
    114                             error!("error connecting to relay: {}", err);
    115                         }
    116                     } else {
    117                         // let's wait a bit before we try again
    118                     }
    119                 }
    120 
    121                 RelayStatus::Connected => {
    122                     relay.retry_connect_after = PoolRelay::initial_reconnect_duration();
    123 
    124                     let should_ping = now - relay.last_ping > self.ping_rate;
    125                     if should_ping {
    126                         debug!("pinging {}", relay.relay.url);
    127                         relay.relay.ping();
    128                         relay.last_ping = Instant::now();
    129                     }
    130                 }
    131 
    132                 RelayStatus::Connecting => {
    133                     // cool story bro
    134                 }
    135             }
    136         }
    137     }
    138 
    139     pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) {
    140         for relay in &mut self.relays {
    141             let relay = &mut relay.relay;
    142             if relay.url == relay_url {
    143                 relay.send(cmd);
    144                 return;
    145             }
    146         }
    147     }
    148 
    149     // Adds a websocket url to the RelayPool.
    150     pub fn add_url(
    151         &mut self,
    152         url: String,
    153         wakeup: impl Fn() + Send + Sync + Clone + 'static,
    154     ) -> Result<()> {
    155         let relay = Relay::new(url, wakeup)?;
    156         let pool_relay = PoolRelay::new(relay);
    157 
    158         self.relays.push(pool_relay);
    159 
    160         Ok(())
    161     }
    162 
    163     /// Attempts to receive a pool event from a list of relays. The
    164     /// function searches each relay in the list in order, attempting to
    165     /// receive a message from each. If a message is received, return it.
    166     /// If no message is received from any relays, None is returned.
    167     pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> {
    168         for relay in &mut self.relays {
    169             let relay = &mut relay.relay;
    170             if let Some(event) = relay.receiver.try_recv() {
    171                 match &event {
    172                     WsEvent::Opened => {
    173                         relay.status = RelayStatus::Connected;
    174                     }
    175                     WsEvent::Closed => {
    176                         relay.status = RelayStatus::Disconnected;
    177                     }
    178                     WsEvent::Error(err) => {
    179                         error!("{:?}", err);
    180                         relay.status = RelayStatus::Disconnected;
    181                     }
    182                     WsEvent::Message(ev) => {
    183                         // let's just handle pongs here.
    184                         // We only need to do this natively.
    185                         #[cfg(not(target_arch = "wasm32"))]
    186                         if let WsMessage::Ping(ref bs) = ev {
    187                             debug!("pong {}", &relay.url);
    188                             relay.sender.send(WsMessage::Pong(bs.to_owned()));
    189                         }
    190                     }
    191                 }
    192                 return Some(PoolEvent {
    193                     event,
    194                     relay: &relay.url,
    195                 });
    196             }
    197         }
    198 
    199         None
    200     }
    201 }