pool.rs (7679B)
1 use crate::relay::{Relay, RelayStatus}; 2 use crate::{ClientMessage, Result}; 3 use nostrdb::Filter; 4 5 use std::collections::BTreeSet; 6 use std::time::{Duration, Instant}; 7 8 use url::Url; 9 10 #[cfg(not(target_arch = "wasm32"))] 11 use ewebsock::{WsEvent, WsMessage}; 12 13 #[cfg(not(target_arch = "wasm32"))] 14 use tracing::{debug, error}; 15 16 #[derive(Debug)] 17 pub struct PoolEvent<'a> { 18 pub relay: &'a str, 19 pub event: ewebsock::WsEvent, 20 } 21 22 impl PoolEvent<'_> { 23 pub fn into_owned(self) -> PoolEventBuf { 24 PoolEventBuf { 25 relay: self.relay.to_owned(), 26 event: self.event, 27 } 28 } 29 } 30 31 pub struct PoolEventBuf { 32 pub relay: String, 33 pub event: ewebsock::WsEvent, 34 } 35 36 pub struct PoolRelay { 37 pub relay: Relay, 38 pub last_ping: Instant, 39 pub last_connect_attempt: Instant, 40 pub retry_connect_after: Duration, 41 } 42 43 impl PoolRelay { 44 pub fn new(relay: Relay) -> PoolRelay { 45 PoolRelay { 46 relay, 47 last_ping: Instant::now(), 48 last_connect_attempt: Instant::now(), 49 retry_connect_after: Self::initial_reconnect_duration(), 50 } 51 } 52 53 pub fn initial_reconnect_duration() -> Duration { 54 Duration::from_secs(5) 55 } 56 } 57 58 pub struct RelayPool { 59 pub relays: Vec<PoolRelay>, 60 pub ping_rate: Duration, 61 } 62 63 impl Default for RelayPool { 64 fn default() -> Self { 65 RelayPool::new() 66 } 67 } 68 69 impl RelayPool { 70 // Constructs a new, empty RelayPool. 71 pub fn new() -> RelayPool { 72 RelayPool { 73 relays: vec![], 74 ping_rate: Duration::from_secs(25), 75 } 76 } 77 78 pub fn ping_rate(&mut self, duration: Duration) -> &mut Self { 79 self.ping_rate = duration; 80 self 81 } 82 83 pub fn has(&self, url: &str) -> bool { 84 for relay in &self.relays { 85 if relay.relay.url == url { 86 return true; 87 } 88 } 89 90 false 91 } 92 93 pub fn urls(&self) -> BTreeSet<String> { 94 self.relays 95 .iter() 96 .map(|pool_relay| pool_relay.relay.url.clone()) 97 .collect() 98 } 99 100 pub fn send(&mut self, cmd: &ClientMessage) { 101 for relay in &mut self.relays { 102 relay.relay.send(cmd); 103 } 104 } 105 106 pub fn unsubscribe(&mut self, subid: String) { 107 for relay in &mut self.relays { 108 relay.relay.send(&ClientMessage::close(subid.clone())); 109 } 110 } 111 112 pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) { 113 for relay in &mut self.relays { 114 relay.relay.subscribe(subid.clone(), filter.clone()); 115 } 116 } 117 118 /// Keep relay connectiongs alive by pinging relays that haven't been 119 /// pinged in awhile. Adjust ping rate with [`ping_rate`]. 120 pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) { 121 for relay in &mut self.relays { 122 let now = std::time::Instant::now(); 123 124 match relay.relay.status { 125 RelayStatus::Disconnected => { 126 let reconnect_at = relay.last_connect_attempt + relay.retry_connect_after; 127 if now > reconnect_at { 128 relay.last_connect_attempt = now; 129 let next_duration = Duration::from_millis( 130 ((relay.retry_connect_after.as_millis() as f64) * 1.5) as u64, 131 ); 132 debug!( 133 "bumping reconnect duration from {:?} to {:?} and retrying connect", 134 relay.retry_connect_after, next_duration 135 ); 136 relay.retry_connect_after = next_duration; 137 if let Err(err) = relay.relay.connect(wakeup.clone()) { 138 error!("error connecting to relay: {}", err); 139 } 140 } else { 141 // let's wait a bit before we try again 142 } 143 } 144 145 RelayStatus::Connected => { 146 relay.retry_connect_after = PoolRelay::initial_reconnect_duration(); 147 148 let should_ping = now - relay.last_ping > self.ping_rate; 149 if should_ping { 150 debug!("pinging {}", relay.relay.url); 151 relay.relay.ping(); 152 relay.last_ping = Instant::now(); 153 } 154 } 155 156 RelayStatus::Connecting => { 157 // cool story bro 158 } 159 } 160 } 161 } 162 163 pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) { 164 for relay in &mut self.relays { 165 let relay = &mut relay.relay; 166 if relay.url == relay_url { 167 relay.send(cmd); 168 return; 169 } 170 } 171 } 172 173 // Adds a websocket url to the RelayPool. 174 pub fn add_url( 175 &mut self, 176 url: String, 177 wakeup: impl Fn() + Send + Sync + Clone + 'static, 178 ) -> Result<()> { 179 let url = Self::canonicalize_url(url); 180 // Check if the URL already exists in the pool. 181 if self.has(&url) { 182 return Ok(()); 183 } 184 let relay = Relay::new(url, wakeup)?; 185 let pool_relay = PoolRelay::new(relay); 186 187 self.relays.push(pool_relay); 188 189 Ok(()) 190 } 191 192 pub fn add_urls( 193 &mut self, 194 urls: BTreeSet<String>, 195 wakeup: impl Fn() + Send + Sync + Clone + 'static, 196 ) -> Result<()> { 197 for url in urls { 198 self.add_url(url, wakeup.clone())?; 199 } 200 Ok(()) 201 } 202 203 pub fn remove_urls(&mut self, urls: &BTreeSet<String>) { 204 self.relays 205 .retain(|pool_relay| !urls.contains(&pool_relay.relay.url)); 206 } 207 208 // standardize the format (ie, trailing slashes) 209 fn canonicalize_url(url: String) -> String { 210 match Url::parse(&url) { 211 Ok(parsed_url) => parsed_url.to_string(), 212 Err(_) => url, // If parsing fails, return the original URL. 213 } 214 } 215 216 /// Attempts to receive a pool event from a list of relays. The 217 /// function searches each relay in the list in order, attempting to 218 /// receive a message from each. If a message is received, return it. 219 /// If no message is received from any relays, None is returned. 220 pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> { 221 for relay in &mut self.relays { 222 let relay = &mut relay.relay; 223 if let Some(event) = relay.receiver.try_recv() { 224 match &event { 225 WsEvent::Opened => { 226 relay.status = RelayStatus::Connected; 227 } 228 WsEvent::Closed => { 229 relay.status = RelayStatus::Disconnected; 230 } 231 WsEvent::Error(err) => { 232 error!("{:?}", err); 233 relay.status = RelayStatus::Disconnected; 234 } 235 WsEvent::Message(ev) => { 236 // let's just handle pongs here. 237 // We only need to do this natively. 238 #[cfg(not(target_arch = "wasm32"))] 239 if let WsMessage::Ping(ref bs) = ev { 240 debug!("pong {}", &relay.url); 241 relay.sender.send(WsMessage::Pong(bs.to_owned())); 242 } 243 } 244 } 245 return Some(PoolEvent { 246 event, 247 relay: &relay.url, 248 }); 249 } 250 } 251 252 None 253 } 254 }