notecrumbs

a nostr opengraph server built on nostrdb and egui
git clone git://jb55.com/notecrumbs
Log | Files | Refs | README | LICENSE

main.rs (39347B)


      1 use std::net::SocketAddr;
      2 use std::time::Instant;
      3 
      4 use dashmap::DashMap;
      5 use tokio::sync::watch;
      6 use tokio::task::AbortHandle;
      7 
      8 use http_body_util::Full;
      9 use hyper::body::Bytes;
     10 use hyper::header;
     11 use hyper::server::conn::http1;
     12 use hyper::service::service_fn;
     13 use hyper::{Request, Response, StatusCode};
     14 use hyper_util::rt::TokioIo;
     15 use metrics_exporter_prometheus::PrometheusHandle;
     16 use std::sync::Arc;
     17 use tokio::net::TcpListener;
     18 use tracing::{error, info};
     19 
     20 use crate::{
     21     error::Error,
     22     render::{ProfileRenderData, RenderData},
     23 };
     24 use nostr_sdk::prelude::*;
     25 use nostrdb::{Config, Filter, Ndb, NoteKey, Transaction};
     26 use std::time::Duration;
     27 
     28 mod abbrev;
     29 mod error;
     30 mod fonts;
     31 mod gradient;
     32 mod html;
     33 mod nip19;
     34 mod pfp;
     35 mod relay_pool;
     36 mod render;
     37 mod sitemap;
     38 mod unknowns;
     39 
     40 use relay_pool::RelayPool;
     41 
/// Stylesheet served at /damus.css for rendered HTML pages
const FRONTEND_CSS: &str = include_str!("../assets/damus.css");
/// Display font served at /fonts/PoetsenOne-Regular.ttf
const POETSEN_FONT: &[u8] = include_bytes!("../fonts/PoetsenOne-Regular.ttf");
/// Fallback avatar, embedded and also served at /assets/default_pfp.jpg
const DEFAULT_PFP_IMAGE: &[u8] = include_bytes!("../assets/default_pfp.jpg");
/// Damus logo served at /assets/logo_icon.png
const DAMUS_LOGO_ICON: &[u8] = include_bytes!("../assets/logo_icon.png");

/// Minimum interval between background profile feed refreshes for the same pubkey
const PROFILE_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// Minimum interval between background note secondary fetches (unknowns, stats, replies)
const NOTE_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60);

/// Prune refresh tracking maps when they exceed this size (~40KB max memory each)
const REFRESH_MAP_PRUNE_THRESHOLD: usize = 1000;
     55 
     56 /// Tracks the state of a background refresh (used for both profiles and notes)
     57 enum RefreshState {
     58     /// Refresh currently in progress with handle to abort if stuck
     59     InProgress {
     60         started: Instant,
     61         handle: AbortHandle,
     62     },
     63     /// Last successful refresh completed at this time
     64     Completed(Instant),
     65 }
     66 
/// Shared application state, cloned into each connection task.
#[derive(Clone)]
pub struct Notecrumbs {
    /// Local nostrdb instance acting as the note/profile cache
    pub ndb: Ndb,
    /// Keys generated at startup (currently unused; leading underscore)
    _keys: Keys,
    /// Shared relay connection pool for fetching missing data
    relay_pool: Arc<RelayPool>,
    /// Font used when rendering note preview images
    font_data: egui::FontData,
    /// Fallback avatar image, decoded from DEFAULT_PFP_IMAGE at startup
    default_pfp: egui::ImageData,
    /// Gradient background image for rendered cards
    background: egui::ImageData,
    /// Renders the /metrics endpoint in Prometheus text format
    prometheus_handle: PrometheusHandle,

    /// How long do we wait for remote note requests
    _timeout: Duration,

    /// Tracks refresh state per pubkey - prevents excessive relay queries and concurrent fetches
    profile_refresh_state: Arc<DashMap<[u8; 32], RefreshState>>,

    /// Tracks refresh state per note id - debounces background fetches (unknowns, stats, replies)
    note_refresh_state: Arc<DashMap<[u8; 32], RefreshState>>,

    /// Inflight fetches - deduplicates concurrent relay queries for the same resource.
    /// Keyed by nip19 debounce key. Waiters clone the watch::Receiver and wait for
    /// the fetcher to signal completion. Uses watch instead of Notify to avoid
    /// missed-notification races.
    inflight: Arc<DashMap<[u8; 32], watch::Receiver<bool>>>,
}
     92 
     93 #[inline]
     94 pub fn floor_char_boundary(s: &str, index: usize) -> usize {
     95     if index >= s.len() {
     96         s.len()
     97     } else {
     98         let lower_bound = index.saturating_sub(3);
     99         let new_index = s.as_bytes()[lower_bound..=index]
    100             .iter()
    101             .rposition(|b| is_utf8_char_boundary(*b));
    102 
    103         // SAFETY: we know that the character boundary will be within four bytes
    104         unsafe { lower_bound + new_index.unwrap_unchecked() }
    105     }
    106 }
    107 
    108 #[inline]
    109 fn is_utf8_char_boundary(c: u8) -> bool {
    110     // This is bit magic equivalent to: b < 128 || b >= 192
    111     (c as i8) >= -0x40
    112 }
    113 
    114 /// Derive a 32-byte debounce key from any nip19 reference.
    115 /// Used to deduplicate relay fetches across concurrent and repeated requests.
    116 fn nip19_debounce_key(nip19: &Nip19) -> [u8; 32] {
    117     use std::hash::{Hash, Hasher};
    118     match nip19 {
    119         Nip19::Event(ev) => *ev.event_id.as_bytes(),
    120         Nip19::EventId(id) => *id.as_bytes(),
    121         Nip19::Pubkey(pk) => pk.to_bytes(),
    122         Nip19::Profile(p) => p.public_key.to_bytes(),
    123         Nip19::Coordinate(coord) => {
    124             // Hash the address components into a stable 32-byte key
    125             let mut hasher = std::collections::hash_map::DefaultHasher::new();
    126             coord.coordinate.public_key.to_bytes().hash(&mut hasher);
    127             coord.coordinate.kind.as_u16().hash(&mut hasher);
    128             coord.coordinate.identifier.hash(&mut hasher);
    129             let h = hasher.finish().to_le_bytes();
    130             let mut key = [0u8; 32];
    131             // Repeat the 8-byte hash to fill 32 bytes
    132             key[..8].copy_from_slice(&h);
    133             key[8..16].copy_from_slice(&h);
    134             key[16..24].copy_from_slice(&h);
    135             key[24..32].copy_from_slice(&h);
    136             key
    137         }
    138         Nip19::Secret(_) => [0u8; 32], // shouldn't happen, rejected earlier
    139     }
    140 }
    141 
