relay_pool.rs (5484B)
1 use crate::Error; 2 use nostr::prelude::RelayUrl; 3 use nostr_sdk::prelude::{Client, Event, Filter, Keys, ReceiverStream}; 4 use std::collections::HashSet; 5 use std::sync::Arc; 6 use tokio::sync::Mutex; 7 use tokio::time::Duration; 8 use tracing::{debug, info, warn}; 9 10 #[derive(Clone, Copy, Debug, Default)] 11 pub struct RelayStats { 12 pub ensure_calls: u64, 13 pub relays_added: u64, 14 pub connect_successes: u64, 15 pub connect_failures: u64, 16 } 17 18 /// Persistent relay pool responsible for maintaining long-lived connections. 19 #[derive(Clone)] 20 pub struct RelayPool { 21 client: Client, 22 known_relays: Arc<Mutex<HashSet<String>>>, 23 default_relays: Arc<[RelayUrl]>, 24 connect_timeout: Duration, 25 stats: Arc<Mutex<RelayStats>>, 26 } 27 28 impl RelayPool { 29 pub async fn new( 30 keys: Keys, 31 default_relays: &[&str], 32 connect_timeout: Duration, 33 ) -> Result<Self, Error> { 34 let client = Client::builder().signer(keys).build(); 35 let parsed_defaults: Vec<RelayUrl> = default_relays 36 .iter() 37 .filter_map(|url| match RelayUrl::parse(url) { 38 Ok(relay) => Some(relay), 39 Err(err) => { 40 warn!("failed to parse default relay {url}: {err}"); 41 None 42 } 43 }) 44 .collect(); 45 46 let default_relays = Arc::<[RelayUrl]>::from(parsed_defaults); 47 let pool = Self { 48 client, 49 known_relays: Arc::new(Mutex::new(HashSet::new())), 50 default_relays: default_relays.clone(), 51 connect_timeout, 52 stats: Arc::new(Mutex::new(RelayStats::default())), 53 }; 54 55 pool.ensure_relays(pool.default_relays().iter().cloned()) 56 .await?; 57 58 Ok(pool) 59 } 60 61 pub fn default_relays(&self) -> &[RelayUrl] { 62 self.default_relays.as_ref() 63 } 64 65 pub async fn ensure_relays<I>(&self, relays: I) -> Result<(), Error> 66 where 67 I: IntoIterator<Item = RelayUrl>, 68 { 69 metrics::counter!("relay_pool_ensure_calls_total", 1); 70 let mut new_relays = Vec::new(); 71 let mut had_new = false; 72 let mut relays_added = 0u64; 73 { 74 let mut guard = self.known_relays.lock().await; 75 for relay in relays { 76 let key = relay.to_string(); 77 if guard.insert(key) { 78 new_relays.push(relay); 79 had_new = true; 80 relays_added += 1; 81 } 82 } 83 } 84 85 if relays_added > 0 { 86 metrics::counter!("relay_pool_relays_added_total", relays_added); 87 } 88 89 let mut connect_success = 0u64; 90 let mut connect_failure = 0u64; 91 for relay in new_relays { 92 debug!("adding relay {}", relay); 93 self.client 94 .add_relay(relay.clone()) 95 .await 96 .map_err(|err| Error::Generic(format!("failed to add relay {relay}: {err}")))?; 97 if let Err(err) = self.client.connect_relay(relay.clone()).await { 98 warn!("failed to connect relay {}: {}", relay, err); 99 connect_failure += 1; 100 } else { 101 connect_success += 1; 102 } 103 } 104 105 if connect_success > 0 { 106 metrics::counter!("relay_pool_connect_success_total", connect_success); 107 } 108 if connect_failure > 0 { 109 metrics::counter!("relay_pool_connect_failure_total", connect_failure); 110 } 111 112 if had_new { 113 self.client.connect_with_timeout(self.connect_timeout).await; 114 115 let mut stats = self.stats.lock().await; 116 stats.ensure_calls += 1; 117 stats.relays_added += relays_added; 118 stats.connect_successes += connect_success; 119 stats.connect_failures += connect_failure; 120 let snapshot = *stats; 121 drop(stats); 122 123 let tracked = { 124 let guard = self.known_relays.lock().await; 125 guard.len() 126 }; 127 128 info!( 129 total_relays = tracked, 130 ensure_calls = snapshot.ensure_calls, 131 relays_added = relays_added, 132 connect_successes = connect_success, 133 connect_failures = connect_failure, 134 "relay pool health update" 135 ); 136 } else { 137 let mut stats = self.stats.lock().await; 138 stats.ensure_calls += 1; 139 } 140 141 let tracked = { 142 let guard = self.known_relays.lock().await; 143 guard.len() 144 }; 145 metrics::gauge!("relay_pool_known_relays", tracked as f64); 146 147 Ok(()) 148 } 149 150 pub async fn stream_events( 151 &self, 152 filters: Vec<Filter>, 153 relays: &[RelayUrl], 154 timeout: Duration, 155 ) -> Result<ReceiverStream<Event>, Error> { 156 if relays.is_empty() { 157 Ok(self.client.stream_events(filters, Some(timeout)).await?) 158 } else { 159 let urls: Vec<String> = relays.iter().map(|r| r.to_string()).collect(); 160 Ok(self 161 .client 162 .stream_events_from(urls, filters, Some(timeout)) 163 .await?) 164 } 165 } 166 167 pub async fn relay_stats(&self) -> (RelayStats, usize) { 168 let stats = { *self.stats.lock().await }; 169 let tracked = { 170 let guard = self.known_relays.lock().await; 171 guard.len() 172 }; 173 (stats, tracked) 174 } 175 }