notecrumbs

a nostr opengraph server built on nostrdb and egui
git clone git://jb55.com/notecrumbs
Log | Files | Refs | README | LICENSE

relay_pool.rs (5487B)


      1 use crate::Error;
      2 use nostr::prelude::RelayUrl;
      3 use nostr_sdk::prelude::{BoxedStream, Client, Filter, Keys, RelayEvent};
      4 use std::collections::HashSet;
      5 use std::sync::Arc;
      6 use tokio::sync::Mutex;
      7 use tokio::time::Duration;
      8 use tracing::{debug, info, warn};
      9 
     10 #[derive(Clone, Copy, Debug, Default)]
     11 pub struct RelayStats {
     12     pub ensure_calls: u64,
     13     pub relays_added: u64,
     14     pub connect_successes: u64,
     15     pub connect_failures: u64,
     16 }
     17 
/// Persistent relay pool responsible for maintaining long-lived connections.
///
/// `known_relays` and `stats` live behind `Arc`, so every clone of the pool
/// reads and updates the same relay set and the same counters.
#[derive(Clone)]
pub struct RelayPool {
    /// Client used to add relays, connect to them and stream events.
    client: Client,
    /// String form of each relay URL recorded by `ensure_relays`; used to skip
    /// relays that have already been processed.
    known_relays: Arc<Mutex<HashSet<String>>>,
    /// Default relay URLs that parsed successfully when the pool was built.
    default_relays: Arc<[RelayUrl]>,
    /// Cumulative counters updated by `ensure_relays`.
    stats: Arc<Mutex<RelayStats>>,
}
     26 
     27 impl RelayPool {
     28     pub async fn new(keys: Keys, default_relays: &[&str]) -> Result<Self, Error> {
     29         let client = Client::builder().signer(keys).build();
     30         let parsed_defaults: Vec<RelayUrl> = default_relays
     31             .iter()
     32             .filter_map(|url| match RelayUrl::parse(url) {
     33                 Ok(relay) => Some(relay),
     34                 Err(err) => {
     35                     warn!("failed to parse default relay {url}: {err}");
     36                     None
     37                 }
     38             })
     39             .collect();
     40 
     41         let default_relays = Arc::<[RelayUrl]>::from(parsed_defaults);
     42         let pool = Self {
     43             client,
     44             known_relays: Arc::new(Mutex::new(HashSet::new())),
     45             default_relays: default_relays.clone(),
     46             stats: Arc::new(Mutex::new(RelayStats::default())),
     47         };
     48 
     49         pool.ensure_relays(pool.default_relays().iter().cloned())
     50             .await?;
     51 
     52         Ok(pool)
     53     }
     54 
     55     pub fn default_relays(&self) -> &[RelayUrl] {
     56         self.default_relays.as_ref()
     57     }
     58 
     59     pub async fn ensure_relays<I>(&self, relays: I) -> Result<(), Error>
     60     where
     61         I: IntoIterator<Item = RelayUrl>,
     62     {
     63         metrics::counter!("relay_pool_ensure_calls_total", 1);
     64         let mut new_relays = Vec::new();
     65         let mut had_new = false;
     66         let mut relays_added = 0u64;
     67         {
     68             let mut guard = self.known_relays.lock().await;
     69             for relay in relays {
     70                 let key = relay.to_string();
     71                 if guard.insert(key) {
     72                     new_relays.push(relay);
     73                     had_new = true;
     74                     relays_added += 1;
     75                 }
     76             }
     77         }
     78 
     79         if relays_added > 0 {
     80             metrics::counter!("relay_pool_relays_added_total", relays_added);
     81         }
     82 
     83         let mut connect_success = 0u64;
     84         let mut connect_failure = 0u64;
     85         for relay in new_relays {
     86             debug!("adding relay {}", relay);
     87             self.client
     88                 .add_relay(relay.clone())
     89                 .await
     90                 .map_err(|err| Error::Generic(format!("failed to add relay {relay}: {err}")))?;
     91             if let Err(err) = self.client.connect_relay(relay.clone()).await {
     92                 warn!("failed to connect relay {}: {}", relay, err);
     93                 connect_failure += 1;
     94             } else {
     95                 connect_success += 1;
     96             }
     97         }
     98 
     99         if connect_success > 0 {
    100             metrics::counter!("relay_pool_connect_success_total", connect_success);
    101         }
    102         if connect_failure > 0 {
    103             metrics::counter!("relay_pool_connect_failure_total", connect_failure);
    104         }
    105 
    106         if had_new {
    107             self.client.connect().await;
    108 
    109             let mut stats = self.stats.lock().await;
    110             stats.ensure_calls += 1;
    111             stats.relays_added += relays_added;
    112             stats.connect_successes += connect_success;
    113             stats.connect_failures += connect_failure;
    114             let snapshot = *stats;
    115             drop(stats);
    116 
    117             let tracked = {
    118                 let guard = self.known_relays.lock().await;
    119                 guard.len()
    120             };
    121 
    122             info!(
    123                 total_relays = tracked,
    124                 ensure_calls = snapshot.ensure_calls,
    125                 relays_added = relays_added,
    126                 connect_successes = connect_success,
    127                 connect_failures = connect_failure,
    128                 "relay pool health update"
    129             );
    130         } else {
    131             let mut stats = self.stats.lock().await;
    132             stats.ensure_calls += 1;
    133         }
    134 
    135         let tracked = {
    136             let guard = self.known_relays.lock().await;
    137             guard.len()
    138         };
    139         metrics::gauge!("relay_pool_known_relays", tracked as f64);
    140 
    141         Ok(())
    142     }
    143 
    144     /// Stream events from relays, returning RelayEvent which includes source relay URL.
    145     /// Takes a single Filter - callers should combine filters before calling.
    146     pub async fn stream_events(
    147         &self,
    148         filter: Filter,
    149         relays: &[RelayUrl],
    150         timeout: Duration,
    151     ) -> Result<BoxedStream<RelayEvent>, Error> {
    152         if relays.is_empty() {
    153             Ok(self
    154                 .client
    155                 .stream_events_with_source(filter, timeout)
    156                 .await?)
    157         } else {
    158             Ok(self
    159                 .client
    160                 .stream_events_from_with_source(relays.to_vec(), filter, timeout)
    161                 .await?)
    162         }
    163     }
    164 
    165     pub async fn relay_stats(&self) -> (RelayStats, usize) {
    166         let stats = { *self.stats.lock().await };
    167         let tracked = {
    168             let guard = self.known_relays.lock().await;
    169             guard.len()
    170         };
    171         (stats, tracked)
    172     }
    173 }