notecrumbs

a nostr opengraph server built on nostrdb and egui
git clone git://jb55.com/notecrumbs
Log | Files | Refs | README | LICENSE

relay_pool.rs (5484B)


      1 use crate::Error;
      2 use nostr::prelude::RelayUrl;
      3 use nostr_sdk::prelude::{Client, Event, Filter, Keys, ReceiverStream};
      4 use std::collections::HashSet;
      5 use std::sync::Arc;
      6 use tokio::sync::Mutex;
      7 use tokio::time::Duration;
      8 use tracing::{debug, info, warn};
      9 
     10 #[derive(Clone, Copy, Debug, Default)]
     11 pub struct RelayStats {
     12     pub ensure_calls: u64,
     13     pub relays_added: u64,
     14     pub connect_successes: u64,
     15     pub connect_failures: u64,
     16 }
     17 
     18 /// Persistent relay pool responsible for maintaining long-lived connections.
     19 #[derive(Clone)]
     20 pub struct RelayPool {
     21     client: Client,
     22     known_relays: Arc<Mutex<HashSet<String>>>,
     23     default_relays: Arc<[RelayUrl]>,
     24     connect_timeout: Duration,
     25     stats: Arc<Mutex<RelayStats>>,
     26 }
     27 
     28 impl RelayPool {
     29     pub async fn new(
     30         keys: Keys,
     31         default_relays: &[&str],
     32         connect_timeout: Duration,
     33     ) -> Result<Self, Error> {
     34         let client = Client::builder().signer(keys).build();
     35         let parsed_defaults: Vec<RelayUrl> = default_relays
     36             .iter()
     37             .filter_map(|url| match RelayUrl::parse(url) {
     38                 Ok(relay) => Some(relay),
     39                 Err(err) => {
     40                     warn!("failed to parse default relay {url}: {err}");
     41                     None
     42                 }
     43             })
     44             .collect();
     45 
     46         let default_relays = Arc::<[RelayUrl]>::from(parsed_defaults);
     47         let pool = Self {
     48             client,
     49             known_relays: Arc::new(Mutex::new(HashSet::new())),
     50             default_relays: default_relays.clone(),
     51             connect_timeout,
     52             stats: Arc::new(Mutex::new(RelayStats::default())),
     53         };
     54 
     55         pool.ensure_relays(pool.default_relays().iter().cloned())
     56             .await?;
     57 
     58         Ok(pool)
     59     }
     60 
     61     pub fn default_relays(&self) -> &[RelayUrl] {
     62         self.default_relays.as_ref()
     63     }
     64 
     65     pub async fn ensure_relays<I>(&self, relays: I) -> Result<(), Error>
     66     where
     67         I: IntoIterator<Item = RelayUrl>,
     68     {
     69         metrics::counter!("relay_pool_ensure_calls_total", 1);
     70         let mut new_relays = Vec::new();
     71         let mut had_new = false;
     72         let mut relays_added = 0u64;
     73         {
     74             let mut guard = self.known_relays.lock().await;
     75             for relay in relays {
     76                 let key = relay.to_string();
     77                 if guard.insert(key) {
     78                     new_relays.push(relay);
     79                     had_new = true;
     80                     relays_added += 1;
     81                 }
     82             }
     83         }
     84 
     85         if relays_added > 0 {
     86             metrics::counter!("relay_pool_relays_added_total", relays_added);
     87         }
     88 
     89         let mut connect_success = 0u64;
     90         let mut connect_failure = 0u64;
     91         for relay in new_relays {
     92             debug!("adding relay {}", relay);
     93             self.client
     94                 .add_relay(relay.clone())
     95                 .await
     96                 .map_err(|err| Error::Generic(format!("failed to add relay {relay}: {err}")))?;
     97             if let Err(err) = self.client.connect_relay(relay.clone()).await {
     98                 warn!("failed to connect relay {}: {}", relay, err);
     99                 connect_failure += 1;
    100             } else {
    101                 connect_success += 1;
    102             }
    103         }
    104 
    105         if connect_success > 0 {
    106             metrics::counter!("relay_pool_connect_success_total", connect_success);
    107         }
    108         if connect_failure > 0 {
    109             metrics::counter!("relay_pool_connect_failure_total", connect_failure);
    110         }
    111 
    112         if had_new {
    113             self.client.connect_with_timeout(self.connect_timeout).await;
    114 
    115             let mut stats = self.stats.lock().await;
    116             stats.ensure_calls += 1;
    117             stats.relays_added += relays_added;
    118             stats.connect_successes += connect_success;
    119             stats.connect_failures += connect_failure;
    120             let snapshot = *stats;
    121             drop(stats);
    122 
    123             let tracked = {
    124                 let guard = self.known_relays.lock().await;
    125                 guard.len()
    126             };
    127 
    128             info!(
    129                 total_relays = tracked,
    130                 ensure_calls = snapshot.ensure_calls,
    131                 relays_added = relays_added,
    132                 connect_successes = connect_success,
    133                 connect_failures = connect_failure,
    134                 "relay pool health update"
    135             );
    136         } else {
    137             let mut stats = self.stats.lock().await;
    138             stats.ensure_calls += 1;
    139         }
    140 
    141         let tracked = {
    142             let guard = self.known_relays.lock().await;
    143             guard.len()
    144         };
    145         metrics::gauge!("relay_pool_known_relays", tracked as f64);
    146 
    147         Ok(())
    148     }
    149 
    150     pub async fn stream_events(
    151         &self,
    152         filters: Vec<Filter>,
    153         relays: &[RelayUrl],
    154         timeout: Duration,
    155     ) -> Result<ReceiverStream<Event>, Error> {
    156         if relays.is_empty() {
    157             Ok(self.client.stream_events(filters, Some(timeout)).await?)
    158         } else {
    159             let urls: Vec<String> = relays.iter().map(|r| r.to_string()).collect();
    160             Ok(self
    161                 .client
    162                 .stream_events_from(urls, filters, Some(timeout))
    163                 .await?)
    164         }
    165     }
    166 
    167     pub async fn relay_stats(&self) -> (RelayStats, usize) {
    168         let stats = { *self.stats.lock().await };
    169         let tracked = {
    170             let guard = self.known_relays.lock().await;
    171             guard.len()
    172         };
    173         (stats, tracked)
    174     }
    175 }