pool.rs (12661B)
1 use crate::relay::{setup_multicast_relay, MulticastRelay, Relay, RelayStatus}; 2 use crate::{ClientMessage, Error, Result}; 3 use nostrdb::Filter; 4 5 use std::collections::BTreeSet; 6 use std::time::{Duration, Instant}; 7 8 use url::Url; 9 10 use ewebsock::{WsEvent, WsMessage}; 11 use tracing::{debug, error, trace}; 12 13 use super::subs_debug::SubsDebug; 14 15 #[derive(Debug)] 16 pub struct PoolEvent<'a> { 17 pub relay: &'a str, 18 pub event: ewebsock::WsEvent, 19 } 20 21 impl PoolEvent<'_> { 22 pub fn into_owned(self) -> PoolEventBuf { 23 PoolEventBuf { 24 relay: self.relay.to_owned(), 25 event: self.event, 26 } 27 } 28 } 29 30 pub struct PoolEventBuf { 31 pub relay: String, 32 pub event: ewebsock::WsEvent, 33 } 34 35 pub enum PoolRelay { 36 Websocket(WebsocketRelay), 37 Multicast(MulticastRelay), 38 } 39 40 pub struct WebsocketRelay { 41 pub relay: Relay, 42 pub last_ping: Instant, 43 pub last_connect_attempt: Instant, 44 pub retry_connect_after: Duration, 45 } 46 47 impl PoolRelay { 48 pub fn url(&self) -> &str { 49 match self { 50 Self::Websocket(wsr) => wsr.relay.url.as_str(), 51 Self::Multicast(_wsr) => "multicast", 52 } 53 } 54 55 pub fn set_status(&mut self, status: RelayStatus) { 56 match self { 57 Self::Websocket(wsr) => { 58 wsr.relay.status = status; 59 } 60 Self::Multicast(_mcr) => {} 61 } 62 } 63 64 pub fn try_recv(&self) -> Option<WsEvent> { 65 match self { 66 Self::Websocket(recvr) => recvr.relay.receiver.try_recv(), 67 Self::Multicast(recvr) => recvr.try_recv(), 68 } 69 } 70 71 pub fn status(&self) -> RelayStatus { 72 match self { 73 Self::Websocket(wsr) => wsr.relay.status, 74 Self::Multicast(mcr) => mcr.status, 75 } 76 } 77 78 pub fn send(&mut self, msg: &ClientMessage) -> Result<()> { 79 match self { 80 Self::Websocket(wsr) => { 81 wsr.relay.send(msg); 82 Ok(()) 83 } 84 85 Self::Multicast(mcr) => { 86 // we only send event client messages at the moment 87 if let ClientMessage::Event(ecm) = msg { 88 mcr.send(ecm)?; 89 } 90 Ok(()) 91 } 92 } 93 } 94 95 pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) -> Result<()> { 96 self.send(&ClientMessage::req(subid, filter)) 97 } 98 99 pub fn websocket(relay: Relay) -> Self { 100 Self::Websocket(WebsocketRelay::new(relay)) 101 } 102 103 pub fn multicast(wakeup: impl Fn() + Send + Sync + Clone + 'static) -> Result<Self> { 104 Ok(Self::Multicast(setup_multicast_relay(wakeup)?)) 105 } 106 } 107 108 impl WebsocketRelay { 109 pub fn new(relay: Relay) -> Self { 110 Self { 111 relay, 112 last_ping: Instant::now(), 113 last_connect_attempt: Instant::now(), 114 retry_connect_after: Self::initial_reconnect_duration(), 115 } 116 } 117 118 pub fn initial_reconnect_duration() -> Duration { 119 Duration::from_secs(5) 120 } 121 } 122 123 pub struct RelayPool { 124 pub relays: Vec<PoolRelay>, 125 pub ping_rate: Duration, 126 pub debug: Option<SubsDebug>, 127 } 128 129 impl Default for RelayPool { 130 fn default() -> Self { 131 RelayPool::new() 132 } 133 } 134 135 impl RelayPool { 136 // Constructs a new, empty RelayPool. 137 pub fn new() -> RelayPool { 138 RelayPool { 139 relays: vec![], 140 ping_rate: Duration::from_secs(45), 141 debug: None, 142 } 143 } 144 145 pub fn add_multicast_relay( 146 &mut self, 147 wakeup: impl Fn() + Send + Sync + Clone + 'static, 148 ) -> Result<()> { 149 let multicast_relay = PoolRelay::multicast(wakeup)?; 150 self.relays.push(multicast_relay); 151 Ok(()) 152 } 153 154 pub fn use_debug(&mut self) { 155 self.debug = Some(SubsDebug::default()); 156 } 157 158 pub fn ping_rate(&mut self, duration: Duration) -> &mut Self { 159 self.ping_rate = duration; 160 self 161 } 162 163 pub fn has(&self, url: &str) -> bool { 164 for relay in &self.relays { 165 if relay.url() == url { 166 return true; 167 } 168 } 169 170 false 171 } 172 173 pub fn urls(&self) -> BTreeSet<String> { 174 self.relays 175 .iter() 176 .map(|pool_relay| pool_relay.url().to_string()) 177 .collect() 178 } 179 180 pub fn send(&mut self, cmd: &ClientMessage) { 181 for relay in &mut self.relays { 182 if let Some(debug) = &mut self.debug { 183 debug.send_cmd(relay.url().to_owned(), cmd); 184 } 185 if let Err(err) = relay.send(cmd) { 186 error!("error sending {:?} to {}: {err}", cmd, relay.url()); 187 } 188 } 189 } 190 191 pub fn unsubscribe(&mut self, subid: String) { 192 for relay in &mut self.relays { 193 let cmd = ClientMessage::close(subid.clone()); 194 if let Some(debug) = &mut self.debug { 195 debug.send_cmd(relay.url().to_owned(), &cmd); 196 } 197 if let Err(err) = relay.send(&cmd) { 198 error!( 199 "error unsubscribing from {} on {}: {err}", 200 &subid, 201 relay.url() 202 ); 203 } 204 } 205 } 206 207 pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) { 208 for relay in &mut self.relays { 209 if let Some(debug) = &mut self.debug { 210 debug.send_cmd( 211 relay.url().to_owned(), 212 &ClientMessage::req(subid.clone(), filter.clone()), 213 ); 214 } 215 216 if let Err(err) = relay.send(&ClientMessage::req(subid.clone(), filter.clone())) { 217 error!("error subscribing to {}: {err}", relay.url()); 218 } 219 } 220 } 221 222 /// Keep relay connectiongs alive by pinging relays that haven't been 223 /// pinged in awhile. Adjust ping rate with [`ping_rate`]. 224 pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) { 225 for relay in &mut self.relays { 226 let now = std::time::Instant::now(); 227 228 match relay { 229 PoolRelay::Multicast(_) => {} 230 PoolRelay::Websocket(relay) => { 231 match relay.relay.status { 232 RelayStatus::Disconnected => { 233 let reconnect_at = 234 relay.last_connect_attempt + relay.retry_connect_after; 235 if now > reconnect_at { 236 relay.last_connect_attempt = now; 237 let next_duration = Duration::from_millis(3000); 238 debug!( 239 "bumping reconnect duration from {:?} to {:?} and retrying connect", 240 relay.retry_connect_after, next_duration 241 ); 242 relay.retry_connect_after = next_duration; 243 if let Err(err) = relay.relay.connect(wakeup.clone()) { 244 error!("error connecting to relay: {}", err); 245 } 246 } else { 247 // let's wait a bit before we try again 248 } 249 } 250 251 RelayStatus::Connected => { 252 relay.retry_connect_after = 253 WebsocketRelay::initial_reconnect_duration(); 254 255 let should_ping = now - relay.last_ping > self.ping_rate; 256 if should_ping { 257 trace!("pinging {}", relay.relay.url); 258 relay.relay.ping(); 259 relay.last_ping = Instant::now(); 260 } 261 } 262 263 RelayStatus::Connecting => { 264 // cool story bro 265 } 266 } 267 } 268 } 269 } 270 } 271 272 pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) { 273 for relay in &mut self.relays { 274 if relay.url() == relay_url { 275 if let Some(debug) = &mut self.debug { 276 debug.send_cmd(relay.url().to_owned(), cmd); 277 } 278 if let Err(err) = relay.send(cmd) { 279 error!("send_to err: {err}"); 280 } 281 return; 282 } 283 } 284 } 285 286 /// check whether a relay url is valid to add 287 pub fn is_valid_url(&self, url: &str) -> bool { 288 if url.is_empty() { 289 return false; 290 } 291 let url = match Url::parse(url) { 292 Ok(parsed_url) => parsed_url.to_string(), 293 Err(_err) => { 294 // debug!("bad relay url \"{}\": {:?}", url, err); 295 return false; 296 } 297 }; 298 if self.has(&url) { 299 return false; 300 } 301 true 302 } 303 304 // Adds a websocket url to the RelayPool. 305 pub fn add_url( 306 &mut self, 307 url: String, 308 wakeup: impl Fn() + Send + Sync + Clone + 'static, 309 ) -> Result<()> { 310 let url = Self::canonicalize_url(url); 311 // Check if the URL already exists in the pool. 312 if self.has(&url) { 313 return Ok(()); 314 } 315 let relay = Relay::new( 316 nostr::RelayUrl::parse(url).map_err(|_| Error::InvalidRelayUrl)?, 317 wakeup, 318 )?; 319 let pool_relay = PoolRelay::websocket(relay); 320 321 self.relays.push(pool_relay); 322 323 Ok(()) 324 } 325 326 pub fn add_urls( 327 &mut self, 328 urls: BTreeSet<String>, 329 wakeup: impl Fn() + Send + Sync + Clone + 'static, 330 ) -> Result<()> { 331 for url in urls { 332 self.add_url(url, wakeup.clone())?; 333 } 334 Ok(()) 335 } 336 337 pub fn remove_urls(&mut self, urls: &BTreeSet<String>) { 338 self.relays 339 .retain(|pool_relay| !urls.contains(pool_relay.url())); 340 } 341 342 // standardize the format (ie, trailing slashes) 343 fn canonicalize_url(url: String) -> String { 344 match Url::parse(&url) { 345 Ok(parsed_url) => parsed_url.to_string(), 346 Err(_) => url, // If parsing fails, return the original URL. 347 } 348 } 349 350 /// Attempts to receive a pool event from a list of relays. The 351 /// function searches each relay in the list in order, attempting to 352 /// receive a message from each. If a message is received, return it. 353 /// If no message is received from any relays, None is returned. 354 pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> { 355 for relay in &mut self.relays { 356 if let PoolRelay::Multicast(mcr) = relay { 357 // try rejoin on multicast 358 if mcr.should_rejoin() { 359 if let Err(err) = mcr.rejoin() { 360 error!("multicast: rejoin error: {err}"); 361 } 362 } 363 } 364 365 if let Some(event) = relay.try_recv() { 366 match &event { 367 WsEvent::Opened => { 368 relay.set_status(RelayStatus::Connected); 369 } 370 WsEvent::Closed => { 371 relay.set_status(RelayStatus::Disconnected); 372 } 373 WsEvent::Error(err) => { 374 error!("{:?}", err); 375 relay.set_status(RelayStatus::Disconnected); 376 } 377 WsEvent::Message(ev) => { 378 // let's just handle pongs here. 379 // We only need to do this natively. 380 #[cfg(not(target_arch = "wasm32"))] 381 if let WsMessage::Ping(ref bs) = ev { 382 trace!("pong {}", relay.url()); 383 match relay { 384 PoolRelay::Websocket(wsr) => { 385 wsr.relay.sender.send(WsMessage::Pong(bs.to_owned())); 386 } 387 PoolRelay::Multicast(_mcr) => {} 388 } 389 } 390 } 391 } 392 393 if let Some(debug) = &mut self.debug { 394 debug.receive_cmd(relay.url().to_owned(), (&event).into()); 395 } 396 397 let pool_event = PoolEvent { 398 event, 399 relay: relay.url(), 400 }; 401 402 return Some(pool_event); 403 } 404 } 405 406 None 407 } 408 }