pool.rs (12201B)
1 use crate::relay::multicast::{setup_multicast_relay, MulticastRelay}; 2 use crate::relay::{RelayStatus, WebsocketConn, WebsocketRelay}; 3 use crate::{ClientMessage, Error, Result}; 4 use nostrdb::Filter; 5 6 use std::collections::BTreeSet; 7 use std::time::{Duration, Instant}; 8 9 use url::Url; 10 11 use ewebsock::{WsEvent, WsMessage}; 12 use tracing::{debug, error, trace}; 13 14 use super::subs_debug::SubsDebug; 15 16 #[derive(Debug)] 17 pub struct PoolEvent<'a> { 18 pub relay: &'a str, 19 pub event: ewebsock::WsEvent, 20 } 21 22 impl PoolEvent<'_> { 23 pub fn into_owned(self) -> PoolEventBuf { 24 PoolEventBuf { 25 relay: self.relay.to_owned(), 26 event: self.event, 27 } 28 } 29 } 30 31 pub struct PoolEventBuf { 32 pub relay: String, 33 pub event: ewebsock::WsEvent, 34 } 35 36 pub enum PoolRelay { 37 Websocket(WebsocketRelay), 38 Multicast(MulticastRelay), 39 } 40 41 impl PoolRelay { 42 pub fn url(&self) -> &str { 43 match self { 44 Self::Websocket(wsr) => wsr.conn.url.as_str(), 45 Self::Multicast(_wsr) => "multicast", 46 } 47 } 48 49 pub fn set_status(&mut self, status: RelayStatus) { 50 match self { 51 Self::Websocket(wsr) => { 52 wsr.conn.status = status; 53 } 54 Self::Multicast(_mcr) => {} 55 } 56 } 57 58 pub fn try_recv(&self) -> Option<WsEvent> { 59 match self { 60 Self::Websocket(recvr) => recvr.conn.receiver.try_recv(), 61 Self::Multicast(recvr) => recvr.try_recv(), 62 } 63 } 64 65 pub fn status(&self) -> RelayStatus { 66 match self { 67 Self::Websocket(wsr) => wsr.conn.status, 68 Self::Multicast(mcr) => mcr.status(), 69 } 70 } 71 72 pub fn send(&mut self, msg: &ClientMessage) -> Result<()> { 73 match self { 74 Self::Websocket(wsr) => { 75 wsr.conn.send(msg); 76 Ok(()) 77 } 78 79 Self::Multicast(mcr) => { 80 // we only send event client messages at the moment 81 if let ClientMessage::Event(ecm) = msg { 82 mcr.send(ecm)?; 83 } 84 Ok(()) 85 } 86 } 87 } 88 89 pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) -> Result<()> { 90 self.send(&ClientMessage::req(subid, filter)) 91 } 92 93 pub fn websocket(relay: WebsocketConn) -> Self { 94 Self::Websocket(WebsocketRelay::new(relay)) 95 } 96 97 pub fn multicast(wakeup: impl Fn() + Send + Sync + Clone + 'static) -> Result<Self> { 98 Ok(Self::Multicast(setup_multicast_relay(wakeup)?)) 99 } 100 } 101 102 pub struct RelayPool { 103 pub relays: Vec<PoolRelay>, 104 pub ping_rate: Duration, 105 pub debug: Option<SubsDebug>, 106 } 107 108 impl Default for RelayPool { 109 fn default() -> Self { 110 RelayPool::new() 111 } 112 } 113 114 impl RelayPool { 115 // Constructs a new, empty RelayPool. 116 pub fn new() -> RelayPool { 117 RelayPool { 118 relays: vec![], 119 ping_rate: Duration::from_secs(45), 120 debug: None, 121 } 122 } 123 124 pub fn add_multicast_relay( 125 &mut self, 126 wakeup: impl Fn() + Send + Sync + Clone + 'static, 127 ) -> Result<()> { 128 let multicast_relay = PoolRelay::multicast(wakeup)?; 129 self.relays.push(multicast_relay); 130 Ok(()) 131 } 132 133 pub fn use_debug(&mut self) { 134 self.debug = Some(SubsDebug::default()); 135 } 136 137 pub fn ping_rate(&mut self, duration: Duration) -> &mut Self { 138 self.ping_rate = duration; 139 self 140 } 141 142 pub fn has(&self, url: &str) -> bool { 143 for relay in &self.relays { 144 if relay.url() == url { 145 return true; 146 } 147 } 148 149 false 150 } 151 152 pub fn urls(&self) -> BTreeSet<String> { 153 self.relays 154 .iter() 155 .map(|pool_relay| pool_relay.url().to_string()) 156 .collect() 157 } 158 159 pub fn send(&mut self, cmd: &ClientMessage) { 160 for relay in &mut self.relays { 161 if let Some(debug) = &mut self.debug { 162 debug.send_cmd(relay.url().to_owned(), cmd); 163 } 164 if let Err(err) = relay.send(cmd) { 165 error!("error sending {:?} to {}: {err}", cmd, relay.url()); 166 } 167 } 168 } 169 170 pub fn unsubscribe(&mut self, subid: String) { 171 for relay in &mut self.relays { 172 let cmd = ClientMessage::close(subid.clone()); 173 if let Some(debug) = &mut self.debug { 174 debug.send_cmd(relay.url().to_owned(), &cmd); 175 } 176 if let Err(err) = relay.send(&cmd) { 177 error!( 178 "error unsubscribing from {} on {}: {err}", 179 &subid, 180 relay.url() 181 ); 182 } 183 } 184 } 185 186 pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) { 187 for relay in &mut self.relays { 188 if let Some(debug) = &mut self.debug { 189 debug.send_cmd( 190 relay.url().to_owned(), 191 &ClientMessage::req(subid.clone(), filter.clone()), 192 ); 193 } 194 195 if let Err(err) = relay.send(&ClientMessage::req(subid.clone(), filter.clone())) { 196 error!("error subscribing to {}: {err}", relay.url()); 197 } 198 } 199 } 200 201 /// Keep relay connectiongs alive by pinging relays that haven't been 202 /// pinged in awhile. Adjust ping rate with [`ping_rate`]. 203 pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) { 204 for relay in &mut self.relays { 205 let now = std::time::Instant::now(); 206 207 match relay { 208 PoolRelay::Multicast(_) => {} 209 PoolRelay::Websocket(relay) => { 210 match relay.conn.status { 211 RelayStatus::Disconnected => { 212 let reconnect_at = 213 relay.last_connect_attempt + relay.retry_connect_after; 214 if now > reconnect_at { 215 relay.last_connect_attempt = now; 216 let next_duration = Duration::from_millis(3000); 217 debug!( 218 "bumping reconnect duration from {:?} to {:?} and retrying connect", 219 relay.retry_connect_after, next_duration 220 ); 221 relay.retry_connect_after = next_duration; 222 if let Err(err) = relay.conn.connect(wakeup.clone()) { 223 error!("error connecting to relay: {}", err); 224 } 225 } else { 226 // let's wait a bit before we try again 227 } 228 } 229 230 RelayStatus::Connected => { 231 relay.retry_connect_after = 232 WebsocketRelay::initial_reconnect_duration(); 233 234 let should_ping = now - relay.last_ping > self.ping_rate; 235 if should_ping { 236 trace!("pinging {}", relay.conn.url); 237 relay.conn.ping(); 238 relay.last_ping = Instant::now(); 239 } 240 } 241 242 RelayStatus::Connecting => { 243 // cool story bro 244 } 245 } 246 } 247 } 248 } 249 } 250 251 pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) { 252 for relay in &mut self.relays { 253 if relay.url() == relay_url { 254 if let Some(debug) = &mut self.debug { 255 debug.send_cmd(relay.url().to_owned(), cmd); 256 } 257 if let Err(err) = relay.send(cmd) { 258 error!("send_to err: {err}"); 259 } 260 return; 261 } 262 } 263 } 264 265 /// check whether a relay url is valid to add 266 pub fn is_valid_url(&self, url: &str) -> bool { 267 if url.is_empty() { 268 return false; 269 } 270 let url = match Url::parse(url) { 271 Ok(parsed_url) => parsed_url.to_string(), 272 Err(_err) => { 273 // debug!("bad relay url \"{}\": {:?}", url, err); 274 return false; 275 } 276 }; 277 if self.has(&url) { 278 return false; 279 } 280 true 281 } 282 283 // Adds a websocket url to the RelayPool. 284 pub fn add_url( 285 &mut self, 286 url: String, 287 wakeup: impl Fn() + Send + Sync + Clone + 'static, 288 ) -> Result<()> { 289 let url = Self::canonicalize_url(url); 290 // Check if the URL already exists in the pool. 291 if self.has(&url) { 292 return Ok(()); 293 } 294 let relay = WebsocketConn::new( 295 nostr::RelayUrl::parse(url).map_err(|_| Error::InvalidRelayUrl)?, 296 wakeup, 297 )?; 298 let pool_relay = PoolRelay::websocket(relay); 299 300 self.relays.push(pool_relay); 301 302 Ok(()) 303 } 304 305 pub fn add_urls( 306 &mut self, 307 urls: BTreeSet<String>, 308 wakeup: impl Fn() + Send + Sync + Clone + 'static, 309 ) -> Result<()> { 310 for url in urls { 311 self.add_url(url, wakeup.clone())?; 312 } 313 Ok(()) 314 } 315 316 pub fn remove_urls(&mut self, urls: &BTreeSet<String>) { 317 self.relays 318 .retain(|pool_relay| !urls.contains(pool_relay.url())); 319 } 320 321 // standardize the format (ie, trailing slashes) 322 fn canonicalize_url(url: String) -> String { 323 match Url::parse(&url) { 324 Ok(parsed_url) => parsed_url.to_string(), 325 Err(_) => url, // If parsing fails, return the original URL. 326 } 327 } 328 329 /// Attempts to receive a pool event from a list of relays. The 330 /// function searches each relay in the list in order, attempting to 331 /// receive a message from each. If a message is received, return it. 332 /// If no message is received from any relays, None is returned. 333 pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> { 334 for relay in &mut self.relays { 335 if let PoolRelay::Multicast(mcr) = relay { 336 // try rejoin on multicast 337 if mcr.should_rejoin() { 338 if let Err(err) = mcr.rejoin() { 339 error!("multicast: rejoin error: {err}"); 340 } 341 } 342 } 343 344 if let Some(event) = relay.try_recv() { 345 match &event { 346 WsEvent::Opened => { 347 relay.set_status(RelayStatus::Connected); 348 } 349 WsEvent::Closed => { 350 relay.set_status(RelayStatus::Disconnected); 351 } 352 WsEvent::Error(err) => { 353 error!("{:?}", err); 354 relay.set_status(RelayStatus::Disconnected); 355 } 356 WsEvent::Message(ev) => { 357 // let's just handle pongs here. 358 // We only need to do this natively. 359 #[cfg(not(target_arch = "wasm32"))] 360 if let WsMessage::Ping(ref bs) = ev { 361 trace!("pong {}", relay.url()); 362 match relay { 363 PoolRelay::Websocket(wsr) => { 364 wsr.conn.sender.send(WsMessage::Pong(bs.to_owned())); 365 } 366 PoolRelay::Multicast(_mcr) => {} 367 } 368 } 369 } 370 } 371 372 if let Some(debug) = &mut self.debug { 373 debug.receive_cmd(relay.url().to_owned(), (&event).into()); 374 } 375 376 let pool_event = PoolEvent { 377 event, 378 relay: relay.url(), 379 }; 380 381 return Some(pool_event); 382 } 383 } 384 385 None 386 } 387 }