notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

pool.rs (12616B)


      1 use crate::relay::{setup_multicast_relay, MulticastRelay, Relay, RelayStatus};
      2 use crate::{ClientMessage, Result};
      3 use nostrdb::Filter;
      4 
      5 use std::collections::BTreeSet;
      6 use std::time::{Duration, Instant};
      7 
      8 use url::Url;
      9 
     10 #[cfg(not(target_arch = "wasm32"))]
     11 use ewebsock::{WsEvent, WsMessage};
     12 
     13 #[cfg(not(target_arch = "wasm32"))]
     14 use tracing::{debug, error};
     15 
     16 use super::subs_debug::SubsDebug;
     17 
     18 #[derive(Debug)]
     19 pub struct PoolEvent<'a> {
     20     pub relay: &'a str,
     21     pub event: ewebsock::WsEvent,
     22 }
     23 
     24 impl PoolEvent<'_> {
     25     pub fn into_owned(self) -> PoolEventBuf {
     26         PoolEventBuf {
     27             relay: self.relay.to_owned(),
     28             event: self.event,
     29         }
     30     }
     31 }
     32 
     33 pub struct PoolEventBuf {
     34     pub relay: String,
     35     pub event: ewebsock::WsEvent,
     36 }
     37 
     38 pub enum PoolRelay {
     39     Websocket(WebsocketRelay),
     40     Multicast(MulticastRelay),
     41 }
     42 
     43 pub struct WebsocketRelay {
     44     pub relay: Relay,
     45     pub last_ping: Instant,
     46     pub last_connect_attempt: Instant,
     47     pub retry_connect_after: Duration,
     48 }
     49 
     50 impl PoolRelay {
     51     pub fn url(&self) -> &str {
     52         match self {
     53             Self::Websocket(wsr) => &wsr.relay.url,
     54             Self::Multicast(_wsr) => "multicast",
     55         }
     56     }
     57 
     58     pub fn set_status(&mut self, status: RelayStatus) {
     59         match self {
     60             Self::Websocket(wsr) => {
     61                 wsr.relay.status = status;
     62             }
     63             Self::Multicast(_mcr) => {}
     64         }
     65     }
     66 
     67     pub fn try_recv(&self) -> Option<WsEvent> {
     68         match self {
     69             Self::Websocket(recvr) => recvr.relay.receiver.try_recv(),
     70             Self::Multicast(recvr) => recvr.try_recv(),
     71         }
     72     }
     73 
     74     pub fn status(&self) -> RelayStatus {
     75         match self {
     76             Self::Websocket(wsr) => wsr.relay.status,
     77             Self::Multicast(mcr) => mcr.status,
     78         }
     79     }
     80 
     81     pub fn send(&mut self, msg: &ClientMessage) -> Result<()> {
     82         match self {
     83             Self::Websocket(wsr) => {
     84                 wsr.relay.send(msg);
     85                 Ok(())
     86             }
     87 
     88             Self::Multicast(mcr) => {
     89                 // we only send event client messages at the moment
     90                 if let ClientMessage::Event(ecm) = msg {
     91                     mcr.send(ecm)?;
     92                 }
     93                 Ok(())
     94             }
     95         }
     96     }
     97 
     98     pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) -> Result<()> {
     99         self.send(&ClientMessage::req(subid, filter))
    100     }
    101 
    102     pub fn websocket(relay: Relay) -> Self {
    103         Self::Websocket(WebsocketRelay::new(relay))
    104     }
    105 
    106     pub fn multicast(wakeup: impl Fn() + Send + Sync + Clone + 'static) -> Result<Self> {
    107         Ok(Self::Multicast(setup_multicast_relay(wakeup)?))
    108     }
    109 }
    110 
    111 impl WebsocketRelay {
    112     pub fn new(relay: Relay) -> Self {
    113         Self {
    114             relay,
    115             last_ping: Instant::now(),
    116             last_connect_attempt: Instant::now(),
    117             retry_connect_after: Self::initial_reconnect_duration(),
    118         }
    119     }
    120 
    121     pub fn initial_reconnect_duration() -> Duration {
    122         Duration::from_secs(5)
    123     }
    124 }
    125 
    126 pub struct RelayPool {
    127     pub relays: Vec<PoolRelay>,
    128     pub ping_rate: Duration,
    129     pub debug: Option<SubsDebug>,
    130 }
    131 
    132 impl Default for RelayPool {
    133     fn default() -> Self {
    134         RelayPool::new()
    135     }
    136 }
    137 
    138 impl RelayPool {
    139     // Constructs a new, empty RelayPool.
    140     pub fn new() -> RelayPool {
    141         RelayPool {
    142             relays: vec![],
    143             ping_rate: Duration::from_secs(45),
    144             debug: None,
    145         }
    146     }
    147 
    148     pub fn add_multicast_relay(
    149         &mut self,
    150         wakeup: impl Fn() + Send + Sync + Clone + 'static,
    151     ) -> Result<()> {
    152         let multicast_relay = PoolRelay::multicast(wakeup)?;
    153         self.relays.push(multicast_relay);
    154         Ok(())
    155     }
    156 
    157     pub fn use_debug(&mut self) {
    158         self.debug = Some(SubsDebug::default());
    159     }
    160 
    161     pub fn ping_rate(&mut self, duration: Duration) -> &mut Self {
    162         self.ping_rate = duration;
    163         self
    164     }
    165 
    166     pub fn has(&self, url: &str) -> bool {
    167         for relay in &self.relays {
    168             if relay.url() == url {
    169                 return true;
    170             }
    171         }
    172 
    173         false
    174     }
    175 
    176     pub fn urls(&self) -> BTreeSet<String> {
    177         self.relays
    178             .iter()
    179             .map(|pool_relay| pool_relay.url().to_string())
    180             .collect()
    181     }
    182 
    183     pub fn send(&mut self, cmd: &ClientMessage) {
    184         for relay in &mut self.relays {
    185             if let Some(debug) = &mut self.debug {
    186                 debug.send_cmd(relay.url().to_owned(), cmd);
    187             }
    188             if let Err(err) = relay.send(cmd) {
    189                 error!("error sending {:?} to {}: {err}", cmd, relay.url());
    190             }
    191         }
    192     }
    193 
    194     pub fn unsubscribe(&mut self, subid: String) {
    195         for relay in &mut self.relays {
    196             let cmd = ClientMessage::close(subid.clone());
    197             if let Some(debug) = &mut self.debug {
    198                 debug.send_cmd(relay.url().to_owned(), &cmd);
    199             }
    200             if let Err(err) = relay.send(&cmd) {
    201                 error!(
    202                     "error unsubscribing from {} on {}: {err}",
    203                     &subid,
    204                     relay.url()
    205                 );
    206             }
    207         }
    208     }
    209 
    210     pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) {
    211         for relay in &mut self.relays {
    212             if let Some(debug) = &mut self.debug {
    213                 debug.send_cmd(
    214                     relay.url().to_owned(),
    215                     &ClientMessage::req(subid.clone(), filter.clone()),
    216                 );
    217             }
    218 
    219             if let Err(err) = relay.send(&ClientMessage::req(subid.clone(), filter.clone())) {
    220                 error!("error subscribing to {}: {err}", relay.url());
    221             }
    222         }
    223     }
    224 
    225     /// Keep relay connectiongs alive by pinging relays that haven't been
    226     /// pinged in awhile. Adjust ping rate with [`ping_rate`].
    227     pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) {
    228         for relay in &mut self.relays {
    229             let now = std::time::Instant::now();
    230 
    231             match relay {
    232                 PoolRelay::Multicast(_) => {}
    233                 PoolRelay::Websocket(relay) => {
    234                     match relay.relay.status {
    235                         RelayStatus::Disconnected => {
    236                             let reconnect_at =
    237                                 relay.last_connect_attempt + relay.retry_connect_after;
    238                             if now > reconnect_at {
    239                                 relay.last_connect_attempt = now;
    240                                 let next_duration = Duration::from_millis(3000);
    241                                 debug!(
    242                                     "bumping reconnect duration from {:?} to {:?} and retrying connect",
    243                                     relay.retry_connect_after, next_duration
    244                                 );
    245                                 relay.retry_connect_after = next_duration;
    246                                 if let Err(err) = relay.relay.connect(wakeup.clone()) {
    247                                     error!("error connecting to relay: {}", err);
    248                                 }
    249                             } else {
    250                                 // let's wait a bit before we try again
    251                             }
    252                         }
    253 
    254                         RelayStatus::Connected => {
    255                             relay.retry_connect_after =
    256                                 WebsocketRelay::initial_reconnect_duration();
    257 
    258                             let should_ping = now - relay.last_ping > self.ping_rate;
    259                             if should_ping {
    260                                 debug!("pinging {}", relay.relay.url);
    261                                 relay.relay.ping();
    262                                 relay.last_ping = Instant::now();
    263                             }
    264                         }
    265 
    266                         RelayStatus::Connecting => {
    267                             // cool story bro
    268                         }
    269                     }
    270                 }
    271             }
    272         }
    273     }
    274 
    275     pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) {
    276         for relay in &mut self.relays {
    277             if relay.url() == relay_url {
    278                 if let Some(debug) = &mut self.debug {
    279                     debug.send_cmd(relay.url().to_owned(), cmd);
    280                 }
    281                 if let Err(err) = relay.send(cmd) {
    282                     error!("send_to err: {err}");
    283                 }
    284                 return;
    285             }
    286         }
    287     }
    288 
    289     /// check whether a relay url is valid to add
    290     pub fn is_valid_url(&self, url: &str) -> bool {
    291         if url.is_empty() {
    292             return false;
    293         }
    294         let url = match Url::parse(url) {
    295             Ok(parsed_url) => parsed_url.to_string(),
    296             Err(_err) => {
    297                 // debug!("bad relay url \"{}\": {:?}", url, err);
    298                 return false;
    299             }
    300         };
    301         if self.has(&url) {
    302             return false;
    303         }
    304         true
    305     }
    306 
    307     // Adds a websocket url to the RelayPool.
    308     pub fn add_url(
    309         &mut self,
    310         url: String,
    311         wakeup: impl Fn() + Send + Sync + Clone + 'static,
    312     ) -> Result<()> {
    313         let url = Self::canonicalize_url(url);
    314         // Check if the URL already exists in the pool.
    315         if self.has(&url) {
    316             return Ok(());
    317         }
    318         let relay = Relay::new(url, wakeup)?;
    319         let pool_relay = PoolRelay::websocket(relay);
    320 
    321         self.relays.push(pool_relay);
    322 
    323         Ok(())
    324     }
    325 
    326     pub fn add_urls(
    327         &mut self,
    328         urls: BTreeSet<String>,
    329         wakeup: impl Fn() + Send + Sync + Clone + 'static,
    330     ) -> Result<()> {
    331         for url in urls {
    332             self.add_url(url, wakeup.clone())?;
    333         }
    334         Ok(())
    335     }
    336 
    337     pub fn remove_urls(&mut self, urls: &BTreeSet<String>) {
    338         self.relays
    339             .retain(|pool_relay| !urls.contains(pool_relay.url()));
    340     }
    341 
    342     // standardize the format (ie, trailing slashes)
    343     fn canonicalize_url(url: String) -> String {
    344         match Url::parse(&url) {
    345             Ok(parsed_url) => parsed_url.to_string(),
    346             Err(_) => url, // If parsing fails, return the original URL.
    347         }
    348     }
    349 
    350     /// Attempts to receive a pool event from a list of relays. The
    351     /// function searches each relay in the list in order, attempting to
    352     /// receive a message from each. If a message is received, return it.
    353     /// If no message is received from any relays, None is returned.
    354     pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> {
    355         for relay in &mut self.relays {
    356             if let PoolRelay::Multicast(mcr) = relay {
    357                 // try rejoin on multicast
    358                 if mcr.should_rejoin() {
    359                     if let Err(err) = mcr.rejoin() {
    360                         error!("multicast: rejoin error: {err}");
    361                     }
    362                 }
    363             }
    364 
    365             if let Some(event) = relay.try_recv() {
    366                 match &event {
    367                     WsEvent::Opened => {
    368                         relay.set_status(RelayStatus::Connected);
    369                     }
    370                     WsEvent::Closed => {
    371                         relay.set_status(RelayStatus::Disconnected);
    372                     }
    373                     WsEvent::Error(err) => {
    374                         error!("{:?}", err);
    375                         relay.set_status(RelayStatus::Disconnected);
    376                     }
    377                     WsEvent::Message(ev) => {
    378                         // let's just handle pongs here.
    379                         // We only need to do this natively.
    380                         #[cfg(not(target_arch = "wasm32"))]
    381                         if let WsMessage::Ping(ref bs) = ev {
    382                             debug!("pong {}", relay.url());
    383                             match relay {
    384                                 PoolRelay::Websocket(wsr) => {
    385                                     wsr.relay.sender.send(WsMessage::Pong(bs.to_owned()));
    386                                 }
    387                                 PoolRelay::Multicast(_mcr) => {}
    388                             }
    389                         }
    390                     }
    391                 }
    392 
    393                 if let Some(debug) = &mut self.debug {
    394                     debug.receive_cmd(relay.url().to_owned(), (&event).into());
    395                 }
    396 
    397                 let pool_event = PoolEvent {
    398                     event,
    399                     relay: relay.url(),
    400                 };
    401 
    402                 return Some(pool_event);
    403             }
    404         }
    405 
    406         None
    407     }
    408 }