/// Try to spawn a debounced background task. Returns true if the task was spawned.
///
/// Uses the refresh state map to prevent concurrent and rapid-fire fetches for the
/// same key. Tasks that are stuck (>10 min) are aborted and retried.
///
/// `task` is handed a clone of the state map plus the key so the spawned
/// future can record its own completion (or remove itself on failure).
///
/// NOTE(review): `task` is invoked while this key's map entry is held.
/// That appears intentional — the spawned future's own writes to the same
/// key should then be serialized after the `InProgress` insert below, so a
/// fast-completing task can't have its `Completed` state clobbered.
/// Confirm against DashMap's shard-locking documentation.
fn try_spawn_debounced<F>(
    state_map: &Arc<DashMap<[u8; 32], RefreshState>>,
    key: [u8; 32],
    interval: Duration,
    task: F,
) -> bool
where
    F: FnOnce(Arc<DashMap<[u8; 32], RefreshState>>, [u8; 32]) -> tokio::task::JoinHandle<()>,
{
    use dashmap::mapref::entry::Entry;

    let now = Instant::now();

    // Prune stale entries to bound memory
    if state_map.len() > REFRESH_MAP_PRUNE_THRESHOLD {
        state_map.retain(|_, state| match state {
            RefreshState::InProgress { .. } => true,
            RefreshState::Completed(t) => now.duration_since(*t) < interval,
        });
    }

    match state_map.entry(key) {
        Entry::Occupied(mut occupied) => {
            let should_refresh = match occupied.get() {
                // Already refreshing - skip unless stuck (>10 min)
                RefreshState::InProgress { started, .. }
                    if now.duration_since(*started) < Duration::from_secs(10 * 60) =>
                {
                    false
                }
                // Recently completed - skip
                RefreshState::Completed(t) if now.duration_since(*t) < interval => false,
                // Stuck fetch - abort and restart
                RefreshState::InProgress { handle, .. } => {
                    handle.abort();
                    true
                }
                // Stale completion - refresh
                RefreshState::Completed(_) => true,
            };

            if should_refresh {
                let handle = task(state_map.clone(), key);
                occupied.insert(RefreshState::InProgress {
                    started: now,
                    handle: handle.abort_handle(),
                });
                true
            } else {
                false
            }
        }
        Entry::Vacant(vacant) => {
            let handle = task(state_map.clone(), key);
            vacant.insert(RefreshState::InProgress {
                started: now,
                handle: handle.abort_handle(),
            });
            true
        }
    }
}
    208 
    209 /// Deduplicates concurrent async work for the same key.
    210 ///
    211 /// Returns `true` if this call was the "fetcher" (ran the work),
    212 /// `false` if it was a "waiter" (another call was already in progress).
    213 ///
    214 /// Uses `DashMap::entry()` for atomic check-and-insert (no TOCTOU race)
    215 /// and `watch` channels so waiters can't miss the completion signal.
    216 async fn run_inflight_deduplicated<F, Fut>(
    217     inflight: &DashMap<[u8; 32], watch::Receiver<bool>>,
    218     key: [u8; 32],
    219     work: F,
    220 ) -> bool
    221 where
    222     F: FnOnce() -> Fut,
    223     Fut: std::future::Future<Output = ()>,
    224 {
    225     use dashmap::mapref::entry::Entry;
    226 
    227     match inflight.entry(key) {
    228         Entry::Occupied(entry) => {
    229             // Another request is already fetching — clone receiver then
    230             // release the shard lock before awaiting
    231             let mut rx = entry.get().clone();
    232             drop(entry);
    233             // wait_for checks the current value first, so even if the
    234             // fetcher already completed we won't miss it
    235             let _ = rx.wait_for(|&done| done).await;
    236             false
    237         }
    238         Entry::Vacant(entry) => {
    239             // We're the first — insert a watch receiver so waiters can find it
    240             let (tx, rx) = watch::channel(false);
    241             entry.insert(rx);
    242 
    243             work().await;
    244 
    245             // Clean up and signal all waiters
    246             inflight.remove(&key);
    247             let _ = tx.send(true);
    248             true
    249         }
    250     }
    251 }
    252 
/// Fetch missing render data from relays, deduplicating concurrent requests
/// for the same nip19 so only one relay query fires at a time.
///
/// The winning "fetcher" fills `render_data` in place via `complete()`.
/// Waiters instead block until the fetcher finishes, then reload render
/// data from ndb, which the fetcher's ingestion should have updated.
async fn fetch_if_missing(
    ndb: &Ndb,
    relay_pool: &Arc<RelayPool>,
    inflight: &DashMap<[u8; 32], watch::Receiver<bool>>,
    render_data: &mut RenderData,
    nip19: &Nip19,
) {
    let key = nip19_debounce_key(nip19);

    let was_fetcher = run_inflight_deduplicated(inflight, key, || {
        // Clone owned handles for the async block; reborrow render_data
        // so the block captures the &mut rather than moving the reference
        let ndb = ndb.clone();
        let relay_pool = relay_pool.clone();
        let nip19 = nip19.clone();
        let render_data_ref = &mut *render_data;
        async move {
            if let Err(err) = render_data_ref
                .complete(ndb, relay_pool, nip19)
                .await
            {
                error!("Error fetching completion data: {err}");
            }
        }
    })
    .await;

    if !was_fetcher {
        // We were a waiter — re-check ndb for updated data
        let txn = match Transaction::new(ndb) {
            Ok(txn) => txn,
            Err(err) => {
                error!("failed to open transaction after inflight wait: {err}");
                return;
            }
        };
        if let Ok(new_rd) = render::get_render_data(ndb, &txn, nip19) {
            *render_data = new_rd;
        }
    }
}
    294 
