pool.rs (12616B)
1 use crate::relay::{setup_multicast_relay, MulticastRelay, Relay, RelayStatus}; 2 use crate::{ClientMessage, Result}; 3 use nostrdb::Filter; 4 5 use std::collections::BTreeSet; 6 use std::time::{Duration, Instant}; 7 8 use url::Url; 9 10 #[cfg(not(target_arch = "wasm32"))] 11 use ewebsock::{WsEvent, WsMessage}; 12 13 #[cfg(not(target_arch = "wasm32"))] 14 use tracing::{debug, error}; 15 16 use super::subs_debug::SubsDebug; 17 18 #[derive(Debug)] 19 pub struct PoolEvent<'a> { 20 pub relay: &'a str, 21 pub event: ewebsock::WsEvent, 22 } 23 24 impl PoolEvent<'_> { 25 pub fn into_owned(self) -> PoolEventBuf { 26 PoolEventBuf { 27 relay: self.relay.to_owned(), 28 event: self.event, 29 } 30 } 31 } 32 33 pub struct PoolEventBuf { 34 pub relay: String, 35 pub event: ewebsock::WsEvent, 36 } 37 38 pub enum PoolRelay { 39 Websocket(WebsocketRelay), 40 Multicast(MulticastRelay), 41 } 42 43 pub struct WebsocketRelay { 44 pub relay: Relay, 45 pub last_ping: Instant, 46 pub last_connect_attempt: Instant, 47 pub retry_connect_after: Duration, 48 } 49 50 impl PoolRelay { 51 pub fn url(&self) -> &str { 52 match self { 53 Self::Websocket(wsr) => &wsr.relay.url, 54 Self::Multicast(_wsr) => "multicast", 55 } 56 } 57 58 pub fn set_status(&mut self, status: RelayStatus) { 59 match self { 60 Self::Websocket(wsr) => { 61 wsr.relay.status = status; 62 } 63 Self::Multicast(_mcr) => {} 64 } 65 } 66 67 pub fn try_recv(&self) -> Option<WsEvent> { 68 match self { 69 Self::Websocket(recvr) => recvr.relay.receiver.try_recv(), 70 Self::Multicast(recvr) => recvr.try_recv(), 71 } 72 } 73 74 pub fn status(&self) -> RelayStatus { 75 match self { 76 Self::Websocket(wsr) => wsr.relay.status, 77 Self::Multicast(mcr) => mcr.status, 78 } 79 } 80 81 pub fn send(&mut self, msg: &ClientMessage) -> Result<()> { 82 match self { 83 Self::Websocket(wsr) => { 84 wsr.relay.send(msg); 85 Ok(()) 86 } 87 88 Self::Multicast(mcr) => { 89 // we only send event client messages at the moment 90 if let ClientMessage::Event(ecm) = msg { 91 mcr.send(ecm)?; 92 } 93 Ok(()) 94 } 95 } 96 } 97 98 pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) -> Result<()> { 99 self.send(&ClientMessage::req(subid, filter)) 100 } 101 102 pub fn websocket(relay: Relay) -> Self { 103 Self::Websocket(WebsocketRelay::new(relay)) 104 } 105 106 pub fn multicast(wakeup: impl Fn() + Send + Sync + Clone + 'static) -> Result<Self> { 107 Ok(Self::Multicast(setup_multicast_relay(wakeup)?)) 108 } 109 } 110 111 impl WebsocketRelay { 112 pub fn new(relay: Relay) -> Self { 113 Self { 114 relay, 115 last_ping: Instant::now(), 116 last_connect_attempt: Instant::now(), 117 retry_connect_after: Self::initial_reconnect_duration(), 118 } 119 } 120 121 pub fn initial_reconnect_duration() -> Duration { 122 Duration::from_secs(5) 123 } 124 } 125 126 pub struct RelayPool { 127 pub relays: Vec<PoolRelay>, 128 pub ping_rate: Duration, 129 pub debug: Option<SubsDebug>, 130 } 131 132 impl Default for RelayPool { 133 fn default() -> Self { 134 RelayPool::new() 135 } 136 } 137 138 impl RelayPool { 139 // Constructs a new, empty RelayPool. 140 pub fn new() -> RelayPool { 141 RelayPool { 142 relays: vec![], 143 ping_rate: Duration::from_secs(45), 144 debug: None, 145 } 146 } 147 148 pub fn add_multicast_relay( 149 &mut self, 150 wakeup: impl Fn() + Send + Sync + Clone + 'static, 151 ) -> Result<()> { 152 let multicast_relay = PoolRelay::multicast(wakeup)?; 153 self.relays.push(multicast_relay); 154 Ok(()) 155 } 156 157 pub fn use_debug(&mut self) { 158 self.debug = Some(SubsDebug::default()); 159 } 160 161 pub fn ping_rate(&mut self, duration: Duration) -> &mut Self { 162 self.ping_rate = duration; 163 self 164 } 165 166 pub fn has(&self, url: &str) -> bool { 167 for relay in &self.relays { 168 if relay.url() == url { 169 return true; 170 } 171 } 172 173 false 174 } 175 176 pub fn urls(&self) -> BTreeSet<String> { 177 self.relays 178 .iter() 179 .map(|pool_relay| pool_relay.url().to_string()) 180 .collect() 181 } 182 183 pub fn send(&mut self, cmd: &ClientMessage) { 184 for relay in &mut self.relays { 185 if let Some(debug) = &mut self.debug { 186 debug.send_cmd(relay.url().to_owned(), cmd); 187 } 188 if let Err(err) = relay.send(cmd) { 189 error!("error sending {:?} to {}: {err}", cmd, relay.url()); 190 } 191 } 192 } 193 194 pub fn unsubscribe(&mut self, subid: String) { 195 for relay in &mut self.relays { 196 let cmd = ClientMessage::close(subid.clone()); 197 if let Some(debug) = &mut self.debug { 198 debug.send_cmd(relay.url().to_owned(), &cmd); 199 } 200 if let Err(err) = relay.send(&cmd) { 201 error!( 202 "error unsubscribing from {} on {}: {err}", 203 &subid, 204 relay.url() 205 ); 206 } 207 } 208 } 209 210 pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) { 211 for relay in &mut self.relays { 212 if let Some(debug) = &mut self.debug { 213 debug.send_cmd( 214 relay.url().to_owned(), 215 &ClientMessage::req(subid.clone(), filter.clone()), 216 ); 217 } 218 219 if let Err(err) = relay.send(&ClientMessage::req(subid.clone(), filter.clone())) { 220 error!("error subscribing to {}: {err}", relay.url()); 221 } 222 } 223 } 224 225 /// Keep relay connectiongs alive by pinging relays that haven't been 226 /// pinged in awhile. Adjust ping rate with [`ping_rate`]. 227 pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) { 228 for relay in &mut self.relays { 229 let now = std::time::Instant::now(); 230 231 match relay { 232 PoolRelay::Multicast(_) => {} 233 PoolRelay::Websocket(relay) => { 234 match relay.relay.status { 235 RelayStatus::Disconnected => { 236 let reconnect_at = 237 relay.last_connect_attempt + relay.retry_connect_after; 238 if now > reconnect_at { 239 relay.last_connect_attempt = now; 240 let next_duration = Duration::from_millis(3000); 241 debug!( 242 "bumping reconnect duration from {:?} to {:?} and retrying connect", 243 relay.retry_connect_after, next_duration 244 ); 245 relay.retry_connect_after = next_duration; 246 if let Err(err) = relay.relay.connect(wakeup.clone()) { 247 error!("error connecting to relay: {}", err); 248 } 249 } else { 250 // let's wait a bit before we try again 251 } 252 } 253 254 RelayStatus::Connected => { 255 relay.retry_connect_after = 256 WebsocketRelay::initial_reconnect_duration(); 257 258 let should_ping = now - relay.last_ping > self.ping_rate; 259 if should_ping { 260 debug!("pinging {}", relay.relay.url); 261 relay.relay.ping(); 262 relay.last_ping = Instant::now(); 263 } 264 } 265 266 RelayStatus::Connecting => { 267 // cool story bro 268 } 269 } 270 } 271 } 272 } 273 } 274 275 pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) { 276 for relay in &mut self.relays { 277 if relay.url() == relay_url { 278 if let Some(debug) = &mut self.debug { 279 debug.send_cmd(relay.url().to_owned(), cmd); 280 } 281 if let Err(err) = relay.send(cmd) { 282 error!("send_to err: {err}"); 283 } 284 return; 285 } 286 } 287 } 288 289 /// check whether a relay url is valid to add 290 pub fn is_valid_url(&self, url: &str) -> bool { 291 if url.is_empty() { 292 return false; 293 } 294 let url = match Url::parse(url) { 295 Ok(parsed_url) => parsed_url.to_string(), 296 Err(_err) => { 297 // debug!("bad relay url \"{}\": {:?}", url, err); 298 return false; 299 } 300 }; 301 if self.has(&url) { 302 return false; 303 } 304 true 305 } 306 307 // Adds a websocket url to the RelayPool. 308 pub fn add_url( 309 &mut self, 310 url: String, 311 wakeup: impl Fn() + Send + Sync + Clone + 'static, 312 ) -> Result<()> { 313 let url = Self::canonicalize_url(url); 314 // Check if the URL already exists in the pool. 315 if self.has(&url) { 316 return Ok(()); 317 } 318 let relay = Relay::new(url, wakeup)?; 319 let pool_relay = PoolRelay::websocket(relay); 320 321 self.relays.push(pool_relay); 322 323 Ok(()) 324 } 325 326 pub fn add_urls( 327 &mut self, 328 urls: BTreeSet<String>, 329 wakeup: impl Fn() + Send + Sync + Clone + 'static, 330 ) -> Result<()> { 331 for url in urls { 332 self.add_url(url, wakeup.clone())?; 333 } 334 Ok(()) 335 } 336 337 pub fn remove_urls(&mut self, urls: &BTreeSet<String>) { 338 self.relays 339 .retain(|pool_relay| !urls.contains(pool_relay.url())); 340 } 341 342 // standardize the format (ie, trailing slashes) 343 fn canonicalize_url(url: String) -> String { 344 match Url::parse(&url) { 345 Ok(parsed_url) => parsed_url.to_string(), 346 Err(_) => url, // If parsing fails, return the original URL. 347 } 348 } 349 350 /// Attempts to receive a pool event from a list of relays. The 351 /// function searches each relay in the list in order, attempting to 352 /// receive a message from each. If a message is received, return it. 353 /// If no message is received from any relays, None is returned. 354 pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> { 355 for relay in &mut self.relays { 356 if let PoolRelay::Multicast(mcr) = relay { 357 // try rejoin on multicast 358 if mcr.should_rejoin() { 359 if let Err(err) = mcr.rejoin() { 360 error!("multicast: rejoin error: {err}"); 361 } 362 } 363 } 364 365 if let Some(event) = relay.try_recv() { 366 match &event { 367 WsEvent::Opened => { 368 relay.set_status(RelayStatus::Connected); 369 } 370 WsEvent::Closed => { 371 relay.set_status(RelayStatus::Disconnected); 372 } 373 WsEvent::Error(err) => { 374 error!("{:?}", err); 375 relay.set_status(RelayStatus::Disconnected); 376 } 377 WsEvent::Message(ev) => { 378 // let's just handle pongs here. 379 // We only need to do this natively. 380 #[cfg(not(target_arch = "wasm32"))] 381 if let WsMessage::Ping(ref bs) = ev { 382 debug!("pong {}", relay.url()); 383 match relay { 384 PoolRelay::Websocket(wsr) => { 385 wsr.relay.sender.send(WsMessage::Pong(bs.to_owned())); 386 } 387 PoolRelay::Multicast(_mcr) => {} 388 } 389 } 390 } 391 } 392 393 if let Some(debug) = &mut self.debug { 394 debug.receive_cmd(relay.url().to_owned(), (&event).into()); 395 } 396 397 let pool_event = PoolEvent { 398 event, 399 relay: relay.url(), 400 }; 401 402 return Some(pool_event); 403 } 404 } 405 406 None 407 } 408 }