main.rs (39347B)
1 use std::net::SocketAddr; 2 use std::time::Instant; 3 4 use dashmap::DashMap; 5 use tokio::sync::watch; 6 use tokio::task::AbortHandle; 7 8 use http_body_util::Full; 9 use hyper::body::Bytes; 10 use hyper::header; 11 use hyper::server::conn::http1; 12 use hyper::service::service_fn; 13 use hyper::{Request, Response, StatusCode}; 14 use hyper_util::rt::TokioIo; 15 use metrics_exporter_prometheus::PrometheusHandle; 16 use std::sync::Arc; 17 use tokio::net::TcpListener; 18 use tracing::{error, info}; 19 20 use crate::{ 21 error::Error, 22 render::{ProfileRenderData, RenderData}, 23 }; 24 use nostr_sdk::prelude::*; 25 use nostrdb::{Config, Filter, Ndb, NoteKey, Transaction}; 26 use std::time::Duration; 27 28 mod abbrev; 29 mod error; 30 mod fonts; 31 mod gradient; 32 mod html; 33 mod nip19; 34 mod pfp; 35 mod relay_pool; 36 mod render; 37 mod sitemap; 38 mod unknowns; 39 40 use relay_pool::RelayPool; 41 42 const FRONTEND_CSS: &str = include_str!("../assets/damus.css"); 43 const POETSEN_FONT: &[u8] = include_bytes!("../fonts/PoetsenOne-Regular.ttf"); 44 const DEFAULT_PFP_IMAGE: &[u8] = include_bytes!("../assets/default_pfp.jpg"); 45 const DAMUS_LOGO_ICON: &[u8] = include_bytes!("../assets/logo_icon.png"); 46 47 /// Minimum interval between background profile feed refreshes for the same pubkey 48 const PROFILE_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60); 49 50 /// Minimum interval between background note secondary fetches (unknowns, stats, replies) 51 const NOTE_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60); 52 53 /// Prune refresh tracking maps when they exceed this size (~40KB max memory each) 54 const REFRESH_MAP_PRUNE_THRESHOLD: usize = 1000; 55 56 /// Tracks the state of a background refresh (used for both profiles and notes) 57 enum RefreshState { 58 /// Refresh currently in progress with handle to abort if stuck 59 InProgress { 60 started: Instant, 61 handle: AbortHandle, 62 }, 63 /// Last successful refresh completed at this time 64 
Completed(Instant), 65 } 66 67 #[derive(Clone)] 68 pub struct Notecrumbs { 69 pub ndb: Ndb, 70 _keys: Keys, 71 relay_pool: Arc<RelayPool>, 72 font_data: egui::FontData, 73 default_pfp: egui::ImageData, 74 background: egui::ImageData, 75 prometheus_handle: PrometheusHandle, 76 77 /// How long do we wait for remote note requests 78 _timeout: Duration, 79 80 /// Tracks refresh state per pubkey - prevents excessive relay queries and concurrent fetches 81 profile_refresh_state: Arc<DashMap<[u8; 32], RefreshState>>, 82 83 /// Tracks refresh state per note id - debounces background fetches (unknowns, stats, replies) 84 note_refresh_state: Arc<DashMap<[u8; 32], RefreshState>>, 85 86 /// Inflight fetches - deduplicates concurrent relay queries for the same resource. 87 /// Keyed by nip19 debounce key. Waiters clone the watch::Receiver and wait for 88 /// the fetcher to signal completion. Uses watch instead of Notify to avoid 89 /// missed-notification races. 90 inflight: Arc<DashMap<[u8; 32], watch::Receiver<bool>>>, 91 } 92 93 #[inline] 94 pub fn floor_char_boundary(s: &str, index: usize) -> usize { 95 if index >= s.len() { 96 s.len() 97 } else { 98 let lower_bound = index.saturating_sub(3); 99 let new_index = s.as_bytes()[lower_bound..=index] 100 .iter() 101 .rposition(|b| is_utf8_char_boundary(*b)); 102 103 // SAFETY: we know that the character boundary will be within four bytes 104 unsafe { lower_bound + new_index.unwrap_unchecked() } 105 } 106 } 107 108 #[inline] 109 fn is_utf8_char_boundary(c: u8) -> bool { 110 // This is bit magic equivalent to: b < 128 || b >= 192 111 (c as i8) >= -0x40 112 } 113 114 /// Derive a 32-byte debounce key from any nip19 reference. 115 /// Used to deduplicate relay fetches across concurrent and repeated requests. 
116 fn nip19_debounce_key(nip19: &Nip19) -> [u8; 32] { 117 use std::hash::{Hash, Hasher}; 118 match nip19 { 119 Nip19::Event(ev) => *ev.event_id.as_bytes(), 120 Nip19::EventId(id) => *id.as_bytes(), 121 Nip19::Pubkey(pk) => pk.to_bytes(), 122 Nip19::Profile(p) => p.public_key.to_bytes(), 123 Nip19::Coordinate(coord) => { 124 // Hash the address components into a stable 32-byte key 125 let mut hasher = std::collections::hash_map::DefaultHasher::new(); 126 coord.coordinate.public_key.to_bytes().hash(&mut hasher); 127 coord.coordinate.kind.as_u16().hash(&mut hasher); 128 coord.coordinate.identifier.hash(&mut hasher); 129 let h = hasher.finish().to_le_bytes(); 130 let mut key = [0u8; 32]; 131 // Repeat the 8-byte hash to fill 32 bytes 132 key[..8].copy_from_slice(&h); 133 key[8..16].copy_from_slice(&h); 134 key[16..24].copy_from_slice(&h); 135 key[24..32].copy_from_slice(&h); 136 key 137 } 138 Nip19::Secret(_) => [0u8; 32], // shouldn't happen, rejected earlier 139 } 140 } 141 142 /// Try to spawn a debounced background task. Returns true if the task was spawned. 143 /// 144 /// Uses the refresh state map to prevent concurrent and rapid-fire fetches for the 145 /// same key. Tasks that are stuck (>10 min) are aborted and retried. 146 fn try_spawn_debounced<F>( 147 state_map: &Arc<DashMap<[u8; 32], RefreshState>>, 148 key: [u8; 32], 149 interval: Duration, 150 task: F, 151 ) -> bool 152 where 153 F: FnOnce(Arc<DashMap<[u8; 32], RefreshState>>, [u8; 32]) -> tokio::task::JoinHandle<()>, 154 { 155 use dashmap::mapref::entry::Entry; 156 157 let now = Instant::now(); 158 159 // Prune stale entries to bound memory 160 if state_map.len() > REFRESH_MAP_PRUNE_THRESHOLD { 161 state_map.retain(|_, state| match state { 162 RefreshState::InProgress { .. 
} => true, 163 RefreshState::Completed(t) => now.duration_since(*t) < interval, 164 }); 165 } 166 167 match state_map.entry(key) { 168 Entry::Occupied(mut occupied) => { 169 let should_refresh = match occupied.get() { 170 // Already refreshing - skip unless stuck (>10 min) 171 RefreshState::InProgress { started, .. } 172 if now.duration_since(*started) < Duration::from_secs(10 * 60) => 173 { 174 false 175 } 176 // Recently completed - skip 177 RefreshState::Completed(t) if now.duration_since(*t) < interval => false, 178 // Stuck fetch - abort and restart 179 RefreshState::InProgress { handle, .. } => { 180 handle.abort(); 181 true 182 } 183 // Stale completion - refresh 184 RefreshState::Completed(_) => true, 185 }; 186 187 if should_refresh { 188 let handle = task(state_map.clone(), key); 189 occupied.insert(RefreshState::InProgress { 190 started: now, 191 handle: handle.abort_handle(), 192 }); 193 true 194 } else { 195 false 196 } 197 } 198 Entry::Vacant(vacant) => { 199 let handle = task(state_map.clone(), key); 200 vacant.insert(RefreshState::InProgress { 201 started: now, 202 handle: handle.abort_handle(), 203 }); 204 true 205 } 206 } 207 } 208 209 /// Deduplicates concurrent async work for the same key. 210 /// 211 /// Returns `true` if this call was the "fetcher" (ran the work), 212 /// `false` if it was a "waiter" (another call was already in progress). 213 /// 214 /// Uses `DashMap::entry()` for atomic check-and-insert (no TOCTOU race) 215 /// and `watch` channels so waiters can't miss the completion signal. 
216 async fn run_inflight_deduplicated<F, Fut>( 217 inflight: &DashMap<[u8; 32], watch::Receiver<bool>>, 218 key: [u8; 32], 219 work: F, 220 ) -> bool 221 where 222 F: FnOnce() -> Fut, 223 Fut: std::future::Future<Output = ()>, 224 { 225 use dashmap::mapref::entry::Entry; 226 227 match inflight.entry(key) { 228 Entry::Occupied(entry) => { 229 // Another request is already fetching — clone receiver then 230 // release the shard lock before awaiting 231 let mut rx = entry.get().clone(); 232 drop(entry); 233 // wait_for checks the current value first, so even if the 234 // fetcher already completed we won't miss it 235 let _ = rx.wait_for(|&done| done).await; 236 false 237 } 238 Entry::Vacant(entry) => { 239 // We're the first — insert a watch receiver so waiters can find it 240 let (tx, rx) = watch::channel(false); 241 entry.insert(rx); 242 243 work().await; 244 245 // Clean up and signal all waiters 246 inflight.remove(&key); 247 let _ = tx.send(true); 248 true 249 } 250 } 251 } 252 253 /// Fetch missing render data from relays, deduplicating concurrent requests 254 /// for the same nip19 so only one relay query fires at a time. 
255 async fn fetch_if_missing( 256 ndb: &Ndb, 257 relay_pool: &Arc<RelayPool>, 258 inflight: &DashMap<[u8; 32], watch::Receiver<bool>>, 259 render_data: &mut RenderData, 260 nip19: &Nip19, 261 ) { 262 let key = nip19_debounce_key(nip19); 263 264 let was_fetcher = run_inflight_deduplicated(inflight, key, || { 265 let ndb = ndb.clone(); 266 let relay_pool = relay_pool.clone(); 267 let nip19 = nip19.clone(); 268 let render_data_ref = &mut *render_data; 269 async move { 270 if let Err(err) = render_data_ref 271 .complete(ndb, relay_pool, nip19) 272 .await 273 { 274 error!("Error fetching completion data: {err}"); 275 } 276 } 277 }) 278 .await; 279 280 if !was_fetcher { 281 // We were a waiter — re-check ndb for updated data 282 let txn = match Transaction::new(ndb) { 283 Ok(txn) => txn, 284 Err(err) => { 285 error!("failed to open transaction after inflight wait: {err}"); 286 return; 287 } 288 }; 289 if let Ok(new_rd) = render::get_render_data(ndb, &txn, nip19) { 290 *render_data = new_rd; 291 } 292 } 293 } 294 295 /// Spawn a debounced background task to fetch secondary note data 296 /// (unknowns, stats, reply profiles). Skips if a fetch already ran 297 /// recently for this nip19 resource. 
298 fn spawn_note_secondary_fetch( 299 ndb: &Ndb, 300 relay_pool: &Arc<RelayPool>, 301 note_refresh_state: &Arc<DashMap<[u8; 32], RefreshState>>, 302 nip19: &Nip19, 303 note_rd: &render::NoteAndProfileRenderData, 304 ) { 305 let ndb = ndb.clone(); 306 let relay_pool = relay_pool.clone(); 307 let note_rd_bg = note_rd.note_rd.clone(); 308 let source_relays = note_rd.source_relays.clone(); 309 310 try_spawn_debounced( 311 note_refresh_state, 312 nip19_debounce_key(nip19), 313 NOTE_REFRESH_INTERVAL, 314 |state_map, key| { 315 tokio::spawn(async move { 316 if let Err(err) = 317 fetch_note_secondary_data(&relay_pool, &ndb, ¬e_rd_bg, &source_relays).await 318 { 319 tracing::warn!("background note secondary fetch failed: {err}"); 320 state_map.remove(&key); 321 return; 322 } 323 state_map.insert(key, RefreshState::Completed(Instant::now())); 324 }) 325 }, 326 ); 327 } 328 329 /// Ensure profile feed data is available, fetching from relays if needed. 330 /// Uses debounced background refresh when cached data exists. 
331 async fn ensure_profile_feed( 332 ndb: &Ndb, 333 relay_pool: &Arc<RelayPool>, 334 inflight: &DashMap<[u8; 32], watch::Receiver<bool>>, 335 profile_refresh_state: &Arc<DashMap<[u8; 32], RefreshState>>, 336 profile_opt: &Option<ProfileRenderData>, 337 ) -> Result<(), Error> { 338 let maybe_pubkey = { 339 let txn = Transaction::new(ndb)?; 340 match profile_opt { 341 Some(ProfileRenderData::Profile(profile_key)) => { 342 if let Ok(profile_rec) = ndb.get_profile_by_key(&txn, *profile_key) { 343 let note_key = NoteKey::new(profile_rec.record().note_key()); 344 ndb.get_note_by_key(&txn, note_key) 345 .ok() 346 .map(|note| *note.pubkey()) 347 } else { 348 None 349 } 350 } 351 Some(ProfileRenderData::Missing(pk)) => Some(*pk), 352 None => None, 353 } 354 }; 355 356 let Some(pubkey) = maybe_pubkey else { 357 return Ok(()); 358 }; 359 360 let has_cached_notes = { 361 let txn = Transaction::new(ndb)?; 362 let notes_filter = Filter::new().authors([&pubkey]).kinds([1]).limit(1).build(); 363 ndb.query(&txn, &[notes_filter], 1) 364 .map(|results| !results.is_empty()) 365 .unwrap_or(false) 366 }; 367 368 let pool = relay_pool.clone(); 369 let ndb = ndb.clone(); 370 371 if has_cached_notes { 372 try_spawn_debounced( 373 profile_refresh_state, 374 pubkey, 375 PROFILE_REFRESH_INTERVAL, 376 |state_map, key| { 377 tokio::spawn(async move { 378 match render::fetch_profile_feed(pool, ndb, key).await { 379 Ok(()) => { 380 state_map.insert(key, RefreshState::Completed(Instant::now())); 381 } 382 Err(err) => { 383 error!("Background profile feed refresh failed: {err}"); 384 state_map.remove(&key); 385 } 386 } 387 }) 388 }, 389 ); 390 } else { 391 // No cached data: must wait for relay fetch before rendering. 392 // Use inflight dedup so concurrent requests for the same profile 393 // don't each fire their own relay queries. 
394 run_inflight_deduplicated(inflight, pubkey, || async move { 395 if let Err(err) = render::fetch_profile_feed(pool, ndb, pubkey).await { 396 error!("Error fetching profile feed: {err}"); 397 } 398 }) 399 .await; 400 } 401 402 Ok(()) 403 } 404 405 /// Background task: fetch all secondary data for a note (unknowns, stats, reply profiles). 406 async fn fetch_note_secondary_data( 407 relay_pool: &Arc<RelayPool>, 408 ndb: &Ndb, 409 note_rd: &render::NoteRenderData, 410 source_relays: &[nostr::RelayUrl], 411 ) -> crate::error::Result<()> { 412 // Fetch unknowns (author, mentions, quotes, reply chain) 413 if let Some(unknowns) = render::collect_note_unknowns(ndb, note_rd) { 414 tracing::debug!("fetching {} unknowns", unknowns.ids_len()); 415 render::fetch_unknowns(relay_pool, ndb, unknowns).await?; 416 } 417 418 // Fetch note stats (reactions, replies, reposts) 419 render::fetch_note_stats(relay_pool, ndb, note_rd, source_relays).await?; 420 421 // Fetch profiles for reply authors (now that replies are ingested) 422 if let Some(reply_unknowns) = render::collect_reply_unknowns(ndb, note_rd) { 423 tracing::debug!( 424 "fetching {} reply author profiles", 425 reply_unknowns.ids_len() 426 ); 427 if let Err(err) = render::fetch_unknowns(relay_pool, ndb, reply_unknowns).await { 428 tracing::warn!("failed to fetch reply author profiles: {err}"); 429 } 430 } 431 432 Ok(()) 433 } 434 435 async fn serve( 436 app: &Notecrumbs, 437 r: Request<hyper::body::Incoming>, 438 ) -> Result<Response<Full<Bytes>>, Error> { 439 if r.uri().path() == "/metrics" { 440 let body = app.prometheus_handle.render(); 441 return Ok(Response::builder() 442 .status(StatusCode::OK) 443 .header(header::CONTENT_TYPE, "text/plain; version=0.0.4") 444 .body(Full::new(Bytes::from(body)))?); 445 } 446 447 match r.uri().path() { 448 "/damus.css" => { 449 return Ok(Response::builder() 450 .status(StatusCode::OK) 451 .header(header::CONTENT_TYPE, "text/css; charset=utf-8") 452 
.body(Full::new(Bytes::from_static(FRONTEND_CSS.as_bytes())))?); 453 } 454 "/fonts/PoetsenOne-Regular.ttf" => { 455 return Ok(Response::builder() 456 .status(StatusCode::OK) 457 .header(header::CONTENT_TYPE, "font/ttf") 458 .header(header::CACHE_CONTROL, "public, max-age=604800, immutable") 459 .body(Full::new(Bytes::from_static(POETSEN_FONT)))?); 460 } 461 "/assets/default_pfp.jpg" => { 462 return Ok(Response::builder() 463 .status(StatusCode::OK) 464 .header(header::CONTENT_TYPE, "image/jpeg") 465 .header(header::CACHE_CONTROL, "public, max-age=604800") 466 .body(Full::new(Bytes::from_static(DEFAULT_PFP_IMAGE)))?); 467 } 468 "/assets/logo_icon.png" => { 469 return Ok(Response::builder() 470 .status(StatusCode::OK) 471 .header(header::CONTENT_TYPE, "image/png") 472 .header(header::CACHE_CONTROL, "public, max-age=604800, immutable") 473 .body(Full::new(Bytes::from_static(DAMUS_LOGO_ICON)))?); 474 } 475 "/" => { 476 return html::serve_homepage(r); 477 } 478 "/robots.txt" => { 479 let body = sitemap::generate_robots_txt(); 480 return Ok(Response::builder() 481 .status(StatusCode::OK) 482 .header(header::CONTENT_TYPE, "text/plain; charset=utf-8") 483 .header(header::CACHE_CONTROL, "public, max-age=86400") 484 .body(Full::new(Bytes::from(body)))?); 485 } 486 "/sitemap.xml" => match sitemap::generate_sitemap(&app.ndb) { 487 Ok(xml) => { 488 return Ok(Response::builder() 489 .status(StatusCode::OK) 490 .header(header::CONTENT_TYPE, "application/xml; charset=utf-8") 491 .header(header::CACHE_CONTROL, "public, max-age=3600") 492 .body(Full::new(Bytes::from(xml)))?); 493 } 494 Err(err) => { 495 error!("Failed to generate sitemap: {err}"); 496 return Ok(Response::builder() 497 .status(StatusCode::INTERNAL_SERVER_ERROR) 498 .body(Full::new(Bytes::from("Failed to generate sitemap\n")))?); 499 } 500 }, 501 _ => {} 502 } 503 504 let is_png = r.uri().path().ends_with(".png"); 505 let is_json = r.uri().path().ends_with(".json"); 506 let until = if is_png { 507 4 508 } else if 
is_json { 509 5 510 } else { 511 0 512 }; 513 514 let path_len = r.uri().path().len(); 515 let nip19 = match Nip19::from_bech32(&r.uri().path()[1..path_len - until]) { 516 Ok(nip19) => nip19, 517 Err(_) => { 518 return Ok(Response::builder() 519 .status(StatusCode::NOT_FOUND) 520 .body(Full::new(Bytes::from("Invalid url\n")))?); 521 } 522 }; 523 524 // render_data is always returned, it just might be empty 525 let mut render_data = { 526 let txn = Transaction::new(&app.ndb)?; 527 match render::get_render_data(&app.ndb, &txn, &nip19) { 528 Err(_err) => { 529 return Ok(Response::builder() 530 .status(StatusCode::BAD_REQUEST) 531 .body(Full::new(Bytes::from( 532 "nsecs are not supported, what were you thinking!?\n", 533 )))?); 534 } 535 Ok(render_data) => render_data, 536 } 537 }; 538 539 // Fetch missing note/profile data from relays (deduplicated across concurrent requests) 540 if !render_data.is_complete() { 541 fetch_if_missing( 542 &app.ndb, 543 &app.relay_pool, 544 &app.inflight, 545 &mut render_data, 546 &nip19, 547 ) 548 .await; 549 } 550 551 // Spawn debounced background fetch for secondary note data (unknowns, stats, replies) 552 if let RenderData::Note(note_rd) = &render_data { 553 spawn_note_secondary_fetch( 554 &app.ndb, 555 &app.relay_pool, 556 &app.note_refresh_state, 557 &nip19, 558 note_rd, 559 ); 560 } 561 562 // Ensure profile feed data is available (debounced background refresh or blocking fetch) 563 if let RenderData::Profile(profile_opt) = &render_data { 564 ensure_profile_feed( 565 &app.ndb, 566 &app.relay_pool, 567 &app.inflight, 568 &app.profile_refresh_state, 569 profile_opt, 570 ) 571 .await?; 572 } 573 574 if is_png { 575 let data = render::render_note(app, &render_data); 576 577 Ok(Response::builder() 578 .header(header::CONTENT_TYPE, "image/png") 579 .status(StatusCode::OK) 580 .body(Full::new(Bytes::from(data)))?) 
581 } else if is_json { 582 match render_data { 583 RenderData::Note(note_rd) => html::serve_note_json(&app.ndb, ¬e_rd), 584 RenderData::Profile(_profile_rd) => Ok(Response::builder() 585 .status(StatusCode::NOT_FOUND) 586 .body(Full::new(Bytes::from("todo: profile json")))?), 587 } 588 } else { 589 match render_data { 590 RenderData::Note(note_rd) => html::serve_note_html(app, &nip19, ¬e_rd, r), 591 RenderData::Profile(profile_rd) => { 592 html::serve_profile_html(app, &nip19, profile_rd.as_ref(), r) 593 } 594 } 595 } 596 } 597 598 fn get_env_timeout() -> Duration { 599 let timeout_env = std::env::var("TIMEOUT_MS").unwrap_or("2000".to_string()); 600 let timeout_ms: u64 = timeout_env.parse().unwrap_or(2000); 601 Duration::from_millis(timeout_ms) 602 } 603 604 fn get_gradient() -> egui::ColorImage { 605 use egui::{Color32, ColorImage}; 606 //use egui::pos2; 607 use gradient::Gradient; 608 609 //let gradient = Gradient::linear(Color32::LIGHT_GRAY, Color32::DARK_GRAY); 610 //let size = pfp::PFP_SIZE as usize; 611 //let radius = (pfp::PFP_SIZE as f32) / 2.0; 612 //let center = pos2(radius, radius); 613 614 let scol = [0x1C, 0x55, 0xFF]; 615 //let ecol = [0xFA, 0x0D, 0xD4]; 616 let mcol = [0x7F, 0x35, 0xAB]; 617 //let ecol = [0xFF, 0x0B, 0xD6]; 618 let ecol = [0xC0, 0x2A, 0xBE]; 619 620 // TODO: skia has r/b colors swapped for some reason, fix this 621 let start_color = Color32::from_rgb(scol[2], scol[1], scol[0]); 622 let mid_color = Color32::from_rgb(mcol[2], mcol[1], mcol[0]); 623 let end_color = Color32::from_rgb(ecol[2], ecol[1], ecol[0]); 624 625 let gradient = Gradient::linear_many(vec![start_color, mid_color, end_color]); 626 let pixels = gradient.to_pixel_row(); 627 let width = pixels.len(); 628 let height = 1; 629 630 ColorImage { 631 size: [width, height], 632 pixels, 633 } 634 } 635 636 fn get_default_pfp() -> egui::ColorImage { 637 let mut dyn_image = 638 ::image::load_from_memory(DEFAULT_PFP_IMAGE).expect("failed to load embedded default pfp"); 639 
pfp::process_pfp_bitmap(&mut dyn_image) 640 } 641 642 #[tokio::main] 643 async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { 644 use tracing_subscriber; 645 646 tracing_subscriber::fmt::init(); 647 648 let addr = SocketAddr::from(([0, 0, 0, 0], 3000)); 649 650 // We create a TcpListener and bind it to 127.0.0.1:3000 651 let listener = TcpListener::bind(addr).await?; 652 info!("Listening on 0.0.0.0:3000"); 653 654 let cfg = Config::new(); 655 let ndb = Ndb::new(".", &cfg).expect("ndb failed to open"); 656 let keys = Keys::generate(); 657 let timeout = get_env_timeout(); 658 let prometheus_handle = metrics_exporter_prometheus::PrometheusBuilder::new() 659 .install_recorder() 660 .expect("install prometheus recorder"); 661 let relay_pool = Arc::new( 662 RelayPool::new( 663 keys.clone(), 664 &["wss://relay.damus.io", "wss://nostr.wine", "wss://nos.lol"], 665 ) 666 .await?, 667 ); 668 spawn_relay_pool_metrics_logger(relay_pool.clone()); 669 let default_pfp = egui::ImageData::Color(Arc::new(get_default_pfp())); 670 let background = egui::ImageData::Color(Arc::new(get_gradient())); 671 let font_data = egui::FontData::from_static(include_bytes!("../fonts/NotoSans-Regular.ttf")); 672 673 let app = Notecrumbs { 674 ndb, 675 _keys: keys, 676 relay_pool, 677 _timeout: timeout, 678 background, 679 font_data, 680 default_pfp, 681 prometheus_handle, 682 profile_refresh_state: Arc::new(DashMap::new()), 683 note_refresh_state: Arc::new(DashMap::new()), 684 inflight: Arc::new(DashMap::new()), 685 }; 686 687 // We start a loop to continuously accept incoming connections 688 loop { 689 let (stream, _) = listener.accept().await?; 690 691 // Use an adapter to access something implementing `tokio::io` traits as if they implement 692 // `hyper::rt` IO traits. 
693 let io = TokioIo::new(stream); 694 695 let app_copy = app.clone(); 696 697 // Spawn a tokio task to serve multiple connections concurrently 698 tokio::task::spawn(async move { 699 // Finally, we bind the incoming connection to our `hello` service 700 if let Err(err) = http1::Builder::new() 701 // `service_fn` converts our function in a `Service` 702 .serve_connection(io, service_fn(|req| serve(&app_copy, req))) 703 .await 704 { 705 println!("Error serving connection: {:?}", err); 706 } 707 }); 708 } 709 } 710 711 fn spawn_relay_pool_metrics_logger(pool: Arc<RelayPool>) { 712 tokio::spawn(async move { 713 let mut ticker = tokio::time::interval(std::time::Duration::from_secs(60)); 714 loop { 715 ticker.tick().await; 716 let (stats, tracked) = pool.relay_stats().await; 717 metrics::gauge!("relay_pool_known_relays", tracked as f64); 718 info!( 719 total_relays = tracked, 720 ensure_calls = stats.ensure_calls, 721 relays_added = stats.relays_added, 722 connect_successes = stats.connect_successes, 723 connect_failures = stats.connect_failures, 724 "relay pool metrics snapshot" 725 ); 726 } 727 }); 728 } 729 730 #[cfg(test)] 731 mod tests { 732 use super::*; 733 use nostr::nips::nip19::{Nip19Coordinate, Nip19Profile}; 734 use std::sync::atomic::{AtomicUsize, Ordering}; 735 736 /// Helper: create a fresh DashMap wrapped in Arc for testing 737 fn new_state_map() -> Arc<DashMap<[u8; 32], RefreshState>> { 738 Arc::new(DashMap::new()) 739 } 740 741 /// Helper: a test key (arbitrary 32 bytes) 742 fn test_key(byte: u8) -> [u8; 32] { 743 [byte; 32] 744 } 745 746 /// Helper: spawn a no-op task that completes immediately, tracking call count 747 fn counting_task( 748 counter: Arc<AtomicUsize>, 749 ) -> impl FnOnce(Arc<DashMap<[u8; 32], RefreshState>>, [u8; 32]) -> tokio::task::JoinHandle<()> 750 { 751 move |state_map, key| { 752 counter.fetch_add(1, Ordering::SeqCst); 753 tokio::spawn(async move { 754 state_map.insert(key, RefreshState::Completed(Instant::now())); 755 }) 756 } 
757 } 758 759 // --------------------------------------------------------------- 760 // nip19_debounce_key tests 761 // --------------------------------------------------------------- 762 763 #[test] 764 fn debounce_key_event_uses_event_id() { 765 let event_id = EventId::all_zeros(); 766 let nip19 = Nip19::EventId(event_id); 767 assert_eq!(nip19_debounce_key(&nip19), *event_id.as_bytes()); 768 } 769 770 #[test] 771 fn debounce_key_pubkey_uses_pubkey_bytes() { 772 let keys = Keys::generate(); 773 let pk = keys.public_key(); 774 let nip19 = Nip19::Pubkey(pk); 775 assert_eq!(nip19_debounce_key(&nip19), pk.to_bytes()); 776 } 777 778 #[test] 779 fn debounce_key_profile_uses_pubkey_bytes() { 780 let keys = Keys::generate(); 781 let pk = keys.public_key(); 782 let nip19 = Nip19::Profile(Nip19Profile::new(pk, [])); 783 assert_eq!(nip19_debounce_key(&nip19), pk.to_bytes()); 784 } 785 786 #[test] 787 fn debounce_key_coordinate_is_deterministic() { 788 use nostr::nips::nip01::Coordinate; 789 let keys = Keys::generate(); 790 let coord = Coordinate::new(Kind::LongFormTextNote, keys.public_key()) 791 .identifier("test-article"); 792 let nip19 = Nip19::Coordinate(Nip19Coordinate::new(coord, [])); 793 let key1 = nip19_debounce_key(&nip19); 794 let key2 = nip19_debounce_key(&nip19); 795 assert_eq!(key1, key2); 796 } 797 798 #[test] 799 fn debounce_key_different_coordinates_differ() { 800 use nostr::nips::nip01::Coordinate; 801 let keys = Keys::generate(); 802 let coord_a = Coordinate::new(Kind::LongFormTextNote, keys.public_key()) 803 .identifier("article-a"); 804 let coord_b = Coordinate::new(Kind::LongFormTextNote, keys.public_key()) 805 .identifier("article-b"); 806 let nip19_a = Nip19::Coordinate(Nip19Coordinate::new(coord_a, [])); 807 let nip19_b = Nip19::Coordinate(Nip19Coordinate::new(coord_b, [])); 808 assert_ne!(nip19_debounce_key(&nip19_a), nip19_debounce_key(&nip19_b)); 809 } 810 811 // --------------------------------------------------------------- 812 // 
try_spawn_debounced tests 813 // --------------------------------------------------------------- 814 815 #[tokio::test] 816 async fn debounce_spawns_on_first_call() { 817 let state = new_state_map(); 818 let counter = Arc::new(AtomicUsize::new(0)); 819 let key = test_key(1); 820 821 let spawned = try_spawn_debounced(&state, key, Duration::from_secs(300), counting_task(counter.clone())); 822 823 assert!(spawned); 824 assert_eq!(counter.load(Ordering::SeqCst), 1); 825 // State should show InProgress (task may have completed already, but the 826 // entry was set before the task ran) 827 assert!(state.contains_key(&key)); 828 } 829 830 #[tokio::test] 831 async fn debounce_skips_while_in_progress() { 832 let state = new_state_map(); 833 let key = test_key(2); 834 835 // Insert a fake InProgress entry 836 state.insert( 837 key, 838 RefreshState::InProgress { 839 started: Instant::now(), 840 handle: tokio::spawn(async {}).abort_handle(), 841 }, 842 ); 843 844 let counter = Arc::new(AtomicUsize::new(0)); 845 let spawned = try_spawn_debounced(&state, key, Duration::from_secs(300), counting_task(counter.clone())); 846 847 assert!(!spawned); 848 assert_eq!(counter.load(Ordering::SeqCst), 0); 849 } 850 851 #[tokio::test] 852 async fn debounce_skips_recently_completed() { 853 let state = new_state_map(); 854 let key = test_key(3); 855 856 // Insert a Completed entry from just now 857 state.insert(key, RefreshState::Completed(Instant::now())); 858 859 let counter = Arc::new(AtomicUsize::new(0)); 860 let spawned = try_spawn_debounced(&state, key, Duration::from_secs(300), counting_task(counter.clone())); 861 862 assert!(!spawned); 863 assert_eq!(counter.load(Ordering::SeqCst), 0); 864 } 865 866 #[tokio::test] 867 async fn debounce_refreshes_after_interval_expires() { 868 let state = new_state_map(); 869 let key = test_key(4); 870 871 // Insert a Completed entry from "long ago" (past the interval) 872 let old_time = Instant::now() - Duration::from_secs(600); 873 state.insert(key, 
RefreshState::Completed(old_time)); 874 875 let counter = Arc::new(AtomicUsize::new(0)); 876 let spawned = try_spawn_debounced(&state, key, Duration::from_secs(300), counting_task(counter.clone())); 877 878 assert!(spawned); 879 assert_eq!(counter.load(Ordering::SeqCst), 1); 880 } 881 882 #[tokio::test] 883 async fn debounce_aborts_stuck_task_and_retries() { 884 let state = new_state_map(); 885 let key = test_key(5); 886 887 // Insert InProgress from >10 minutes ago (stuck) 888 let stuck_time = Instant::now() - Duration::from_secs(11 * 60); 889 let stuck_handle = tokio::spawn(async { std::future::pending::<()>().await }); 890 let abort_handle = stuck_handle.abort_handle(); 891 state.insert( 892 key, 893 RefreshState::InProgress { 894 started: stuck_time, 895 handle: abort_handle.clone(), 896 }, 897 ); 898 899 let counter = Arc::new(AtomicUsize::new(0)); 900 let spawned = try_spawn_debounced(&state, key, Duration::from_secs(300), counting_task(counter.clone())); 901 902 assert!(spawned, "should retry after stuck task"); 903 assert_eq!(counter.load(Ordering::SeqCst), 1); 904 // The old task should have been aborted — yield to let the runtime process it 905 tokio::task::yield_now().await; 906 assert!(stuck_handle.is_finished()); 907 } 908 909 #[tokio::test] 910 async fn debounce_does_not_abort_recent_in_progress() { 911 let state = new_state_map(); 912 let key = test_key(6); 913 914 // Insert InProgress from just now (not stuck) 915 let handle = tokio::spawn(async { std::future::pending::<()>().await }); 916 state.insert( 917 key, 918 RefreshState::InProgress { 919 started: Instant::now(), 920 handle: handle.abort_handle(), 921 }, 922 ); 923 924 let counter = Arc::new(AtomicUsize::new(0)); 925 let spawned = try_spawn_debounced(&state, key, Duration::from_secs(300), counting_task(counter.clone())); 926 927 assert!(!spawned); 928 assert_eq!(counter.load(Ordering::SeqCst), 0); 929 // The original task should NOT have been aborted 930 assert!(!handle.is_finished()); 931 
handle.abort(); // cleanup 932 } 933 934 #[tokio::test] 935 async fn debounce_prunes_stale_entries_over_threshold() { 936 let state = new_state_map(); 937 let old_time = Instant::now() - Duration::from_secs(600); 938 let interval = Duration::from_secs(300); 939 940 // Fill the map past the threshold with stale Completed entries 941 for i in 0..(REFRESH_MAP_PRUNE_THRESHOLD + 50) { 942 let mut key = [0u8; 32]; 943 key[0] = (i & 0xFF) as u8; 944 key[1] = ((i >> 8) & 0xFF) as u8; 945 state.insert(key, RefreshState::Completed(old_time)); 946 } 947 948 assert!(state.len() > REFRESH_MAP_PRUNE_THRESHOLD); 949 950 // The next call should trigger pruning 951 let key = test_key(0xFF); 952 let counter = Arc::new(AtomicUsize::new(0)); 953 try_spawn_debounced(&state, key, interval, counting_task(counter.clone())); 954 955 // Stale entries should have been pruned (only the new one + any InProgress remain) 956 assert!( 957 state.len() < REFRESH_MAP_PRUNE_THRESHOLD, 958 "state map should have been pruned, but has {} entries", 959 state.len() 960 ); 961 } 962 963 #[tokio::test] 964 async fn debounce_independent_keys_both_spawn() { 965 let state = new_state_map(); 966 let key_a = test_key(0xAA); 967 let key_b = test_key(0xBB); 968 969 let counter = Arc::new(AtomicUsize::new(0)); 970 let spawned_a = try_spawn_debounced(&state, key_a, Duration::from_secs(300), counting_task(counter.clone())); 971 let spawned_b = try_spawn_debounced(&state, key_b, Duration::from_secs(300), counting_task(counter.clone())); 972 973 assert!(spawned_a); 974 assert!(spawned_b); 975 assert_eq!(counter.load(Ordering::SeqCst), 2); 976 } 977 978 // --------------------------------------------------------------- 979 // run_inflight_deduplicated tests 980 // --------------------------------------------------------------- 981 982 #[tokio::test] 983 async fn inflight_first_caller_runs_work_and_returns_true() { 984 let inflight: Arc<DashMap<[u8; 32], watch::Receiver<bool>>> = Arc::new(DashMap::new()); 985 let key = 
test_key(0xCC);
        let work_count = Arc::new(AtomicUsize::new(0));

        let wc = work_count.clone();
        let was_fetcher = run_inflight_deduplicated(&inflight, key, || async move {
            wc.fetch_add(1, Ordering::SeqCst);
        })
        .await;

        // A lone caller reports itself as the fetcher and runs the closure once.
        assert!(was_fetcher);
        assert_eq!(work_count.load(Ordering::SeqCst), 1);
    }

    /// One caller is held inside its work closure while ten more callers hit
    /// the same key; only the first one may report `true` and the closure must
    /// run exactly once overall.
    #[tokio::test]
    async fn inflight_concurrent_callers_only_run_work_once() {
        let inflight = Arc::new(DashMap::<[u8; 32], watch::Receiver<bool>>::new());
        let key = test_key(0xDD);
        let work_count = Arc::new(AtomicUsize::new(0));

        // Gate for the fetcher: its work closure parks on `release_rx`, which
        // keeps it "in progress" while the waiters are launched.
        let (release_tx, release_rx) = tokio::sync::oneshot::channel::<()>();

        // First caller; stays blocked until `release_tx` fires.
        let fetcher = {
            let map = Arc::clone(&inflight);
            let counter = Arc::clone(&work_count);
            tokio::spawn(async move {
                run_inflight_deduplicated(&map, key, || async move {
                    counter.fetch_add(1, Ordering::SeqCst);
                    release_rx.await.ok();
                })
                .await
            })
        };

        // Give the fetcher task a turn so it can start and register itself
        // in the in-flight map before any waiter looks the key up.
        tokio::task::yield_now().await;

        // Ten more callers for the same key, spawned in order.
        let waiters: Vec<_> = (0..10)
            .map(|_| {
                let map = Arc::clone(&inflight);
                let counter = Arc::clone(&work_count);
                tokio::spawn(async move {
                    run_inflight_deduplicated(&map, key, || async move {
                        counter.fetch_add(1, Ordering::SeqCst);
                    })
                    .await
                })
            })
            .collect();

        // Give the waiter tasks a turn so they register before the release.
        tokio::task::yield_now().await;

        // Unblock the fetcher's work closure.
        release_tx.send(()).unwrap();

        assert!(
            fetcher.await.unwrap(),
            "first caller should be the fetcher"
        );

        for waiter in waiters {
            assert!(!waiter.await.unwrap(), "waiters should not have run work");
        }

        // Across all eleven callers the closure body ran a single time.
        assert_eq!(work_count.load(Ordering::SeqCst), 1);
    }

    /// Once a call has returned, its key must no longer be present in the
    /// in-flight map.
    #[tokio::test]
    async fn inflight_cleans_up_after_completion() {
        let inflight = Arc::new(DashMap::<[u8; 32], watch::Receiver<bool>>::new());
        let key = test_key(0xEE);

        run_inflight_deduplicated(&inflight, key, || async {}).await;

        let still_tracked = inflight.contains_key(&key);
        assert!(
            !still_tracked,
            "inflight entry should be cleaned up after work completes"
        );
    }

    /// Two back-to-back calls for the same key each run their closure: the
    /// second call is a fetcher again rather than a waiter.
    #[tokio::test]
    async fn inflight_second_call_after_completion_runs_work_again() {
        let inflight = Arc::new(DashMap::<[u8; 32], watch::Receiver<bool>>::new());
        let key = test_key(0xFF);
        let work_count = Arc::new(AtomicUsize::new(0));

        // Initial call; its return value is not inspected.
        let counter = Arc::clone(&work_count);
        run_inflight_deduplicated(&inflight, key, || async move {
            counter.fetch_add(1, Ordering::SeqCst);
        })
        .await;

        // Follow-up call, issued only after the first one has fully returned.
        let counter = Arc::clone(&work_count);
        let was_fetcher = run_inflight_deduplicated(&inflight, key, || async move {
            counter.fetch_add(1, Ordering::SeqCst);
        })
        .await;

        assert!(was_fetcher, "second call should be a fetcher, not a waiter");
        assert_eq!(work_count.load(Ordering::SeqCst), 2);
    }

    /// Calls made with two different keys are not deduplicated against each
    /// other: both report `true` and both closures run.
    #[tokio::test]
    async fn inflight_independent_keys_both_run_work() {
        let inflight = Arc::new(DashMap::<[u8; 32], watch::Receiver<bool>>::new());
        let key_a = test_key(0xAA);
        let key_b = test_key(0xBB);
        let work_count = Arc::new(AtomicUsize::new(0));

        let counter = Arc::clone(&work_count);
        let ran_a = run_inflight_deduplicated(&inflight, key_a, || async move {
            counter.fetch_add(1, Ordering::SeqCst);
        })
        .await;

        let counter = Arc::clone(&work_count);
        let ran_b = run_inflight_deduplicated(&inflight, key_b, || async move {
            counter.fetch_add(1, Ordering::SeqCst);
        })
        .await;

        assert!(ran_a);
        assert!(ran_b);
        assert_eq!(work_count.load(Ordering::SeqCst), 2);
    }
}