pool.rs (7056B)
1 use crate::relay::{Relay, RelayStatus}; 2 use crate::{ClientMessage, Result}; 3 use nostrdb::Filter; 4 5 use std::time::{Duration, Instant}; 6 7 use url::Url; 8 9 #[cfg(not(target_arch = "wasm32"))] 10 use ewebsock::{WsEvent, WsMessage}; 11 12 #[cfg(not(target_arch = "wasm32"))] 13 use tracing::{debug, error}; 14 15 #[derive(Debug)] 16 pub struct PoolEvent<'a> { 17 pub relay: &'a str, 18 pub event: ewebsock::WsEvent, 19 } 20 21 impl<'a> PoolEvent<'a> { 22 pub fn into_owned(self) -> PoolEventBuf { 23 PoolEventBuf { 24 relay: self.relay.to_owned(), 25 event: self.event, 26 } 27 } 28 } 29 30 pub struct PoolEventBuf { 31 pub relay: String, 32 pub event: ewebsock::WsEvent, 33 } 34 35 pub struct PoolRelay { 36 pub relay: Relay, 37 pub last_ping: Instant, 38 pub last_connect_attempt: Instant, 39 pub retry_connect_after: Duration, 40 } 41 42 impl PoolRelay { 43 pub fn new(relay: Relay) -> PoolRelay { 44 PoolRelay { 45 relay, 46 last_ping: Instant::now(), 47 last_connect_attempt: Instant::now(), 48 retry_connect_after: Self::initial_reconnect_duration(), 49 } 50 } 51 52 pub fn initial_reconnect_duration() -> Duration { 53 Duration::from_secs(5) 54 } 55 } 56 57 pub struct RelayPool { 58 pub relays: Vec<PoolRelay>, 59 pub ping_rate: Duration, 60 } 61 62 impl Default for RelayPool { 63 fn default() -> Self { 64 RelayPool::new() 65 } 66 } 67 68 impl RelayPool { 69 // Constructs a new, empty RelayPool. 70 pub fn new() -> RelayPool { 71 RelayPool { 72 relays: vec![], 73 ping_rate: Duration::from_secs(25), 74 } 75 } 76 77 pub fn ping_rate(&mut self, duration: Duration) -> &mut Self { 78 self.ping_rate = duration; 79 self 80 } 81 82 pub fn has(&self, url: &str) -> bool { 83 for relay in &self.relays { 84 if relay.relay.url == url { 85 return true; 86 } 87 } 88 89 false 90 } 91 92 pub fn send(&mut self, cmd: &ClientMessage) { 93 for relay in &mut self.relays { 94 relay.relay.send(cmd); 95 } 96 } 97 98 pub fn unsubscribe(&mut self, subid: String) { 99 for relay in &mut self.relays { 100 relay.relay.send(&ClientMessage::close(subid.clone())); 101 } 102 } 103 104 pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) { 105 for relay in &mut self.relays { 106 relay.relay.subscribe(subid.clone(), filter.clone()); 107 } 108 } 109 110 /// Keep relay connectiongs alive by pinging relays that haven't been 111 /// pinged in awhile. Adjust ping rate with [`ping_rate`]. 112 pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) { 113 for relay in &mut self.relays { 114 let now = std::time::Instant::now(); 115 116 match relay.relay.status { 117 RelayStatus::Disconnected => { 118 let reconnect_at = relay.last_connect_attempt + relay.retry_connect_after; 119 if now > reconnect_at { 120 relay.last_connect_attempt = now; 121 let next_duration = Duration::from_millis( 122 ((relay.retry_connect_after.as_millis() as f64) * 1.5) as u64, 123 ); 124 debug!( 125 "bumping reconnect duration from {:?} to {:?} and retrying connect", 126 relay.retry_connect_after, next_duration 127 ); 128 relay.retry_connect_after = next_duration; 129 if let Err(err) = relay.relay.connect(wakeup.clone()) { 130 error!("error connecting to relay: {}", err); 131 } 132 } else { 133 // let's wait a bit before we try again 134 } 135 } 136 137 RelayStatus::Connected => { 138 relay.retry_connect_after = PoolRelay::initial_reconnect_duration(); 139 140 let should_ping = now - relay.last_ping > self.ping_rate; 141 if should_ping { 142 debug!("pinging {}", relay.relay.url); 143 relay.relay.ping(); 144 relay.last_ping = Instant::now(); 145 } 146 } 147 148 RelayStatus::Connecting => { 149 // cool story bro 150 } 151 } 152 } 153 } 154 155 pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) { 156 for relay in &mut self.relays { 157 let relay = &mut relay.relay; 158 if relay.url == relay_url { 159 relay.send(cmd); 160 return; 161 } 162 } 163 } 164 165 // Adds a websocket url to the RelayPool. 166 pub fn add_url( 167 &mut self, 168 url: String, 169 wakeup: impl Fn() + Send + Sync + Clone + 'static, 170 ) -> Result<()> { 171 let url = Self::canonicalize_url(url); 172 // Check if the URL already exists in the pool. 173 if self.has(&url) { 174 return Ok(()); 175 } 176 let relay = Relay::new(url, wakeup)?; 177 let pool_relay = PoolRelay::new(relay); 178 179 self.relays.push(pool_relay); 180 181 Ok(()) 182 } 183 184 // standardize the format (ie, trailing slashes) 185 fn canonicalize_url(url: String) -> String { 186 match Url::parse(&url) { 187 Ok(parsed_url) => parsed_url.to_string(), 188 Err(_) => url, // If parsing fails, return the original URL. 189 } 190 } 191 192 /// Attempts to receive a pool event from a list of relays. The 193 /// function searches each relay in the list in order, attempting to 194 /// receive a message from each. If a message is received, return it. 195 /// If no message is received from any relays, None is returned. 196 pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> { 197 for relay in &mut self.relays { 198 let relay = &mut relay.relay; 199 if let Some(event) = relay.receiver.try_recv() { 200 match &event { 201 WsEvent::Opened => { 202 relay.status = RelayStatus::Connected; 203 } 204 WsEvent::Closed => { 205 relay.status = RelayStatus::Disconnected; 206 } 207 WsEvent::Error(err) => { 208 error!("{:?}", err); 209 relay.status = RelayStatus::Disconnected; 210 } 211 WsEvent::Message(ev) => { 212 // let's just handle pongs here. 213 // We only need to do this natively. 214 #[cfg(not(target_arch = "wasm32"))] 215 if let WsMessage::Ping(ref bs) = ev { 216 debug!("pong {}", &relay.url); 217 relay.sender.send(WsMessage::Pong(bs.to_owned())); 218 } 219 } 220 } 221 return Some(PoolEvent { 222 event, 223 relay: &relay.url, 224 }); 225 } 226 } 227 228 None 229 } 230 }