/// Spawn a debounced background task to fetch secondary note data
/// (unknowns, stats, reply profiles). Skips if a fetch already ran
/// recently for this nip19 resource.
///
/// Fire-and-forget: the caller does not wait on the spawned task; results
/// are ingested into ndb and picked up by subsequent requests.
fn spawn_note_secondary_fetch(
    ndb: &Ndb,
    relay_pool: &Arc<RelayPool>,
    note_refresh_state: &Arc<DashMap<[u8; 32], RefreshState>>,
    nip19: &Nip19,
    note_rd: &render::NoteAndProfileRenderData,
) {
    // Clone everything the spawned 'static task needs
    let ndb = ndb.clone();
    let relay_pool = relay_pool.clone();
    let note_rd_bg = note_rd.note_rd.clone();
    let source_relays = note_rd.source_relays.clone();

    try_spawn_debounced(
        note_refresh_state,
        nip19_debounce_key(nip19),
        NOTE_REFRESH_INTERVAL,
        |state_map, key| {
            tokio::spawn(async move {
                if let Err(err) =
                    fetch_note_secondary_data(&relay_pool, &ndb, &note_rd_bg, &source_relays).await
                {
                    tracing::warn!("background note secondary fetch failed: {err}");
                    // Drop the InProgress entry so the next request retries
                    state_map.remove(&key);
                    return;
                }
                // Record success so we debounce for NOTE_REFRESH_INTERVAL
                state_map.insert(key, RefreshState::Completed(Instant::now()));
            })
        },
    );
}
    328 
/// Ensure profile feed data is available, fetching from relays if needed.
/// Uses debounced background refresh when cached data exists.
///
/// With cached kind-1 notes we render immediately and refresh in the
/// background; with no cache we must block on the relay fetch, otherwise
/// the profile page would render with an empty feed.
async fn ensure_profile_feed(
    ndb: &Ndb,
    relay_pool: &Arc<RelayPool>,
    inflight: &DashMap<[u8; 32], watch::Receiver<bool>>,
    profile_refresh_state: &Arc<DashMap<[u8; 32], RefreshState>>,
    profile_opt: &Option<ProfileRenderData>,
) -> Result<(), Error> {
    // Resolve the pubkey whose feed we need, if any
    let maybe_pubkey = {
        let txn = Transaction::new(ndb)?;
        match profile_opt {
            Some(ProfileRenderData::Profile(profile_key)) => {
                // Cached profile record: recover the pubkey via its note
                if let Ok(profile_rec) = ndb.get_profile_by_key(&txn, *profile_key) {
                    let note_key = NoteKey::new(profile_rec.record().note_key());
                    ndb.get_note_by_key(&txn, note_key)
                        .ok()
                        .map(|note| *note.pubkey())
                } else {
                    None
                }
            }
            Some(ProfileRenderData::Missing(pk)) => Some(*pk),
            None => None,
        }
    };

    let Some(pubkey) = maybe_pubkey else {
        return Ok(());
    };

    // Do we already have at least one kind-1 (text) note by this author?
    let has_cached_notes = {
        let txn = Transaction::new(ndb)?;
        let notes_filter = Filter::new().authors([&pubkey]).kinds([1]).limit(1).build();
        ndb.query(&txn, &[notes_filter], 1)
            .map(|results| !results.is_empty())
            .unwrap_or(false)
    };

    let pool = relay_pool.clone();
    let ndb = ndb.clone();

    if has_cached_notes {
        // Serve the cached data now; refresh in the background at most
        // once per PROFILE_REFRESH_INTERVAL
        try_spawn_debounced(
            profile_refresh_state,
            pubkey,
            PROFILE_REFRESH_INTERVAL,
            |state_map, key| {
                tokio::spawn(async move {
                    match render::fetch_profile_feed(pool, ndb, key).await {
                        Ok(()) => {
                            state_map.insert(key, RefreshState::Completed(Instant::now()));
                        }
                        Err(err) => {
                            error!("Background profile feed refresh failed: {err}");
                            // Remove so the next request can retry immediately
                            state_map.remove(&key);
                        }
                    }
                })
            },
        );
    } else {
        // No cached data: must wait for relay fetch before rendering.
        // Use inflight dedup so concurrent requests for the same profile
        // don't each fire their own relay queries.
        run_inflight_deduplicated(inflight, pubkey, || async move {
            if let Err(err) = render::fetch_profile_feed(pool, ndb, pubkey).await {
                error!("Error fetching profile feed: {err}");
            }
        })
        .await;
    }

    Ok(())
}
    404 
