websocket.rs (3785B)
1 use crate::{relay::RelayStatus, ClientMessage, Result, Wakeup}; 2 3 use std::{ 4 fmt, 5 hash::{Hash, Hasher}, 6 time::{Duration, Instant}, 7 }; 8 9 use ewebsock::{Options, WsMessage, WsReceiver, WsSender}; 10 use tracing::{debug, error}; 11 12 /// WebsocketConn owns an outbound websocket connection to a relay. 13 pub struct WebsocketConn { 14 pub url: nostr::RelayUrl, 15 pub status: RelayStatus, 16 pub sender: WsSender, 17 pub receiver: WsReceiver, 18 } 19 20 impl fmt::Debug for WebsocketConn { 21 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 22 f.debug_struct("Relay") 23 .field("url", &self.url) 24 .field("status", &self.status) 25 .finish() 26 } 27 } 28 29 impl Hash for WebsocketConn { 30 fn hash<H: Hasher>(&self, state: &mut H) { 31 // Hashes the Relay by hashing the URL 32 self.url.hash(state); 33 } 34 } 35 36 impl PartialEq for WebsocketConn { 37 fn eq(&self, other: &Self) -> bool { 38 self.url == other.url 39 } 40 } 41 42 impl Eq for WebsocketConn {} 43 44 impl WebsocketConn { 45 pub fn new( 46 url: nostr::RelayUrl, 47 wakeup: impl Fn() + Send + Sync + Clone + 'static, 48 ) -> Result<Self> { 49 #[derive(Clone)] 50 struct TmpWakeup<W>(W); 51 52 impl<W> Wakeup for TmpWakeup<W> 53 where 54 W: Fn() + Send + Sync + Clone + 'static, 55 { 56 fn wake(&self) { 57 (self.0)() 58 } 59 } 60 61 WebsocketConn::from_wakeup(url, TmpWakeup(wakeup)) 62 } 63 64 pub fn from_wakeup<W>(url: nostr::RelayUrl, wakeup: W) -> Result<Self> 65 where 66 W: Wakeup, 67 { 68 let status = RelayStatus::Connecting; 69 let wake = wakeup; 70 let (sender, receiver) = 71 ewebsock::connect_with_wakeup(url.as_str(), Options::default(), move || wake.wake())?; 72 73 Ok(Self { 74 url, 75 sender, 76 receiver, 77 status, 78 }) 79 } 80 81 #[profiling::function] 82 pub fn send(&mut self, msg: &ClientMessage) { 83 let json = match msg.to_json() { 84 Ok(json) => { 85 debug!("sending {} to {}", json, self.url); 86 json 87 } 88 Err(e) => { 89 error!("error serializing json for filter: {e}"); 90 return; 91 } 92 }; 93 94 let txt = WsMessage::Text(json); 95 self.sender.send(txt); 96 } 97 98 pub fn connect(&mut self, wakeup: impl Fn() + Send + Sync + 'static) -> Result<()> { 99 let (sender, receiver) = 100 ewebsock::connect_with_wakeup(self.url.as_str(), Options::default(), wakeup)?; 101 self.status = RelayStatus::Connecting; 102 self.sender = sender; 103 self.receiver = receiver; 104 Ok(()) 105 } 106 107 pub fn ping(&mut self) { 108 let msg = WsMessage::Ping(vec![]); 109 self.sender.send(msg); 110 } 111 112 pub fn set_status(&mut self, status: RelayStatus) { 113 self.status = status; 114 } 115 } 116 117 /// WebsocketRelay wraps WebsocketConn with reconnect/keepalive metadata. 118 pub struct WebsocketRelay { 119 pub conn: WebsocketConn, 120 pub last_ping: Instant, 121 pub last_connect_attempt: Instant, 122 pub retry_connect_after: Duration, 123 /// Number of consecutive failed reconnect attempts. Reset to 0 on successful connection. 124 pub reconnect_attempt: u32, 125 } 126 127 impl WebsocketRelay { 128 pub fn new(relay: WebsocketConn) -> Self { 129 Self { 130 conn: relay, 131 last_ping: Instant::now(), 132 last_connect_attempt: Instant::now(), 133 retry_connect_after: Self::initial_reconnect_duration(), 134 reconnect_attempt: 0, 135 } 136 } 137 138 pub fn initial_reconnect_duration() -> Duration { 139 Duration::from_secs(5) 140 } 141 142 pub fn is_connected(&self) -> bool { 143 self.conn.status == RelayStatus::Connected 144 } 145 }