notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

pool.rs (7679B)


      1 use crate::relay::{Relay, RelayStatus};
      2 use crate::{ClientMessage, Result};
      3 use nostrdb::Filter;
      4 
      5 use std::collections::BTreeSet;
      6 use std::time::{Duration, Instant};
      7 
      8 use url::Url;
      9 
     10 #[cfg(not(target_arch = "wasm32"))]
     11 use ewebsock::{WsEvent, WsMessage};
     12 
     13 #[cfg(not(target_arch = "wasm32"))]
     14 use tracing::{debug, error};
     15 
     16 #[derive(Debug)]
     17 pub struct PoolEvent<'a> {
     18     pub relay: &'a str,
     19     pub event: ewebsock::WsEvent,
     20 }
     21 
     22 impl PoolEvent<'_> {
     23     pub fn into_owned(self) -> PoolEventBuf {
     24         PoolEventBuf {
     25             relay: self.relay.to_owned(),
     26             event: self.event,
     27         }
     28     }
     29 }
     30 
     31 pub struct PoolEventBuf {
     32     pub relay: String,
     33     pub event: ewebsock::WsEvent,
     34 }
     35 
     36 pub struct PoolRelay {
     37     pub relay: Relay,
     38     pub last_ping: Instant,
     39     pub last_connect_attempt: Instant,
     40     pub retry_connect_after: Duration,
     41 }
     42 
     43 impl PoolRelay {
     44     pub fn new(relay: Relay) -> PoolRelay {
     45         PoolRelay {
     46             relay,
     47             last_ping: Instant::now(),
     48             last_connect_attempt: Instant::now(),
     49             retry_connect_after: Self::initial_reconnect_duration(),
     50         }
     51     }
     52 
     53     pub fn initial_reconnect_duration() -> Duration {
     54         Duration::from_secs(5)
     55     }
     56 }
     57 
     58 pub struct RelayPool {
     59     pub relays: Vec<PoolRelay>,
     60     pub ping_rate: Duration,
     61 }
     62 
     63 impl Default for RelayPool {
     64     fn default() -> Self {
     65         RelayPool::new()
     66     }
     67 }
     68 
     69 impl RelayPool {
     70     // Constructs a new, empty RelayPool.
     71     pub fn new() -> RelayPool {
     72         RelayPool {
     73             relays: vec![],
     74             ping_rate: Duration::from_secs(25),
     75         }
     76     }
     77 
     78     pub fn ping_rate(&mut self, duration: Duration) -> &mut Self {
     79         self.ping_rate = duration;
     80         self
     81     }
     82 
     83     pub fn has(&self, url: &str) -> bool {
     84         for relay in &self.relays {
     85             if relay.relay.url == url {
     86                 return true;
     87             }
     88         }
     89 
     90         false
     91     }
     92 
     93     pub fn urls(&self) -> BTreeSet<String> {
     94         self.relays
     95             .iter()
     96             .map(|pool_relay| pool_relay.relay.url.clone())
     97             .collect()
     98     }
     99 
    100     pub fn send(&mut self, cmd: &ClientMessage) {
    101         for relay in &mut self.relays {
    102             relay.relay.send(cmd);
    103         }
    104     }
    105 
    106     pub fn unsubscribe(&mut self, subid: String) {
    107         for relay in &mut self.relays {
    108             relay.relay.send(&ClientMessage::close(subid.clone()));
    109         }
    110     }
    111 
    112     pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) {
    113         for relay in &mut self.relays {
    114             relay.relay.subscribe(subid.clone(), filter.clone());
    115         }
    116     }
    117 
    118     /// Keep relay connectiongs alive by pinging relays that haven't been
    119     /// pinged in awhile. Adjust ping rate with [`ping_rate`].
    120     pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) {
    121         for relay in &mut self.relays {
    122             let now = std::time::Instant::now();
    123 
    124             match relay.relay.status {
    125                 RelayStatus::Disconnected => {
    126                     let reconnect_at = relay.last_connect_attempt + relay.retry_connect_after;
    127                     if now > reconnect_at {
    128                         relay.last_connect_attempt = now;
    129                         let next_duration = Duration::from_millis(
    130                             ((relay.retry_connect_after.as_millis() as f64) * 1.5) as u64,
    131                         );
    132                         debug!(
    133                             "bumping reconnect duration from {:?} to {:?} and retrying connect",
    134                             relay.retry_connect_after, next_duration
    135                         );
    136                         relay.retry_connect_after = next_duration;
    137                         if let Err(err) = relay.relay.connect(wakeup.clone()) {
    138                             error!("error connecting to relay: {}", err);
    139                         }
    140                     } else {
    141                         // let's wait a bit before we try again
    142                     }
    143                 }
    144 
    145                 RelayStatus::Connected => {
    146                     relay.retry_connect_after = PoolRelay::initial_reconnect_duration();
    147 
    148                     let should_ping = now - relay.last_ping > self.ping_rate;
    149                     if should_ping {
    150                         debug!("pinging {}", relay.relay.url);
    151                         relay.relay.ping();
    152                         relay.last_ping = Instant::now();
    153                     }
    154                 }
    155 
    156                 RelayStatus::Connecting => {
    157                     // cool story bro
    158                 }
    159             }
    160         }
    161     }
    162 
    163     pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) {
    164         for relay in &mut self.relays {
    165             let relay = &mut relay.relay;
    166             if relay.url == relay_url {
    167                 relay.send(cmd);
    168                 return;
    169             }
    170         }
    171     }
    172 
    173     // Adds a websocket url to the RelayPool.
    174     pub fn add_url(
    175         &mut self,
    176         url: String,
    177         wakeup: impl Fn() + Send + Sync + Clone + 'static,
    178     ) -> Result<()> {
    179         let url = Self::canonicalize_url(url);
    180         // Check if the URL already exists in the pool.
    181         if self.has(&url) {
    182             return Ok(());
    183         }
    184         let relay = Relay::new(url, wakeup)?;
    185         let pool_relay = PoolRelay::new(relay);
    186 
    187         self.relays.push(pool_relay);
    188 
    189         Ok(())
    190     }
    191 
    192     pub fn add_urls(
    193         &mut self,
    194         urls: BTreeSet<String>,
    195         wakeup: impl Fn() + Send + Sync + Clone + 'static,
    196     ) -> Result<()> {
    197         for url in urls {
    198             self.add_url(url, wakeup.clone())?;
    199         }
    200         Ok(())
    201     }
    202 
    203     pub fn remove_urls(&mut self, urls: &BTreeSet<String>) {
    204         self.relays
    205             .retain(|pool_relay| !urls.contains(&pool_relay.relay.url));
    206     }
    207 
    208     // standardize the format (ie, trailing slashes)
    209     fn canonicalize_url(url: String) -> String {
    210         match Url::parse(&url) {
    211             Ok(parsed_url) => parsed_url.to_string(),
    212             Err(_) => url, // If parsing fails, return the original URL.
    213         }
    214     }
    215 
    216     /// Attempts to receive a pool event from a list of relays. The
    217     /// function searches each relay in the list in order, attempting to
    218     /// receive a message from each. If a message is received, return it.
    219     /// If no message is received from any relays, None is returned.
    220     pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> {
    221         for relay in &mut self.relays {
    222             let relay = &mut relay.relay;
    223             if let Some(event) = relay.receiver.try_recv() {
    224                 match &event {
    225                     WsEvent::Opened => {
    226                         relay.status = RelayStatus::Connected;
    227                     }
    228                     WsEvent::Closed => {
    229                         relay.status = RelayStatus::Disconnected;
    230                     }
    231                     WsEvent::Error(err) => {
    232                         error!("{:?}", err);
    233                         relay.status = RelayStatus::Disconnected;
    234                     }
    235                     WsEvent::Message(ev) => {
    236                         // let's just handle pongs here.
    237                         // We only need to do this natively.
    238                         #[cfg(not(target_arch = "wasm32"))]
    239                         if let WsMessage::Ping(ref bs) = ev {
    240                             debug!("pong {}", &relay.url);
    241                             relay.sender.send(WsMessage::Pong(bs.to_owned()));
    242                         }
    243                     }
    244                 }
    245                 return Some(PoolEvent {
    246                     event,
    247                     relay: &relay.url,
    248                 });
    249             }
    250         }
    251 
    252         None
    253     }
    254 }