/// Background task: fetch all secondary data for a note (unknowns, stats, reply profiles).
///
/// Errors from the first two stages abort the fetch and propagate to the
/// caller; failure to fetch reply-author profiles is only logged, since
/// the note itself can still render without them.
async fn fetch_note_secondary_data(
    relay_pool: &Arc<RelayPool>,
    ndb: &Ndb,
    note_rd: &render::NoteRenderData,
    source_relays: &[nostr::RelayUrl],
) -> crate::error::Result<()> {
    // Fetch unknowns (author, mentions, quotes, reply chain)
    if let Some(unknowns) = render::collect_note_unknowns(ndb, note_rd) {
        tracing::debug!("fetching {} unknowns", unknowns.ids_len());
        render::fetch_unknowns(relay_pool, ndb, unknowns).await?;
    }

    // Fetch note stats (reactions, replies, reposts)
    render::fetch_note_stats(relay_pool, ndb, note_rd, source_relays).await?;

    // Fetch profiles for reply authors (now that replies are ingested)
    if let Some(reply_unknowns) = render::collect_reply_unknowns(ndb, note_rd) {
        tracing::debug!(
            "fetching {} reply author profiles",
            reply_unknowns.ids_len()
        );
        if let Err(err) = render::fetch_unknowns(relay_pool, ndb, reply_unknowns).await {
            tracing::warn!("failed to fetch reply author profiles: {err}");
        }
    }

    Ok(())
}
    434 
/// Top-level HTTP request handler.
///
/// Routes: /metrics (Prometheus), fixed static assets, the homepage,
/// robots.txt and sitemap.xml, and — for any other path — a bech32 nip19
/// reference rendered as HTML, as a PNG open-graph image (`.png` suffix),
/// or as JSON (`.json` suffix).
async fn serve(
    app: &Notecrumbs,
    r: Request<hyper::body::Incoming>,
) -> Result<Response<Full<Bytes>>, Error> {
    // Prometheus metrics endpoint
    if r.uri().path() == "/metrics" {
        let body = app.prometheus_handle.render();
        return Ok(Response::builder()
            .status(StatusCode::OK)
            .header(header::CONTENT_TYPE, "text/plain; version=0.0.4")
            .body(Full::new(Bytes::from(body)))?);
    }

    // Fixed routes: static assets, homepage, robots, sitemap
    match r.uri().path() {
        "/damus.css" => {
            return Ok(Response::builder()
                .status(StatusCode::OK)
                .header(header::CONTENT_TYPE, "text/css; charset=utf-8")
                .body(Full::new(Bytes::from_static(FRONTEND_CSS.as_bytes())))?);
        }
        "/fonts/PoetsenOne-Regular.ttf" => {
            return Ok(Response::builder()
                .status(StatusCode::OK)
                .header(header::CONTENT_TYPE, "font/ttf")
                .header(header::CACHE_CONTROL, "public, max-age=604800, immutable")
                .body(Full::new(Bytes::from_static(POETSEN_FONT)))?);
        }
        "/assets/default_pfp.jpg" => {
            return Ok(Response::builder()
                .status(StatusCode::OK)
                .header(header::CONTENT_TYPE, "image/jpeg")
                .header(header::CACHE_CONTROL, "public, max-age=604800")
                .body(Full::new(Bytes::from_static(DEFAULT_PFP_IMAGE)))?);
        }
        "/assets/logo_icon.png" => {
            return Ok(Response::builder()
                .status(StatusCode::OK)
                .header(header::CONTENT_TYPE, "image/png")
                .header(header::CACHE_CONTROL, "public, max-age=604800, immutable")
                .body(Full::new(Bytes::from_static(DAMUS_LOGO_ICON)))?);
        }
        "/" => {
            return html::serve_homepage(r);
        }
        "/robots.txt" => {
            let body = sitemap::generate_robots_txt();
            return Ok(Response::builder()
                .status(StatusCode::OK)
                .header(header::CONTENT_TYPE, "text/plain; charset=utf-8")
                .header(header::CACHE_CONTROL, "public, max-age=86400")
                .body(Full::new(Bytes::from(body)))?);
        }
        "/sitemap.xml" => match sitemap::generate_sitemap(&app.ndb) {
            Ok(xml) => {
                return Ok(Response::builder()
                    .status(StatusCode::OK)
                    .header(header::CONTENT_TYPE, "application/xml; charset=utf-8")
                    .header(header::CACHE_CONTROL, "public, max-age=3600")
                    .body(Full::new(Bytes::from(xml)))?);
            }
            Err(err) => {
                error!("Failed to generate sitemap: {err}");
                return Ok(Response::builder()
                    .status(StatusCode::INTERNAL_SERVER_ERROR)
                    .body(Full::new(Bytes::from("Failed to generate sitemap\n")))?);
            }
        },
        _ => {}
    }

    // Anything else is treated as /<bech32>[.png|.json]
    let is_png = r.uri().path().ends_with(".png");
    let is_json = r.uri().path().ends_with(".json");
    // Number of suffix bytes to strip before bech32 decoding
    let until = if is_png {
        4
    } else if is_json {
        5
    } else {
        0
    };

    // Strip the leading '/' and the optional extension, then decode
    let path_len = r.uri().path().len();
    let nip19 = match Nip19::from_bech32(&r.uri().path()[1..path_len - until]) {
        Ok(nip19) => nip19,
        Err(_) => {
            return Ok(Response::builder()
                .status(StatusCode::NOT_FOUND)
                .body(Full::new(Bytes::from("Invalid url\n")))?);
        }
    };

    // render_data is always returned, it just might be empty
    let mut render_data = {
        let txn = Transaction::new(&app.ndb)?;
        match render::get_render_data(&app.ndb, &txn, &nip19) {
            Err(_err) => {
                // Only nsec references error here; never serve them
                return Ok(Response::builder()
                    .status(StatusCode::BAD_REQUEST)
                    .body(Full::new(Bytes::from(
                        "nsecs are not supported, what were you thinking!?\n",
                    )))?);
            }
            Ok(render_data) => render_data,
        }
    };

    // Fetch missing note/profile data from relays (deduplicated across concurrent requests)
    if !render_data.is_complete() {
        fetch_if_missing(
            &app.ndb,
            &app.relay_pool,
            &app.inflight,
            &mut render_data,
            &nip19,
        )
        .await;
    }

    // Spawn debounced background fetch for secondary note data (unknowns, stats, replies)
    if let RenderData::Note(note_rd) = &render_data {
        spawn_note_secondary_fetch(
            &app.ndb,
            &app.relay_pool,
            &app.note_refresh_state,
            &nip19,
            note_rd,
        );
    }

    // Ensure profile feed data is available (debounced background refresh or blocking fetch)
    if let RenderData::Profile(profile_opt) = &render_data {
        ensure_profile_feed(
            &app.ndb,
            &app.relay_pool,
            &app.inflight,
            &app.profile_refresh_state,
            profile_opt,
        )
        .await?;
    }

    // Pick the output format based on the requested extension
    if is_png {
        let data = render::render_note(app, &render_data);

        Ok(Response::builder()
            .header(header::CONTENT_TYPE, "image/png")
            .status(StatusCode::OK)
            .body(Full::new(Bytes::from(data)))?)
    } else if is_json {
        match render_data {
            RenderData::Note(note_rd) => html::serve_note_json(&app.ndb, &note_rd),
            RenderData::Profile(_profile_rd) => Ok(Response::builder()
                .status(StatusCode::NOT_FOUND)
                .body(Full::new(Bytes::from("todo: profile json")))?),
        }
    } else {
        match render_data {
            RenderData::Note(note_rd) => html::serve_note_html(app, &nip19, &note_rd, r),
            RenderData::Profile(profile_rd) => {
                html::serve_profile_html(app, &nip19, profile_rd.as_ref(), r)
            }
        }
    }
}
    597 
    598 fn get_env_timeout() -> Duration {
    599     let timeout_env = std::env::var("TIMEOUT_MS").unwrap_or("2000".to_string());
    600     let timeout_ms: u64 = timeout_env.parse().unwrap_or(2000);
    601     Duration::from_millis(timeout_ms)
    602 }
    603 
    604 fn get_gradient() -> egui::ColorImage {
    605     use egui::{Color32, ColorImage};
    606     //use egui::pos2;
    607     use gradient::Gradient;
    608 
    609     //let gradient = Gradient::linear(Color32::LIGHT_GRAY, Color32::DARK_GRAY);
    610     //let size = pfp::PFP_SIZE as usize;
    611     //let radius = (pfp::PFP_SIZE as f32) / 2.0;
    612     //let center = pos2(radius, radius);
    613 
    614     let scol = [0x1C, 0x55, 0xFF];
    615     //let ecol = [0xFA, 0x0D, 0xD4];
    616     let mcol = [0x7F, 0x35, 0xAB];
    617     //let ecol = [0xFF, 0x0B, 0xD6];
    618     let ecol = [0xC0, 0x2A, 0xBE];
    619 
    620     // TODO: skia has r/b colors swapped for some reason, fix this
    621     let start_color = Color32::from_rgb(scol[2], scol[1], scol[0]);
    622     let mid_color = Color32::from_rgb(mcol[2], mcol[1], mcol[0]);
    623     let end_color = Color32::from_rgb(ecol[2], ecol[1], ecol[0]);
    624 
    625     let gradient = Gradient::linear_many(vec![start_color, mid_color, end_color]);
    626     let pixels = gradient.to_pixel_row();
    627     let width = pixels.len();
    628     let height = 1;
    629 
    630     ColorImage {
    631         size: [width, height],
    632         pixels,
    633     }
    634 }
    635 
