notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

pool.rs (12661B)


      1 use crate::relay::{setup_multicast_relay, MulticastRelay, Relay, RelayStatus};
      2 use crate::{ClientMessage, Error, Result};
      3 use nostrdb::Filter;
      4 
      5 use std::collections::BTreeSet;
      6 use std::time::{Duration, Instant};
      7 
      8 use url::Url;
      9 
     10 use ewebsock::{WsEvent, WsMessage};
     11 use tracing::{debug, error, trace};
     12 
     13 use super::subs_debug::SubsDebug;
     14 
     15 #[derive(Debug)]
     16 pub struct PoolEvent<'a> {
     17     pub relay: &'a str,
     18     pub event: ewebsock::WsEvent,
     19 }
     20 
     21 impl PoolEvent<'_> {
     22     pub fn into_owned(self) -> PoolEventBuf {
     23         PoolEventBuf {
     24             relay: self.relay.to_owned(),
     25             event: self.event,
     26         }
     27     }
     28 }
     29 
     30 pub struct PoolEventBuf {
     31     pub relay: String,
     32     pub event: ewebsock::WsEvent,
     33 }
     34 
     35 pub enum PoolRelay {
     36     Websocket(WebsocketRelay),
     37     Multicast(MulticastRelay),
     38 }
     39 
     40 pub struct WebsocketRelay {
     41     pub relay: Relay,
     42     pub last_ping: Instant,
     43     pub last_connect_attempt: Instant,
     44     pub retry_connect_after: Duration,
     45 }
     46 
     47 impl PoolRelay {
     48     pub fn url(&self) -> &str {
     49         match self {
     50             Self::Websocket(wsr) => wsr.relay.url.as_str(),
     51             Self::Multicast(_wsr) => "multicast",
     52         }
     53     }
     54 
     55     pub fn set_status(&mut self, status: RelayStatus) {
     56         match self {
     57             Self::Websocket(wsr) => {
     58                 wsr.relay.status = status;
     59             }
     60             Self::Multicast(_mcr) => {}
     61         }
     62     }
     63 
     64     pub fn try_recv(&self) -> Option<WsEvent> {
     65         match self {
     66             Self::Websocket(recvr) => recvr.relay.receiver.try_recv(),
     67             Self::Multicast(recvr) => recvr.try_recv(),
     68         }
     69     }
     70 
     71     pub fn status(&self) -> RelayStatus {
     72         match self {
     73             Self::Websocket(wsr) => wsr.relay.status,
     74             Self::Multicast(mcr) => mcr.status,
     75         }
     76     }
     77 
     78     pub fn send(&mut self, msg: &ClientMessage) -> Result<()> {
     79         match self {
     80             Self::Websocket(wsr) => {
     81                 wsr.relay.send(msg);
     82                 Ok(())
     83             }
     84 
     85             Self::Multicast(mcr) => {
     86                 // we only send event client messages at the moment
     87                 if let ClientMessage::Event(ecm) = msg {
     88                     mcr.send(ecm)?;
     89                 }
     90                 Ok(())
     91             }
     92         }
     93     }
     94 
     95     pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) -> Result<()> {
     96         self.send(&ClientMessage::req(subid, filter))
     97     }
     98 
     99     pub fn websocket(relay: Relay) -> Self {
    100         Self::Websocket(WebsocketRelay::new(relay))
    101     }
    102 
    103     pub fn multicast(wakeup: impl Fn() + Send + Sync + Clone + 'static) -> Result<Self> {
    104         Ok(Self::Multicast(setup_multicast_relay(wakeup)?))
    105     }
    106 }
    107 
    108 impl WebsocketRelay {
    109     pub fn new(relay: Relay) -> Self {
    110         Self {
    111             relay,
    112             last_ping: Instant::now(),
    113             last_connect_attempt: Instant::now(),
    114             retry_connect_after: Self::initial_reconnect_duration(),
    115         }
    116     }
    117 
    118     pub fn initial_reconnect_duration() -> Duration {
    119         Duration::from_secs(5)
    120     }
    121 }
    122 
    123 pub struct RelayPool {
    124     pub relays: Vec<PoolRelay>,
    125     pub ping_rate: Duration,
    126     pub debug: Option<SubsDebug>,
    127 }
    128 
    129 impl Default for RelayPool {
    130     fn default() -> Self {
    131         RelayPool::new()
    132     }
    133 }
    134 
    135 impl RelayPool {
    136     // Constructs a new, empty RelayPool.
    137     pub fn new() -> RelayPool {
    138         RelayPool {
    139             relays: vec![],
    140             ping_rate: Duration::from_secs(45),
    141             debug: None,
    142         }
    143     }
    144 
    145     pub fn add_multicast_relay(
    146         &mut self,
    147         wakeup: impl Fn() + Send + Sync + Clone + 'static,
    148     ) -> Result<()> {
    149         let multicast_relay = PoolRelay::multicast(wakeup)?;
    150         self.relays.push(multicast_relay);
    151         Ok(())
    152     }
    153 
    154     pub fn use_debug(&mut self) {
    155         self.debug = Some(SubsDebug::default());
    156     }
    157 
    158     pub fn ping_rate(&mut self, duration: Duration) -> &mut Self {
    159         self.ping_rate = duration;
    160         self
    161     }
    162 
    163     pub fn has(&self, url: &str) -> bool {
    164         for relay in &self.relays {
    165             if relay.url() == url {
    166                 return true;
    167             }
    168         }
    169 
    170         false
    171     }
    172 
    173     pub fn urls(&self) -> BTreeSet<String> {
    174         self.relays
    175             .iter()
    176             .map(|pool_relay| pool_relay.url().to_string())
    177             .collect()
    178     }
    179 
    180     pub fn send(&mut self, cmd: &ClientMessage) {
    181         for relay in &mut self.relays {
    182             if let Some(debug) = &mut self.debug {
    183                 debug.send_cmd(relay.url().to_owned(), cmd);
    184             }
    185             if let Err(err) = relay.send(cmd) {
    186                 error!("error sending {:?} to {}: {err}", cmd, relay.url());
    187             }
    188         }
    189     }
    190 
    191     pub fn unsubscribe(&mut self, subid: String) {
    192         for relay in &mut self.relays {
    193             let cmd = ClientMessage::close(subid.clone());
    194             if let Some(debug) = &mut self.debug {
    195                 debug.send_cmd(relay.url().to_owned(), &cmd);
    196             }
    197             if let Err(err) = relay.send(&cmd) {
    198                 error!(
    199                     "error unsubscribing from {} on {}: {err}",
    200                     &subid,
    201                     relay.url()
    202                 );
    203             }
    204         }
    205     }
    206 
    207     pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) {
    208         for relay in &mut self.relays {
    209             if let Some(debug) = &mut self.debug {
    210                 debug.send_cmd(
    211                     relay.url().to_owned(),
    212                     &ClientMessage::req(subid.clone(), filter.clone()),
    213                 );
    214             }
    215 
    216             if let Err(err) = relay.send(&ClientMessage::req(subid.clone(), filter.clone())) {
    217                 error!("error subscribing to {}: {err}", relay.url());
    218             }
    219         }
    220     }
    221 
    222     /// Keep relay connectiongs alive by pinging relays that haven't been
    223     /// pinged in awhile. Adjust ping rate with [`ping_rate`].
    224     pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) {
    225         for relay in &mut self.relays {
    226             let now = std::time::Instant::now();
    227 
    228             match relay {
    229                 PoolRelay::Multicast(_) => {}
    230                 PoolRelay::Websocket(relay) => {
    231                     match relay.relay.status {
    232                         RelayStatus::Disconnected => {
    233                             let reconnect_at =
    234                                 relay.last_connect_attempt + relay.retry_connect_after;
    235                             if now > reconnect_at {
    236                                 relay.last_connect_attempt = now;
    237                                 let next_duration = Duration::from_millis(3000);
    238                                 debug!(
    239                                     "bumping reconnect duration from {:?} to {:?} and retrying connect",
    240                                     relay.retry_connect_after, next_duration
    241                                 );
    242                                 relay.retry_connect_after = next_duration;
    243                                 if let Err(err) = relay.relay.connect(wakeup.clone()) {
    244                                     error!("error connecting to relay: {}", err);
    245                                 }
    246                             } else {
    247                                 // let's wait a bit before we try again
    248                             }
    249                         }
    250 
    251                         RelayStatus::Connected => {
    252                             relay.retry_connect_after =
    253                                 WebsocketRelay::initial_reconnect_duration();
    254 
    255                             let should_ping = now - relay.last_ping > self.ping_rate;
    256                             if should_ping {
    257                                 trace!("pinging {}", relay.relay.url);
    258                                 relay.relay.ping();
    259                                 relay.last_ping = Instant::now();
    260                             }
    261                         }
    262 
    263                         RelayStatus::Connecting => {
    264                             // cool story bro
    265                         }
    266                     }
    267                 }
    268             }
    269         }
    270     }
    271 
    272     pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) {
    273         for relay in &mut self.relays {
    274             if relay.url() == relay_url {
    275                 if let Some(debug) = &mut self.debug {
    276                     debug.send_cmd(relay.url().to_owned(), cmd);
    277                 }
    278                 if let Err(err) = relay.send(cmd) {
    279                     error!("send_to err: {err}");
    280                 }
    281                 return;
    282             }
    283         }
    284     }
    285 
    286     /// check whether a relay url is valid to add
    287     pub fn is_valid_url(&self, url: &str) -> bool {
    288         if url.is_empty() {
    289             return false;
    290         }
    291         let url = match Url::parse(url) {
    292             Ok(parsed_url) => parsed_url.to_string(),
    293             Err(_err) => {
    294                 // debug!("bad relay url \"{}\": {:?}", url, err);
    295                 return false;
    296             }
    297         };
    298         if self.has(&url) {
    299             return false;
    300         }
    301         true
    302     }
    303 
    304     // Adds a websocket url to the RelayPool.
    305     pub fn add_url(
    306         &mut self,
    307         url: String,
    308         wakeup: impl Fn() + Send + Sync + Clone + 'static,
    309     ) -> Result<()> {
    310         let url = Self::canonicalize_url(url);
    311         // Check if the URL already exists in the pool.
    312         if self.has(&url) {
    313             return Ok(());
    314         }
    315         let relay = Relay::new(
    316             nostr::RelayUrl::parse(url).map_err(|_| Error::InvalidRelayUrl)?,
    317             wakeup,
    318         )?;
    319         let pool_relay = PoolRelay::websocket(relay);
    320 
    321         self.relays.push(pool_relay);
    322 
    323         Ok(())
    324     }
    325 
    326     pub fn add_urls(
    327         &mut self,
    328         urls: BTreeSet<String>,
    329         wakeup: impl Fn() + Send + Sync + Clone + 'static,
    330     ) -> Result<()> {
    331         for url in urls {
    332             self.add_url(url, wakeup.clone())?;
    333         }
    334         Ok(())
    335     }
    336 
    337     pub fn remove_urls(&mut self, urls: &BTreeSet<String>) {
    338         self.relays
    339             .retain(|pool_relay| !urls.contains(pool_relay.url()));
    340     }
    341 
    342     // standardize the format (ie, trailing slashes)
    343     fn canonicalize_url(url: String) -> String {
    344         match Url::parse(&url) {
    345             Ok(parsed_url) => parsed_url.to_string(),
    346             Err(_) => url, // If parsing fails, return the original URL.
    347         }
    348     }
    349 
    350     /// Attempts to receive a pool event from a list of relays. The
    351     /// function searches each relay in the list in order, attempting to
    352     /// receive a message from each. If a message is received, return it.
    353     /// If no message is received from any relays, None is returned.
    354     pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> {
    355         for relay in &mut self.relays {
    356             if let PoolRelay::Multicast(mcr) = relay {
    357                 // try rejoin on multicast
    358                 if mcr.should_rejoin() {
    359                     if let Err(err) = mcr.rejoin() {
    360                         error!("multicast: rejoin error: {err}");
    361                     }
    362                 }
    363             }
    364 
    365             if let Some(event) = relay.try_recv() {
    366                 match &event {
    367                     WsEvent::Opened => {
    368                         relay.set_status(RelayStatus::Connected);
    369                     }
    370                     WsEvent::Closed => {
    371                         relay.set_status(RelayStatus::Disconnected);
    372                     }
    373                     WsEvent::Error(err) => {
    374                         error!("{:?}", err);
    375                         relay.set_status(RelayStatus::Disconnected);
    376                     }
    377                     WsEvent::Message(ev) => {
    378                         // let's just handle pongs here.
    379                         // We only need to do this natively.
    380                         #[cfg(not(target_arch = "wasm32"))]
    381                         if let WsMessage::Ping(ref bs) = ev {
    382                             trace!("pong {}", relay.url());
    383                             match relay {
    384                                 PoolRelay::Websocket(wsr) => {
    385                                     wsr.relay.sender.send(WsMessage::Pong(bs.to_owned()));
    386                                 }
    387                                 PoolRelay::Multicast(_mcr) => {}
    388                             }
    389                         }
    390                     }
    391                 }
    392 
    393                 if let Some(debug) = &mut self.debug {
    394                     debug.receive_cmd(relay.url().to_owned(), (&event).into());
    395                 }
    396 
    397                 let pool_event = PoolEvent {
    398                     event,
    399                     relay: relay.url(),
    400                 };
    401 
    402                 return Some(pool_event);
    403             }
    404         }
    405 
    406         None
    407     }
    408 }