pool.rs (6322B)
1 use crate::relay::{Relay, RelayStatus}; 2 use crate::{ClientMessage, Result}; 3 use nostrdb::Filter; 4 5 use std::time::{Duration, Instant}; 6 7 #[cfg(not(target_arch = "wasm32"))] 8 use ewebsock::{WsEvent, WsMessage}; 9 10 #[cfg(not(target_arch = "wasm32"))] 11 use tracing::{debug, error}; 12 13 #[derive(Debug)] 14 pub struct PoolEvent<'a> { 15 pub relay: &'a str, 16 pub event: ewebsock::WsEvent, 17 } 18 19 pub struct PoolRelay { 20 pub relay: Relay, 21 pub last_ping: Instant, 22 pub last_connect_attempt: Instant, 23 pub retry_connect_after: Duration, 24 } 25 26 impl PoolRelay { 27 pub fn new(relay: Relay) -> PoolRelay { 28 PoolRelay { 29 relay, 30 last_ping: Instant::now(), 31 last_connect_attempt: Instant::now(), 32 retry_connect_after: Self::initial_reconnect_duration(), 33 } 34 } 35 36 pub fn initial_reconnect_duration() -> Duration { 37 Duration::from_secs(5) 38 } 39 } 40 41 pub struct RelayPool { 42 pub relays: Vec<PoolRelay>, 43 pub ping_rate: Duration, 44 } 45 46 impl Default for RelayPool { 47 fn default() -> Self { 48 RelayPool::new() 49 } 50 } 51 52 impl RelayPool { 53 // Constructs a new, empty RelayPool. 54 pub fn new() -> RelayPool { 55 RelayPool { 56 relays: vec![], 57 ping_rate: Duration::from_secs(25), 58 } 59 } 60 61 pub fn ping_rate(&mut self, duration: Duration) -> &mut Self { 62 self.ping_rate = duration; 63 self 64 } 65 66 pub fn has(&self, url: &str) -> bool { 67 for relay in &self.relays { 68 if relay.relay.url == url { 69 return true; 70 } 71 } 72 73 false 74 } 75 76 pub fn send(&mut self, cmd: &ClientMessage) { 77 for relay in &mut self.relays { 78 relay.relay.send(cmd); 79 } 80 } 81 82 pub fn unsubscribe(&mut self, subid: String) { 83 for relay in &mut self.relays { 84 relay.relay.send(&ClientMessage::close(subid.clone())); 85 } 86 } 87 88 pub fn subscribe(&mut self, subid: String, filter: Vec<Filter>) { 89 for relay in &mut self.relays { 90 relay.relay.subscribe(subid.clone(), filter.clone()); 91 } 92 } 93 94 /// Keep relay connectiongs alive by pinging relays that haven't been 95 /// pinged in awhile. Adjust ping rate with [`ping_rate`]. 96 pub fn keepalive_ping(&mut self, wakeup: impl Fn() + Send + Sync + Clone + 'static) { 97 for relay in &mut self.relays { 98 let now = std::time::Instant::now(); 99 100 match relay.relay.status { 101 RelayStatus::Disconnected => { 102 let reconnect_at = relay.last_connect_attempt + relay.retry_connect_after; 103 if now > reconnect_at { 104 relay.last_connect_attempt = now; 105 let next_duration = Duration::from_millis( 106 ((relay.retry_connect_after.as_millis() as f64) * 1.5) as u64, 107 ); 108 debug!( 109 "bumping reconnect duration from {:?} to {:?} and retrying connect", 110 relay.retry_connect_after, next_duration 111 ); 112 relay.retry_connect_after = next_duration; 113 if let Err(err) = relay.relay.connect(wakeup.clone()) { 114 error!("error connecting to relay: {}", err); 115 } 116 } else { 117 // let's wait a bit before we try again 118 } 119 } 120 121 RelayStatus::Connected => { 122 relay.retry_connect_after = PoolRelay::initial_reconnect_duration(); 123 124 let should_ping = now - relay.last_ping > self.ping_rate; 125 if should_ping { 126 debug!("pinging {}", relay.relay.url); 127 relay.relay.ping(); 128 relay.last_ping = Instant::now(); 129 } 130 } 131 132 RelayStatus::Connecting => { 133 // cool story bro 134 } 135 } 136 } 137 } 138 139 pub fn send_to(&mut self, cmd: &ClientMessage, relay_url: &str) { 140 for relay in &mut self.relays { 141 let relay = &mut relay.relay; 142 if relay.url == relay_url { 143 relay.send(cmd); 144 return; 145 } 146 } 147 } 148 149 // Adds a websocket url to the RelayPool. 150 pub fn add_url( 151 &mut self, 152 url: String, 153 wakeup: impl Fn() + Send + Sync + Clone + 'static, 154 ) -> Result<()> { 155 let relay = Relay::new(url, wakeup)?; 156 let pool_relay = PoolRelay::new(relay); 157 158 self.relays.push(pool_relay); 159 160 Ok(()) 161 } 162 163 /// Attempts to receive a pool event from a list of relays. The 164 /// function searches each relay in the list in order, attempting to 165 /// receive a message from each. If a message is received, return it. 166 /// If no message is received from any relays, None is returned. 167 pub fn try_recv(&mut self) -> Option<PoolEvent<'_>> { 168 for relay in &mut self.relays { 169 let relay = &mut relay.relay; 170 if let Some(event) = relay.receiver.try_recv() { 171 match &event { 172 WsEvent::Opened => { 173 relay.status = RelayStatus::Connected; 174 } 175 WsEvent::Closed => { 176 relay.status = RelayStatus::Disconnected; 177 } 178 WsEvent::Error(err) => { 179 error!("{:?}", err); 180 relay.status = RelayStatus::Disconnected; 181 } 182 WsEvent::Message(ev) => { 183 // let's just handle pongs here. 184 // We only need to do this natively. 185 #[cfg(not(target_arch = "wasm32"))] 186 if let WsMessage::Ping(ref bs) = ev { 187 debug!("pong {}", &relay.url); 188 relay.sender.send(WsMessage::Pong(bs.to_owned())); 189 } 190 } 191 } 192 return Some(PoolEvent { 193 event, 194 relay: &relay.url, 195 }); 196 } 197 } 198 199 None 200 } 201 }