/// Decode the embedded default profile picture and run it through the
/// standard pfp processing pipeline (`pfp::process_pfp_bitmap`).
///
/// Panics only if the compiled-in JPEG bytes are corrupt, which would be
/// a build-time defect rather than a runtime condition.
fn get_default_pfp() -> egui::ColorImage {
    let mut dyn_image =
        ::image::load_from_memory(DEFAULT_PFP_IMAGE).expect("failed to load embedded default pfp");
    pfp::process_pfp_bitmap(&mut dyn_image)
}
    641 
    642 #[tokio::main]
    643 async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    644     use tracing_subscriber;
    645 
    646     tracing_subscriber::fmt::init();
    647 
    648     let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
    649 
    650     // We create a TcpListener and bind it to 127.0.0.1:3000
    651     let listener = TcpListener::bind(addr).await?;
    652     info!("Listening on 0.0.0.0:3000");
    653 
    654     let cfg = Config::new();
    655     let ndb = Ndb::new(".", &cfg).expect("ndb failed to open");
    656     let keys = Keys::generate();
    657     let timeout = get_env_timeout();
    658     let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new()
    659         .install_recorder()
    660         .expect("install prometheus recorder");
    661     let relay_pool = Arc::new(
    662         RelayPool::new(
    663             keys.clone(),
    664             &["wss://relay.damus.io", "wss://nostr.wine", "wss://nos.lol"],
    665         )
    666         .await?,
    667     );
    668     spawn_relay_pool_metrics_logger(relay_pool.clone());
    669     let default_pfp = egui::ImageData::Color(Arc::new(get_default_pfp()));
    670     let background = egui::ImageData::Color(Arc::new(get_gradient()));
    671     let font_data = egui::FontData::from_static(include_bytes!("../fonts/NotoSans-Regular.ttf"));
    672 
    673     let app = Notecrumbs {
    674         ndb,
    675         _keys: keys,
    676         relay_pool,
    677         _timeout: timeout,
    678         background,
    679         font_data,
    680         default_pfp,
    681         prometheus_handle,
    682         profile_refresh_state: Arc::new(DashMap::new()),
    683         note_refresh_state: Arc::new(DashMap::new()),
    684         inflight: Arc::new(DashMap::new()),
    685     };
    686 
    687     // We start a loop to continuously accept incoming connections
    688     loop {
    689         let (stream, _) = listener.accept().await?;
    690 
    691         // Use an adapter to access something implementing `tokio::io` traits as if they implement
    692         // `hyper::rt` IO traits.
    693         let io = TokioIo::new(stream);
    694 
    695         let app_copy = app.clone();
    696 
    697         // Spawn a tokio task to serve multiple connections concurrently
    698         tokio::task::spawn(async move {
    699             // Finally, we bind the incoming connection to our `hello` service
    700             if let Err(err) = http1::Builder::new()
    701                 // `service_fn` converts our function in a `Service`
    702                 .serve_connection(io, service_fn(|req| serve(&app_copy, req)))
    703                 .await
    704             {
    705                 println!("Error serving connection: {:?}", err);
    706             }
    707         });
    708     }
    709 }
    710 
