relay_pool.rs (5487B)
1 use crate::Error; 2 use nostr::prelude::RelayUrl; 3 use nostr_sdk::prelude::{BoxedStream, Client, Filter, Keys, RelayEvent}; 4 use std::collections::HashSet; 5 use std::sync::Arc; 6 use tokio::sync::Mutex; 7 use tokio::time::Duration; 8 use tracing::{debug, info, warn}; 9 10 #[derive(Clone, Copy, Debug, Default)] 11 pub struct RelayStats { 12 pub ensure_calls: u64, 13 pub relays_added: u64, 14 pub connect_successes: u64, 15 pub connect_failures: u64, 16 } 17 18 /// Persistent relay pool responsible for maintaining long-lived connections. 19 #[derive(Clone)] 20 pub struct RelayPool { 21 client: Client, 22 known_relays: Arc<Mutex<HashSet<String>>>, 23 default_relays: Arc<[RelayUrl]>, 24 stats: Arc<Mutex<RelayStats>>, 25 } 26 27 impl RelayPool { 28 pub async fn new(keys: Keys, default_relays: &[&str]) -> Result<Self, Error> { 29 let client = Client::builder().signer(keys).build(); 30 let parsed_defaults: Vec<RelayUrl> = default_relays 31 .iter() 32 .filter_map(|url| match RelayUrl::parse(url) { 33 Ok(relay) => Some(relay), 34 Err(err) => { 35 warn!("failed to parse default relay {url}: {err}"); 36 None 37 } 38 }) 39 .collect(); 40 41 let default_relays = Arc::<[RelayUrl]>::from(parsed_defaults); 42 let pool = Self { 43 client, 44 known_relays: Arc::new(Mutex::new(HashSet::new())), 45 default_relays: default_relays.clone(), 46 stats: Arc::new(Mutex::new(RelayStats::default())), 47 }; 48 49 pool.ensure_relays(pool.default_relays().iter().cloned()) 50 .await?; 51 52 Ok(pool) 53 } 54 55 pub fn default_relays(&self) -> &[RelayUrl] { 56 self.default_relays.as_ref() 57 } 58 59 pub async fn ensure_relays<I>(&self, relays: I) -> Result<(), Error> 60 where 61 I: IntoIterator<Item = RelayUrl>, 62 { 63 metrics::counter!("relay_pool_ensure_calls_total", 1); 64 let mut new_relays = Vec::new(); 65 let mut had_new = false; 66 let mut relays_added = 0u64; 67 { 68 let mut guard = self.known_relays.lock().await; 69 for relay in relays { 70 let key = relay.to_string(); 71 if guard.insert(key) { 72 new_relays.push(relay); 73 had_new = true; 74 relays_added += 1; 75 } 76 } 77 } 78 79 if relays_added > 0 { 80 metrics::counter!("relay_pool_relays_added_total", relays_added); 81 } 82 83 let mut connect_success = 0u64; 84 let mut connect_failure = 0u64; 85 for relay in new_relays { 86 debug!("adding relay {}", relay); 87 self.client 88 .add_relay(relay.clone()) 89 .await 90 .map_err(|err| Error::Generic(format!("failed to add relay {relay}: {err}")))?; 91 if let Err(err) = self.client.connect_relay(relay.clone()).await { 92 warn!("failed to connect relay {}: {}", relay, err); 93 connect_failure += 1; 94 } else { 95 connect_success += 1; 96 } 97 } 98 99 if connect_success > 0 { 100 metrics::counter!("relay_pool_connect_success_total", connect_success); 101 } 102 if connect_failure > 0 { 103 metrics::counter!("relay_pool_connect_failure_total", connect_failure); 104 } 105 106 if had_new { 107 self.client.connect().await; 108 109 let mut stats = self.stats.lock().await; 110 stats.ensure_calls += 1; 111 stats.relays_added += relays_added; 112 stats.connect_successes += connect_success; 113 stats.connect_failures += connect_failure; 114 let snapshot = *stats; 115 drop(stats); 116 117 let tracked = { 118 let guard = self.known_relays.lock().await; 119 guard.len() 120 }; 121 122 info!( 123 total_relays = tracked, 124 ensure_calls = snapshot.ensure_calls, 125 relays_added = relays_added, 126 connect_successes = connect_success, 127 connect_failures = connect_failure, 128 "relay pool health update" 129 ); 130 } else { 131 let mut stats = self.stats.lock().await; 132 stats.ensure_calls += 1; 133 } 134 135 let tracked = { 136 let guard = self.known_relays.lock().await; 137 guard.len() 138 }; 139 metrics::gauge!("relay_pool_known_relays", tracked as f64); 140 141 Ok(()) 142 } 143 144 /// Stream events from relays, returning RelayEvent which includes source relay URL. 145 /// Takes a single Filter - callers should combine filters before calling. 146 pub async fn stream_events( 147 &self, 148 filter: Filter, 149 relays: &[RelayUrl], 150 timeout: Duration, 151 ) -> Result<BoxedStream<RelayEvent>, Error> { 152 if relays.is_empty() { 153 Ok(self 154 .client 155 .stream_events_with_source(filter, timeout) 156 .await?) 157 } else { 158 Ok(self 159 .client 160 .stream_events_from_with_source(relays.to_vec(), filter, timeout) 161 .await?) 162 } 163 } 164 165 pub async fn relay_stats(&self) -> (RelayStats, usize) { 166 let stats = { *self.stats.lock().await }; 167 let tracked = { 168 let guard = self.known_relays.lock().await; 169 guard.len() 170 }; 171 (stats, tracked) 172 } 173 }