notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

pool.rs (12201B)


      1 use crate::relay::multicast::{setup_multicast_relay, MulticastRelay};
      2 use crate::relay::{RelayStatus, WebsocketConn, WebsocketRelay};
      3 use crate::{ClientMessage, Error, Result};
      4 use nostrdb::Filter;
      5 
      6 use std::collections::BTreeSet;
      7 use std::time::{Duration, Instant};
      8 
      9 use url::Url;
     10 
     11 use ewebsock::{WsEvent, WsMessage};
     12 use tracing::{debug, error, trace};
     13 
     14 use super::subs_debug::SubsDebug;
     15 
     16 #[derive(Debug)]
     17 pub struct PoolEvent<'a> {
     18     pub relay: &'a str,
     19     pub event: ewebsock::WsEvent,
     20 }
     21 
     22 impl PoolEvent<'_> {
     23     pub fn into_owned(self) -> PoolEventBuf {
     24         PoolEventBuf {
     25             relay: self.relay.to_owned(),
     26             event: self.event,
     27         }
     28     }
     29 }
     30 
     31 pub struct PoolEventBuf {
     32     pub relay: String,
     33     pub event: ewebsock::WsEvent,
     34 }
     35 
     36 pub enum PoolRelay {
     37     Websocket(WebsocketRelay),
     38     Multicast(MulticastRelay),
     39 }
     40 
     41 impl PoolRelay {
     42     pub fn url(&self) -> &str {
     43         match self {
     44             Self::Websocket(wsr) => wsr.conn.url.as_str(),
     45             Self::Multicast(_wsr) => "multicast",
     46         }
     47     }
     48 
     49     pub fn set_status(&mut self, status: RelayStatus) {
     50         match self {
     51             Self::Websocket(wsr) => {
     52                 wsr.conn.status = status;
     53             }
     54             Self::Multicast(_mcr) => {}
     55         }
     56     }
     57 
     58     pub fn try_recv(&self) -> Option<WsEvent> {
     59         match self {
     60             Self::Websocket(recvr) => recvr.conn.receiver.try_recv(),
     61             Self::Multicast(recvr) => recvr.try_recv(),
     62         }
     63     }
     64 
     65     pub fn status(&self) -> RelayStatus {
     66         match self {
     67             Self::Websocket(wsr) => wsr.conn.status,
     68             Self::Multicast(mcr) => mcr.status(),
     69         }
     70     }
     71 
     72     pub fn send(&mut self, msg: &ClientMessage) -> Result<()> {
     73         match self {
     74             Self::Websocket(wsr) => {
     75                 wsr.conn.send(msg);
     76                 Ok(())
     77             }
     78 
     79             Self::Multicast(mcr) => {
     80                 // we only send event client messages at the moment
     81                 if let ClientMessage::Event(ecm) = msg {
     82                     mcr.send(ecm)?;
     83                 }
     84                 Ok(())
     85             }
     86         }
     87     }
     88 
     89     pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) -> Result<()> {
     90         self.send(&ClientMessage::req(subid, filter))
     91     }
     92 
     93     pub fn websocket(relay: WebsocketConn) -> Self {
     94         Self::Websocket(WebsocketRelay::new(relay))
     95     }
     96 
     97     pub fn multicast(wakeup: impl Fn() + Send + Sync + Clone + 'static) -> Result<Self> {
     98         Ok(Self::Multicast(setup_multicast_relay(wakeup)?))
     99     }
    100 }
    101 
    102 pub struct RelayPool {
    103     pub relays: Vec<PoolRelay>,
    104     pub ping_rate: Duration,
    105     pub debug: Option<SubsDebug>,
    106 }
    107 
    108 impl Default for RelayPool {
    109     fn default() -> Self {
    110         RelayPool::new()
    111     }
    112 }
    113 
    114 impl RelayPool {
    115     // Constructs a new, empty RelayPool.
    116     pub fn new() -> RelayPool {
    117         RelayPool {
    118             relays: vec![],
    119             ping_rate: Duration::from_secs(45),
    120             debug: None,
    121         }
    122     }
    123 
    124     pub fn add_multicast_relay(
    125         &mut self,
    126         wakeup: impl Fn() + Send + Sync + Clone + 'static,
    127     ) -> Result<()> {
    128         let multicast_relay = PoolRelay::multicast(wakeup)?;
    129         self.relays.push(multicast_relay);
    130         Ok(())
    131     }
    132 
    133     pub fn use_debug(&mut self) {
    134         self.debug = Some(SubsDebug::default());
    135     }
    136 
    137     pub fn ping_rate(&mut self, duration: Duration) -> &mut Self {
    138         self.ping_rate = duration;
    139         self
    140     }
    141 
    142     pub fn has(&self, url: &str) -> bool {
    143         for relay in &self.relays {
    144             if relay.url() == url {
    145                 return true;
    146             }
    147         }
    148 
    149         false
    150     }
    151 
    152     pub fn urls(&self) -> BTreeSet<String> {
    153         self.relays
    154             .iter()
    155             .map(|pool_relay| pool_relay.url().to_string())
    156             .collect()
    157     }
    158 
    159     pub fn send(&mut self, cmd: &ClientMessage) {
    160         for relay in &mut self.relays {
    161             if let Some(debug) = &mut self.debug {
    162                 debug.send_cmd(relay.url().to_owned(), cmd);
    163             }
    164             if let Err(err) = relay.send(cmd) {
    165                 error!("error sending {:?} to {}: {err}", cmd, relay.url());
    166             }
    167         }
    168     }
    169 
    170     pub fn unsubscribe(&mut self, subid: String) {
    171         for relay in &mut self.relays {
    172             let cmd = ClientMessage::close(subid.clone());
    173             if let Some(debug) = &mut self.debug {
    174                 debug.send_cmd(relay.url().to_owned(), &cmd);
    175             }
    176             if let Err(err) = relay.send(&cmd) {
    177                 error!(
    178                     "error unsubscribing from {} on {}: {err}",
    179                     &subid,
    180                     relay.url()
    181                 );
    182             }
    183         }
    184     }
    185 
    186     pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) {
    187         for relay in &mut self.relays {
    188             if let Some(debug) = &mut self.debug {
    189                 debug.send_cmd(
    190                     relay.url().to_owned(),
    191                     &ClientMessage::req(subid.clone(), filter.clone()),
    192                 );
    193             }
    194 
    195             if let Err(err) = relay.send(&ClientMessage::req(subid.clone(), filter.clone())) {
    196                 error!("error subscribing to {}: {err}", relay.url());
    197             }
    198         }
    199     }
    200 
    201     /// Keep relay connectiongs alive by pinging relays that haven't been
    202     /// pinged in awhile. Adjust ping rate with [`ping_rate`].
    203     pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) {
    204         for relay in &mut self.relays {
    205             let now = std::time::Instant::now();
    206 
    207             match relay {
    208                 PoolRelay::Multicast(_) => {}
    209                 PoolRelay::Websocket(relay) => {
    210                     match relay.conn.status {
    211                         RelayStatus::Disconnected => {
    212                             let reconnect_at =
    213                                 relay.last_connect_attempt + relay.retry_connect_after;
    214                             if now > reconnect_at {
    215                                 relay.last_connect_attempt = now;
    216                                 let next_duration = Duration::from_millis(3000);
    217                                 debug!(
    218                                     "bumping reconnect duration from {:?} to {:?} and retrying connect",
    219                                     relay.retry_connect_after, next_duration
    220                                 );
    221                                 relay.retry_connect_after = next_duration;
    222                                 if let Err(err) = relay.conn.connect(wakeup.clone()) {
    223                                     error!("error connecting to relay: {}", err);
    224                                 }
    225                             } else {
    226                                 // let's wait a bit before we try again
    227                             }
    228                         }
    229 
    230                         RelayStatus::Connected => {
    231                             relay.retry_connect_after =
    232                                 WebsocketRelay::initial_reconnect_duration();
    233 
    234                             let should_ping = now - relay.last_ping > self.ping_rate;
    235                             if should_ping {
    236                                 trace!("pinging {}", relay.conn.url);
    237                                 relay.conn.ping();
    238                                 relay.last_ping = Instant::now();
    239                             }
    240                         }
    241 
    242                         RelayStatus::Connecting => {
    243                             // cool story bro
    244                         }
    245                     }
    246                 }
    247             }
    248         }
    249     }
    250 
    251     pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) {
    252         for relay in &mut self.relays {
    253             if relay.url() == relay_url {
    254                 if let Some(debug) = &mut self.debug {
    255                     debug.send_cmd(relay.url().to_owned(), cmd);
    256                 }
    257                 if let Err(err) = relay.send(cmd) {
    258                     error!("send_to err: {err}");
    259                 }
    260                 return;
    261             }
    262         }
    263     }
    264 
    265     /// check whether a relay url is valid to add
    266     pub fn is_valid_url(&self, url: &str) -> bool {
    267         if url.is_empty() {
    268             return false;
    269         }
    270         let url = match Url::parse(url) {
    271             Ok(parsed_url) => parsed_url.to_string(),
    272             Err(_err) => {
    273                 // debug!("bad relay url \"{}\": {:?}", url, err);
    274                 return false;
    275             }
    276         };
    277         if self.has(&url) {
    278             return false;
    279         }
    280         true
    281     }
    282 
    283     // Adds a websocket url to the RelayPool.
    284     pub fn add_url(
    285         &mut self,
    286         url: String,
    287         wakeup: impl Fn() + Send + Sync + Clone + 'static,
    288     ) -> Result<()> {
    289         let url = Self::canonicalize_url(url);
    290         // Check if the URL already exists in the pool.
    291         if self.has(&url) {
    292             return Ok(());
    293         }
    294         let relay = WebsocketConn::new(
    295             nostr::RelayUrl::parse(url).map_err(|_| Error::InvalidRelayUrl)?,
    296             wakeup,
    297         )?;
    298         let pool_relay = PoolRelay::websocket(relay);
    299 
    300         self.relays.push(pool_relay);
    301 
    302         Ok(())
    303     }
    304 
    305     pub fn add_urls(
    306         &mut self,
    307         urls: BTreeSet<String>,
    308         wakeup: impl Fn() + Send + Sync + Clone + 'static,
    309     ) -> Result<()> {
    310         for url in urls {
    311             self.add_url(url, wakeup.clone())?;
    312         }
    313         Ok(())
    314     }
    315 
    316     pub fn remove_urls(&mut self, urls: &BTreeSet<String>) {
    317         self.relays
    318             .retain(|pool_relay| !urls.contains(pool_relay.url()));
    319     }
    320 
    321     // standardize the format (ie, trailing slashes)
    322     fn canonicalize_url(url: String) -> String {
    323         match Url::parse(&url) {
    324             Ok(parsed_url) => parsed_url.to_string(),
    325             Err(_) => url, // If parsing fails, return the original URL.
    326         }
    327     }
    328 
    329     /// Attempts to receive a pool event from a list of relays. The
    330     /// function searches each relay in the list in order, attempting to
    331     /// receive a message from each. If a message is received, return it.
    332     /// If no message is received from any relays, None is returned.
    333     pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> {
    334         for relay in &mut self.relays {
    335             if let PoolRelay::Multicast(mcr) = relay {
    336                 // try rejoin on multicast
    337                 if mcr.should_rejoin() {
    338                     if let Err(err) = mcr.rejoin() {
    339                         error!("multicast: rejoin error: {err}");
    340                     }
    341                 }
    342             }
    343 
    344             if let Some(event) = relay.try_recv() {
    345                 match &event {
    346                     WsEvent::Opened => {
    347                         relay.set_status(RelayStatus::Connected);
    348                     }
    349                     WsEvent::Closed => {
    350                         relay.set_status(RelayStatus::Disconnected);
    351                     }
    352                     WsEvent::Error(err) => {
    353                         error!("{:?}", err);
    354                         relay.set_status(RelayStatus::Disconnected);
    355                     }
    356                     WsEvent::Message(ev) => {
    357                         // let's just handle pongs here.
    358                         // We only need to do this natively.
    359                         #[cfg(not(target_arch = "wasm32"))]
    360                         if let WsMessage::Ping(ref bs) = ev {
    361                             trace!("pong {}", relay.url());
    362                             match relay {
    363                                 PoolRelay::Websocket(wsr) => {
    364                                     wsr.conn.sender.send(WsMessage::Pong(bs.to_owned()));
    365                                 }
    366                                 PoolRelay::Multicast(_mcr) => {}
    367                             }
    368                         }
    369                     }
    370                 }
    371 
    372                 if let Some(debug) = &mut self.debug {
    373                     debug.receive_cmd(relay.url().to_owned(), (&event).into());
    374                 }
    375 
    376                 let pool_event = PoolEvent {
    377                     event,
    378                     relay: relay.url(),
    379                 };
    380 
    381                 return Some(pool_event);
    382             }
    383         }
    384 
    385         None
    386     }
    387 }