/// Spawn a detached background task that, once per minute, exports a
/// relay-count gauge and logs a relay pool statistics snapshot.
fn spawn_relay_pool_metrics_logger(pool: Arc<RelayPool>) {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(std::time::Duration::from_secs(60));
        loop {
            // First tick completes immediately, then every 60s
            ticker.tick().await;
            let (stats, tracked) = pool.relay_stats().await;
            metrics::gauge!("relay_pool_known_relays", tracked as f64);
            info!(
                total_relays = tracked,
                ensure_calls = stats.ensure_calls,
                relays_added = stats.relays_added,
                connect_successes = stats.connect_successes,
                connect_failures = stats.connect_failures,
                "relay pool metrics snapshot"
            );
        }
    });
}
    729 
    730 #[cfg(test)]
    731 mod tests {
    732     use super::*;
    733     use nostr::nips::nip19::{Nip19Coordinate, Nip19Profile};
    734     use std::sync::atomic::{AtomicUsize, Ordering};
    735 
    /// Helper: create a fresh DashMap wrapped in Arc for testing
    fn new_state_map() -> Arc<DashMap<[u8; 32], RefreshState>> {
        Arc::new(DashMap::new())
    }

    /// Helper: a test key (arbitrary 32 bytes, all set to `byte`)
    fn test_key(byte: u8) -> [u8; 32] {
        [byte; 32]
    }

    /// Helper: spawn a no-op task that completes immediately, tracking call count
    ///
    /// The counter increments when the closure itself is invoked (i.e. when
    /// `try_spawn_debounced` actually spawns), letting tests assert on
    /// debounce behavior.
    fn counting_task(
        counter: Arc<AtomicUsize>,
    ) -> impl FnOnce(Arc<DashMap<[u8; 32], RefreshState>>, [u8; 32]) -> tokio::task::JoinHandle<()>
    {
        move |state_map, key| {
            counter.fetch_add(1, Ordering::SeqCst);
            tokio::spawn(async move {
                // Mimic a successful refresh by recording completion
                state_map.insert(key, RefreshState::Completed(Instant::now()));
            })
        }
    }
    758 
    759     // ---------------------------------------------------------------
    760     // nip19_debounce_key tests
    761     // ---------------------------------------------------------------
    762 
    763     #[test]
    764     fn debounce_key_event_uses_event_id() {
    765         let event_id = EventId::all_zeros();
    766         let nip19 = Nip19::EventId(event_id);
    767         assert_eq!(nip19_debounce_key(&nip19), *event_id.as_bytes());
    768     }
    769 
    770     #[test]
    771     fn debounce_key_pubkey_uses_pubkey_bytes() {
    772         let keys = Keys::generate();
    773         let pk = keys.public_key();
    774         let nip19 = Nip19::Pubkey(pk);
    775         assert_eq!(nip19_debounce_key(&nip19), pk.to_bytes());
    776     }
    777 
    778     #[test]
    779     fn debounce_key_profile_uses_pubkey_bytes() {
    780         let keys = Keys::generate();
    781         let pk = keys.public_key();
    782         let nip19 = Nip19::Profile(Nip19Profile::new(pk, []));
    783         assert_eq!(nip19_debounce_key(&nip19), pk.to_bytes());
    784     }
    785 
    786     #[test]
    787     fn debounce_key_coordinate_is_deterministic() {
    788         use nostr::nips::nip01::Coordinate;
    789         let keys = Keys::generate();
    790         let coord = Coordinate::new(Kind::LongFormTextNote, keys.public_key())
    791             .identifier("test-article");
    792         let nip19 = Nip19::Coordinate(Nip19Coordinate::new(coord, []));
    793         let key1 = nip19_debounce_key(&nip19);
    794         let key2 = nip19_debounce_key(&nip19);
    795         assert_eq!(key1, key2);
    796     }
    797 
    798     #[test]
    799     fn debounce_key_different_coordinates_differ() {
    800         use nostr::nips::nip01::Coordinate;
    801         let keys = Keys::generate();
    802         let coord_a = Coordinate::new(Kind::LongFormTextNote, keys.public_key())
    803             .identifier("article-a");
    804         let coord_b = Coordinate::new(Kind::LongFormTextNote, keys.public_key())
    805             .identifier("article-b");
    806         let nip19_a = Nip19::Coordinate(Nip19Coordinate::new(coord_a, []));
    807         let nip19_b = Nip19::Coordinate(Nip19Coordinate::new(coord_b, []));
    808         assert_ne!(nip19_debounce_key(&nip19_a), nip19_debounce_key(&nip19_b));
    809     }
    810 
    811     // ---------------------------------------------------------------
    812     // try_spawn_debounced tests
    813     // ---------------------------------------------------------------
    814 
    815     #[tokio::test]
    816     async fn debounce_spawns_on_first_call() {
    817         let state = new_state_map();
    818         let counter = Arc::new(AtomicUsize::new(0));
    819         let key = test_key(1);
    820 
    821         let spawned = try_spawn_debounced(&state, key, Duration::from_secs(300), counting_task(counter.clone()));
    822 
    823         assert!(spawned);
    824         assert_eq!(counter.load(Ordering::SeqCst), 1);
    825         // State should show InProgress (task may have completed already, but the
    826         // entry was set before the task ran)
    827         assert!(state.contains_key(&key));
    828     }
    829 
    830     #[tokio::test]
    831     async fn debounce_skips_while_in_progress() {
    832         let state = new_state_map();
    833         let key = test_key(2);
    834 
    835         // Insert a fake InProgress entry
    836         state.insert(
    837             key,
    838             RefreshState::InProgress {
    839                 started: Instant::now(),
    840                 handle: tokio::spawn(async {}).abort_handle(),
    841             },
    842         );
    843 
    844         let counter = Arc::new(AtomicUsize::new(0));
    845         let spawned = try_spawn_debounced(&state, key, Duration::from_secs(300), counting_task(counter.clone()));
    846 
    847         assert!(!spawned);
    848         assert_eq!(counter.load(Ordering::SeqCst), 0);
    849     }
    850 
    851     #[tokio::test]
    852     async fn debounce_skips_recently_completed() {
    853         let state = new_state_map();
    854         let key = test_key(3);
    855 
    856         // Insert a Completed entry from just now
    857         state.insert(key, RefreshState::Completed(Instant::now()));
    858 
    859         let counter = Arc::new(AtomicUsize::new(0));
    860         let spawned = try_spawn_debounced(&state, key, Duration::from_secs(300), counting_task(counter.clone()));
    861 
    862         assert!(!spawned);
    863         assert_eq!(counter.load(Ordering::SeqCst), 0);
    864     }
    865 
    866     #[tokio::test]
    867     async fn debounce_refreshes_after_interval_expires() {
    868         let state = new_state_map();
    869         let key = test_key(4);
    870 
    871         // Insert a Completed entry from "long ago" (past the interval)
    872         let old_time = Instant::now() - Duration::from_secs(600);
    873         state.insert(key, RefreshState::Completed(old_time));
    874 
    875         let counter = Arc::new(AtomicUsize::new(0));
    876         let spawned = try_spawn_debounced(&state, key, Duration::from_secs(300), counting_task(counter.clone()));
    877 
    878         assert!(spawned);
    879         assert_eq!(counter.load(Ordering::SeqCst), 1);
    880     }
    881 
    882     #[tokio::test]
    883     async fn debounce_aborts_stuck_task_and_retries() {
    884         let state = new_state_map();
    885         let key = test_key(5);
    886 
    887         // Insert InProgress from >10 minutes ago (stuck)
    888         let stuck_time = Instant::now() - Duration::from_secs(11 * 60);
    889         let stuck_handle = tokio::spawn(async { std::future::pending::<()>().await });
    890         let abort_handle = stuck_handle.abort_handle();
    891         state.insert(
    892             key,
    893             RefreshState::InProgress {
    894                 started: stuck_time,
    895                 handle: abort_handle.clone(),
    896             },
    897         );
    898 
    899         let counter = Arc::new(AtomicUsize::new(0));
    900         let spawned = try_spawn_debounced(&state, key, Duration::from_secs(300), counting_task(counter.clone()));
    901 
    902         assert!(spawned, "should retry after stuck task");
    903         assert_eq!(counter.load(Ordering::SeqCst), 1);
    904         // The old task should have been aborted — yield to let the runtime process it
    905         tokio::task::yield_now().await;
    906         assert!(stuck_handle.is_finished());
    907     }
    908 
    909     #[tokio::test]
    910     async fn debounce_does_not_abort_recent_in_progress() {
    911         let state = new_state_map();
    912         let key = test_key(6);
    913 
    914         // Insert InProgress from just now (not stuck)
    915         let handle = tokio::spawn(async { std::future::pending::<()>().await });
    916         state.insert(
    917             key,
    918             RefreshState::InProgress {
    919                 started: Instant::now(),
    920                 handle: handle.abort_handle(),
    921             },
    922         );
    923 
    924         let counter = Arc::new(AtomicUsize::new(0));
    925         let spawned = try_spawn_debounced(&state, key, Duration::from_secs(300), counting_task(counter.clone()));
    926 
    927         assert!(!spawned);
    928         assert_eq!(counter.load(Ordering::SeqCst), 0);
    929         // The original task should NOT have been aborted
    930         assert!(!handle.is_finished());
    931         handle.abort(); // cleanup
    932     }
    933 
    934     #[tokio::test]
    935     async fn debounce_prunes_stale_entries_over_threshold() {
    936         let state = new_state_map();
    937         let old_time = Instant::now() - Duration::from_secs(600);
    938         let interval = Duration::from_secs(300);
    939 
    940         // Fill the map past the threshold with stale Completed entries
    941         for i in 0..(REFRESH_MAP_PRUNE_THRESHOLD + 50) {
    942             let mut key = [0u8; 32];
    943             key[0] = (i & 0xFF) as u8;
    944             key[1] = ((i >> 8) & 0xFF) as u8;
    945             state.insert(key, RefreshState::Completed(old_time));
    946         }
    947 
    948         assert!(state.len() > REFRESH_MAP_PRUNE_THRESHOLD);
    949 
    950         // The next call should trigger pruning
    951         let key = test_key(0xFF);
    952         let counter = Arc::new(AtomicUsize::new(0));
    953         try_spawn_debounced(&state, key, interval, counting_task(counter.clone()));
    954 
    955         // Stale entries should have been pruned (only the new one + any InProgress remain)
    956         assert!(
    957             state.len() < REFRESH_MAP_PRUNE_THRESHOLD,
    958             "state map should have been pruned, but has {} entries",
    959             state.len()
    960         );
    961     }
    962 
    963     #[tokio::test]
    964     async fn debounce_independent_keys_both_spawn() {
    965         let state = new_state_map();
    966         let key_a = test_key(0xAA);
    967         let key_b = test_key(0xBB);
    968 
    969         let counter = Arc::new(AtomicUsize::new(0));
    970         let spawned_a = try_spawn_debounced(&state, key_a, Duration::from_secs(300), counting_task(counter.clone()));
    971         let spawned_b = try_spawn_debounced(&state, key_b, Duration::from_secs(300), counting_task(counter.clone()));
    972 
    973         assert!(spawned_a);
    974         assert!(spawned_b);
    975         assert_eq!(counter.load(Ordering::SeqCst), 2);
    976     }
    977 
    978     // ---------------------------------------------------------------
    979     // run_inflight_deduplicated tests
    980     // ---------------------------------------------------------------
    981 
    982     #[tokio::test]
    983     async fn inflight_first_caller_runs_work_and_returns_true() {
    984         let inflight: Arc<DashMap<[u8; 32], watch::Receiver<bool>>> = Arc::new(DashMap::new());
    985         let key = test_key(0xCC);
    986         let work_count = Arc::new(AtomicUsize::new(0));
    987 
    988         let wc = work_count.clone();
    989         let was_fetcher = run_inflight_deduplicated(&inflight, key, || async move {
    990             wc.fetch_add(1, Ordering::SeqCst);
    991         })
    992         .await;
    993 
    994         assert!(was_fetcher);
    995         assert_eq!(work_count.load(Ordering::SeqCst), 1);
    996     }
    997 
    998     #[tokio::test]
    999     async fn inflight_concurrent_callers_only_run_work_once() {
   1000         let inflight: Arc<DashMap<[u8; 32], watch::Receiver<bool>>> = Arc::new(DashMap::new());
   1001         let key = test_key(0xDD);
   1002         let work_count = Arc::new(AtomicUsize::new(0));
   1003 
   1004         // Use a channel to control when the fetcher's work completes,
   1005         // so we can launch waiters while it's in progress
   1006         let (tx, rx) = tokio::sync::oneshot::channel::<()>();
   1007 
   1008         // Spawn the fetcher — it will block until we send on tx
   1009         let inflight_c = inflight.clone();
   1010         let wc = work_count.clone();
   1011         let fetcher = tokio::spawn(async move {
   1012             run_inflight_deduplicated(&inflight_c, key, || async move {
   1013                 wc.fetch_add(1, Ordering::SeqCst);
   1014                 rx.await.ok();
   1015             })
   1016             .await
   1017         });
   1018 
   1019         // Yield to let fetcher start and insert its Notify
   1020         tokio::task::yield_now().await;
   1021 
   1022         // Spawn 10 concurrent waiters that call the same function
   1023         let mut waiters = Vec::new();
   1024         for _ in 0..10 {
   1025             let inflight_c = inflight.clone();
   1026             let wc = work_count.clone();
   1027             waiters.push(tokio::spawn(async move {
   1028                 run_inflight_deduplicated(&inflight_c, key, || async move {
   1029                     wc.fetch_add(1, Ordering::SeqCst);
   1030                 })
   1031                 .await
   1032             }));
   1033         }
   1034 
   1035         // Yield to let waiters register
   1036         tokio::task::yield_now().await;
   1037 
   1038         // Let the fetcher complete
   1039         tx.send(()).unwrap();
   1040 
   1041         let fetcher_result = fetcher.await.unwrap();
   1042         assert!(fetcher_result, "first caller should be the fetcher");
   1043 
   1044         for w in waiters {
   1045             let was_fetcher = w.await.unwrap();
   1046             assert!(!was_fetcher, "waiters should not have run work");
   1047         }
   1048 
   1049         // Work closure should have executed exactly once
   1050         assert_eq!(work_count.load(Ordering::SeqCst), 1);
   1051     }
   1052 
   1053     #[tokio::test]
   1054     async fn inflight_cleans_up_after_completion() {
   1055         let inflight: Arc<DashMap<[u8; 32], watch::Receiver<bool>>> = Arc::new(DashMap::new());
   1056         let key = test_key(0xEE);
   1057 
   1058         run_inflight_deduplicated(&inflight, key, || async {}).await;
   1059 
   1060         // The inflight entry should have been removed
   1061         assert!(
   1062             !inflight.contains_key(&key),
   1063             "inflight entry should be cleaned up after work completes"
   1064         );
   1065     }
   1066 
   1067     #[tokio::test]
   1068     async fn inflight_second_call_after_completion_runs_work_again() {
   1069         let inflight: Arc<DashMap<[u8; 32], watch::Receiver<bool>>> = Arc::new(DashMap::new());
   1070         let key = test_key(0xFF);
   1071         let work_count = Arc::new(AtomicUsize::new(0));
   1072 
   1073         // First call
   1074         let wc = work_count.clone();
   1075         run_inflight_deduplicated(&inflight, key, || async move {
   1076             wc.fetch_add(1, Ordering::SeqCst);
   1077         })
   1078         .await;
   1079 
   1080         // Second call — should run work again since inflight was cleaned up
   1081         let wc = work_count.clone();
   1082         let was_fetcher = run_inflight_deduplicated(&inflight, key, || async move {
   1083             wc.fetch_add(1, Ordering::SeqCst);
   1084         })
   1085         .await;
   1086 
   1087         assert!(was_fetcher, "second call should be a fetcher, not a waiter");
   1088         assert_eq!(work_count.load(Ordering::SeqCst), 2);
   1089     }
   1090 
   1091     #[tokio::test]
   1092     async fn inflight_independent_keys_both_run_work() {
   1093         let inflight: Arc<DashMap<[u8; 32], watch::Receiver<bool>>> = Arc::new(DashMap::new());
   1094         let key_a = test_key(0xAA);
   1095         let key_b = test_key(0xBB);
   1096         let work_count = Arc::new(AtomicUsize::new(0));
   1097 
   1098         let wc = work_count.clone();
   1099         let a = run_inflight_deduplicated(&inflight, key_a, || async move {
   1100             wc.fetch_add(1, Ordering::SeqCst);
   1101         })
   1102         .await;
   1103 
   1104         let wc = work_count.clone();
   1105         let b = run_inflight_deduplicated(&inflight, key_b, || async move {
   1106             wc.fetch_add(1, Ordering::SeqCst);
   1107         })
   1108         .await;
   1109 
   1110         assert!(a);
   1111         assert!(b);
   1112         assert_eq!(work_count.load(Ordering::SeqCst), 2);
   1113     }
   1114 }