notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

pool.rs (7056B)


      1 use crate::relay::{Relay, RelayStatus};
      2 use crate::{ClientMessage, Result};
      3 use nostrdb::Filter;
      4 
      5 use std::time::{Duration, Instant};
      6 
      7 use url::Url;
      8 
      9 #[cfg(not(target_arch = "wasm32"))]
     10 use ewebsock::{WsEvent, WsMessage};
     11 
     12 #[cfg(not(target_arch = "wasm32"))]
     13 use tracing::{debug, error};
     14 
     15 #[derive(Debug)]
     16 pub struct PoolEvent<'a> {
     17     pub relay: &'a str,
     18     pub event: ewebsock::WsEvent,
     19 }
     20 
     21 impl<'a> PoolEvent<'a> {
     22     pub fn into_owned(self) -> PoolEventBuf {
     23         PoolEventBuf {
     24             relay: self.relay.to_owned(),
     25             event: self.event,
     26         }
     27     }
     28 }
     29 
     30 pub struct PoolEventBuf {
     31     pub relay: String,
     32     pub event: ewebsock::WsEvent,
     33 }
     34 
     35 pub struct PoolRelay {
     36     pub relay: Relay,
     37     pub last_ping: Instant,
     38     pub last_connect_attempt: Instant,
     39     pub retry_connect_after: Duration,
     40 }
     41 
     42 impl PoolRelay {
     43     pub fn new(relay: Relay) -> PoolRelay {
     44         PoolRelay {
     45             relay,
     46             last_ping: Instant::now(),
     47             last_connect_attempt: Instant::now(),
     48             retry_connect_after: Self::initial_reconnect_duration(),
     49         }
     50     }
     51 
     52     pub fn initial_reconnect_duration() -> Duration {
     53         Duration::from_secs(5)
     54     }
     55 }
     56 
     57 pub struct RelayPool {
     58     pub relays: Vec<PoolRelay>,
     59     pub ping_rate: Duration,
     60 }
     61 
     62 impl Default for RelayPool {
     63     fn default() -> Self {
     64         RelayPool::new()
     65     }
     66 }
     67 
     68 impl RelayPool {
     69     // Constructs a new, empty RelayPool.
     70     pub fn new() -> RelayPool {
     71         RelayPool {
     72             relays: vec![],
     73             ping_rate: Duration::from_secs(25),
     74         }
     75     }
     76 
     77     pub fn ping_rate(&mut self, duration: Duration) -> &mut Self {
     78         self.ping_rate = duration;
     79         self
     80     }
     81 
     82     pub fn has(&self, url: &str) -> bool {
     83         for relay in &self.relays {
     84             if relay.relay.url == url {
     85                 return true;
     86             }
     87         }
     88 
     89         false
     90     }
     91 
     92     pub fn send(&mut self, cmd: &ClientMessage) {
     93         for relay in &mut self.relays {
     94             relay.relay.send(cmd);
     95         }
     96     }
     97 
     98     pub fn unsubscribe(&mut self, subid: String) {
     99         for relay in &mut self.relays {
    100             relay.relay.send(&ClientMessage::close(subid.clone()));
    101         }
    102     }
    103 
    104     pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) {
    105         for relay in &mut self.relays {
    106             relay.relay.subscribe(subid.clone(), filter.clone());
    107         }
    108     }
    109 
    110     /// Keep relay connectiongs alive by pinging relays that haven't been
    111     /// pinged in awhile. Adjust ping rate with [`ping_rate`].
    112     pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) {
    113         for relay in &mut self.relays {
    114             let now = std::time::Instant::now();
    115 
    116             match relay.relay.status {
    117                 RelayStatus::Disconnected => {
    118                     let reconnect_at = relay.last_connect_attempt + relay.retry_connect_after;
    119                     if now > reconnect_at {
    120                         relay.last_connect_attempt = now;
    121                         let next_duration = Duration::from_millis(
    122                             ((relay.retry_connect_after.as_millis() as f64) * 1.5) as u64,
    123                         );
    124                         debug!(
    125                             "bumping reconnect duration from {:?} to {:?} and retrying connect",
    126                             relay.retry_connect_after, next_duration
    127                         );
    128                         relay.retry_connect_after = next_duration;
    129                         if let Err(err) = relay.relay.connect(wakeup.clone()) {
    130                             error!("error connecting to relay: {}", err);
    131                         }
    132                     } else {
    133                         // let's wait a bit before we try again
    134                     }
    135                 }
    136 
    137                 RelayStatus::Connected => {
    138                     relay.retry_connect_after = PoolRelay::initial_reconnect_duration();
    139 
    140                     let should_ping = now - relay.last_ping > self.ping_rate;
    141                     if should_ping {
    142                         debug!("pinging {}", relay.relay.url);
    143                         relay.relay.ping();
    144                         relay.last_ping = Instant::now();
    145                     }
    146                 }
    147 
    148                 RelayStatus::Connecting => {
    149                     // cool story bro
    150                 }
    151             }
    152         }
    153     }
    154 
    155     pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) {
    156         for relay in &mut self.relays {
    157             let relay = &mut relay.relay;
    158             if relay.url == relay_url {
    159                 relay.send(cmd);
    160                 return;
    161             }
    162         }
    163     }
    164 
    165     // Adds a websocket url to the RelayPool.
    166     pub fn add_url(
    167         &mut self,
    168         url: String,
    169         wakeup: impl Fn() + Send + Sync + Clone + 'static,
    170     ) -> Result<()> {
    171         let url = Self::canonicalize_url(url);
    172         // Check if the URL already exists in the pool.
    173         if self.has(&url) {
    174             return Ok(());
    175         }
    176         let relay = Relay::new(url, wakeup)?;
    177         let pool_relay = PoolRelay::new(relay);
    178 
    179         self.relays.push(pool_relay);
    180 
    181         Ok(())
    182     }
    183 
    184     // standardize the format (ie, trailing slashes)
    185     fn canonicalize_url(url: String) -> String {
    186         match Url::parse(&url) {
    187             Ok(parsed_url) => parsed_url.to_string(),
    188             Err(_) => url, // If parsing fails, return the original URL.
    189         }
    190     }
    191 
    192     /// Attempts to receive a pool event from a list of relays. The
    193     /// function searches each relay in the list in order, attempting to
    194     /// receive a message from each. If a message is received, return it.
    195     /// If no message is received from any relays, None is returned.
    196     pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> {
    197         for relay in &mut self.relays {
    198             let relay = &mut relay.relay;
    199             if let Some(event) = relay.receiver.try_recv() {
    200                 match &event {
    201                     WsEvent::Opened => {
    202                         relay.status = RelayStatus::Connected;
    203                     }
    204                     WsEvent::Closed => {
    205                         relay.status = RelayStatus::Disconnected;
    206                     }
    207                     WsEvent::Error(err) => {
    208                         error!("{:?}", err);
    209                         relay.status = RelayStatus::Disconnected;
    210                     }
    211                     WsEvent::Message(ev) => {
    212                         // let's just handle pongs here.
    213                         // We only need to do this natively.
    214                         #[cfg(not(target_arch = "wasm32"))]
    215                         if let WsMessage::Ping(ref bs) = ev {
    216                             debug!("pong {}", &relay.url);
    217                             relay.sender.send(WsMessage::Pong(bs.to_owned()));
    218                         }
    219                     }
    220                 }
    221                 return Some(PoolEvent {
    222                     event,
    223                     relay: &relay.url,
    224                 });
    225             }
    226         }
    227 
    228         None
    229     }
    230 }