notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

websocket.rs (3785B)


      1 use crate::{relay::RelayStatus, ClientMessage, Result, Wakeup};
      2 
      3 use std::{
      4     fmt,
      5     hash::{Hash, Hasher},
      6     time::{Duration, Instant},
      7 };
      8 
      9 use ewebsock::{Options, WsMessage, WsReceiver, WsSender};
     10 use tracing::{debug, error};
     11 
     12 /// WebsocketConn owns an outbound websocket connection to a relay.
        ///
        /// Identity is the relay URL: the `Hash` and `PartialEq` impls in this file
        /// look only at `url`, and the `Debug` impl prints only `url` and `status`.
     13 pub struct WebsocketConn {
            /// URL of the relay; its string form is what gets handed to `ewebsock`
            /// when the socket is opened.
     14     pub url: nostr::RelayUrl,
            /// Connection state tracked for this relay. Set to
            /// `RelayStatus::Connecting` whenever a socket is (re)opened; within this
            /// file it is otherwise only changed through `set_status`.
     15     pub status: RelayStatus,
            /// Sending half of the current websocket, used by `send` and `ping`.
     16     pub sender: WsSender,
            /// Receiving half of the current websocket. Nothing in this file reads
            /// from it; presumably it is polled by whoever owns the connection.
     17     pub receiver: WsReceiver,
     18 }
     19 
     20 impl fmt::Debug for WebsocketConn {
     21     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
     22         f.debug_struct("Relay")
     23             .field("url", &self.url)
     24             .field("status", &self.status)
     25             .finish()
     26     }
     27 }
     28 
     29 impl Hash for WebsocketConn {
     30     fn hash<H: Hasher>(&self, state: &mut H) {
     31         // Hashes the Relay by hashing the URL
     32         self.url.hash(state);
     33     }
     34 }
     35 
     36 impl PartialEq for WebsocketConn {
     37     fn eq(&self, other: &Self) -> bool {
     38         self.url == other.url
     39     }
     40 }
     41 
     42 impl Eq for WebsocketConn {} // Marker impl: the URL-only comparison in `PartialEq` is treated as a full equivalence relation.
     43 
     44 impl WebsocketConn {
     45     pub fn new(
     46         url: nostr::RelayUrl,
     47         wakeup: impl Fn() + Send + Sync + Clone + 'static,
     48     ) -> Result<Self> {
     49         #[derive(Clone)]
     50         struct TmpWakeup<W>(W);
     51 
     52         impl<W> Wakeup for TmpWakeup<W>
     53         where
     54             W: Fn() + Send + Sync + Clone + 'static,
     55         {
     56             fn wake(&self) {
     57                 (self.0)()
     58             }
     59         }
     60 
     61         WebsocketConn::from_wakeup(url, TmpWakeup(wakeup))
     62     }
     63 
     64     pub fn from_wakeup<W>(url: nostr::RelayUrl, wakeup: W) -> Result<Self>
     65     where
     66         W: Wakeup,
     67     {
     68         let status = RelayStatus::Connecting;
     69         let wake = wakeup;
     70         let (sender, receiver) =
     71             ewebsock::connect_with_wakeup(url.as_str(), Options::default(), move || wake.wake())?;
     72 
     73         Ok(Self {
     74             url,
     75             sender,
     76             receiver,
     77             status,
     78         })
     79     }
     80 
     81     #[profiling::function]
     82     pub fn send(&mut self, msg: &ClientMessage) {
     83         let json = match msg.to_json() {
     84             Ok(json) => {
     85                 debug!("sending {} to {}", json, self.url);
     86                 json
     87             }
     88             Err(e) => {
     89                 error!("error serializing json for filter: {e}");
     90                 return;
     91             }
     92         };
     93 
     94         let txt = WsMessage::Text(json);
     95         self.sender.send(txt);
     96     }
     97 
     98     pub fn connect(&mut self, wakeup: impl Fn() + Send + Sync + 'static) -> Result<()> {
     99         let (sender, receiver) =
    100             ewebsock::connect_with_wakeup(self.url.as_str(), Options::default(), wakeup)?;
    101         self.status = RelayStatus::Connecting;
    102         self.sender = sender;
    103         self.receiver = receiver;
    104         Ok(())
    105     }
    106 
    107     pub fn ping(&mut self) {
    108         let msg = WsMessage::Ping(vec![]);
    109         self.sender.send(msg);
    110     }
    111 
    112     pub fn set_status(&mut self, status: RelayStatus) {
    113         self.status = status;
    114     }
    115 }
    116 
    117 /// WebsocketRelay wraps WebsocketConn with reconnect/keepalive metadata.
    118 pub struct WebsocketRelay {
            /// The wrapped websocket connection.
    119     pub conn: WebsocketConn,
            /// Initialised to the construction time by `new`. Presumably the time of
            /// the most recent keepalive ping, maintained by code outside this file
            /// (TODO confirm).
    120     pub last_ping: Instant,
            /// Initialised to the construction time by `new`. Presumably the time of
            /// the most recent (re)connect attempt (TODO confirm against callers).
    121     pub last_connect_attempt: Instant,
            /// Starts at `initial_reconnect_duration()` (5 seconds). Presumably the
            /// delay to wait before the next reconnect attempt (TODO confirm).
    122     pub retry_connect_after: Duration,
    123     /// Number of consecutive failed reconnect attempts. Reset to 0 on successful connection.
    124     pub reconnect_attempt: u32,
    125 }
    126 
    127 impl WebsocketRelay {
    128     pub fn new(relay: WebsocketConn) -> Self {
    129         Self {
    130             conn: relay,
    131             last_ping: Instant::now(),
    132             last_connect_attempt: Instant::now(),
    133             retry_connect_after: Self::initial_reconnect_duration(),
    134             reconnect_attempt: 0,
    135         }
    136     }
    137 
    138     pub fn initial_reconnect_duration() -> Duration {
    139         Duration::from_secs(5)
    140     }
    141 
    142     pub fn is_connected(&self) -> bool {
    143         self.conn.status == RelayStatus::Connected
    144     }
    145 }