notecrumbs

a nostr opengraph server build on nostrdb and egui
git clone git://jb55.com/notecrumbs
Log | Files | Refs | README | LICENSE

commit f2627aff54db0a2438c834a0c126500a15d21f44
parent 38670b3972b6690ec7705f7694927727af674621
Author: William Casarin <jb55@jb55.com>
Date:   Wed, 18 Feb 2026 11:56:06 -0800

fix: debounce relay requests to prevent hammering on repeated note fetches

Every HTTP request was firing relay queries for note stats, unknowns,
and reply profiles with no deduplication. Someone spamming the same
note URL would cause notecrumbs to spam relays proportionally.

Two layers of protection added:

1. Inflight deduplication for complete(): concurrent requests for the
   same note ID now wait on a shared Notify instead of each firing
   their own relay query.

2. Background fetch debouncing: secondary data fetches (unknowns,
   stats, reply profiles) are gated per note ID with a 5-minute
   cooldown, matching the existing profile refresh pattern.

Also refactors the duplicated refresh state logic into a reusable
try_spawn_debounced() helper and extracts serve() inline blocks into
Notecrumbs methods (fetch_note_if_missing, spawn_note_secondary_fetch,
ensure_profile_feed) and a standalone fetch_note_secondary_data() fn.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

Diffstat:
Msrc/main.rs | 460+++++++++++++++++++++++++++++++++++++++++++++++++------------------------------
1 file changed, 284 insertions(+), 176 deletions(-)

diff --git a/src/main.rs b/src/main.rs @@ -46,11 +46,14 @@ const DAMUS_LOGO_ICON: &[u8] = include_bytes!("../assets/logo_icon.png"); /// Minimum interval between background profile feed refreshes for the same pubkey const PROFILE_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60); -/// Prune refresh tracking map when it exceeds this size (deliberate limit, ~40KB max memory) -const PROFILE_REFRESH_MAP_PRUNE_THRESHOLD: usize = 1000; +/// Minimum interval between background note secondary fetches (unknowns, stats, replies) +const NOTE_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60); -/// Tracks the state of a background profile refresh -enum ProfileRefreshState { +/// Prune refresh tracking maps when they exceed this size (~40KB max memory each) +const REFRESH_MAP_PRUNE_THRESHOLD: usize = 1000; + +/// Tracks the state of a background refresh (used for both profiles and notes) +enum RefreshState { /// Refresh currently in progress with handle to abort if stuck InProgress { started: Instant, @@ -74,7 +77,14 @@ pub struct Notecrumbs { _timeout: Duration, /// Tracks refresh state per pubkey - prevents excessive relay queries and concurrent fetches - profile_refresh_state: Arc<DashMap<[u8; 32], ProfileRefreshState>>, + profile_refresh_state: Arc<DashMap<[u8; 32], RefreshState>>, + + /// Tracks refresh state per note id - debounces background fetches (unknowns, stats, replies) + note_refresh_state: Arc<DashMap<[u8; 32], RefreshState>>, + + /// Inflight note fetches - deduplicates concurrent complete() calls for the same note. + /// Waiters subscribe to the Notify; the fetcher notifies on completion. + note_inflight: Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>>, } #[inline] @@ -98,6 +108,267 @@ fn is_utf8_char_boundary(c: u8) -> bool { (c as i8) >= -0x40 } +/// Extract a note event ID from a nip19 reference, if it refers to a note. +fn nip19_note_id(nip19: &Nip19) -> Option<[u8; 32]> { + match nip19 { + Nip19::Event(ev) => Some(*ev.event_id.as_bytes()), + Nip19::EventId(id) => Some(*id.as_bytes()), + // Addresses (naddr) don't have a stable event ID + _ => None, + } +} + +/// Try to spawn a debounced background task. Returns true if the task was spawned. +/// +/// Uses the refresh state map to prevent concurrent and rapid-fire fetches for the +/// same key. Tasks that are stuck (>10 min) are aborted and retried. +fn try_spawn_debounced<F>( + state_map: &Arc<DashMap<[u8; 32], RefreshState>>, + key: [u8; 32], + interval: Duration, + task: F, +) -> bool +where + F: FnOnce(Arc<DashMap<[u8; 32], RefreshState>>, [u8; 32]) -> tokio::task::JoinHandle<()>, +{ + use dashmap::mapref::entry::Entry; + + let now = Instant::now(); + + // Prune stale entries to bound memory + if state_map.len() > REFRESH_MAP_PRUNE_THRESHOLD { + state_map.retain(|_, state| match state { + RefreshState::InProgress { .. } => true, + RefreshState::Completed(t) => now.duration_since(*t) < interval, + }); + } + + match state_map.entry(key) { + Entry::Occupied(mut occupied) => { + let should_refresh = match occupied.get() { + // Already refreshing - skip unless stuck (>10 min) + RefreshState::InProgress { started, .. } + if now.duration_since(*started) < Duration::from_secs(10 * 60) => + { + false + } + // Recently completed - skip + RefreshState::Completed(t) if now.duration_since(*t) < interval => false, + // Stuck fetch - abort and restart + RefreshState::InProgress { handle, .. } => { + handle.abort(); + true + } + // Stale completion - refresh + RefreshState::Completed(_) => true, + }; + + if should_refresh { + let handle = task(state_map.clone(), key); + occupied.insert(RefreshState::InProgress { + started: now, + handle: handle.abort_handle(), + }); + true + } else { + false + } + } + Entry::Vacant(vacant) => { + let handle = task(state_map.clone(), key); + vacant.insert(RefreshState::InProgress { + started: now, + handle: handle.abort_handle(), + }); + true + } + } +} + +impl Notecrumbs { + /// Fetch missing render data from relays, deduplicating concurrent requests + /// for the same note so only one relay query fires at a time. + async fn fetch_note_if_missing(&self, render_data: &mut RenderData, nip19: &Nip19) { + let note_id = nip19_note_id(nip19); + + // Check if there's already an inflight fetch for this note + let existing_notify = + note_id.and_then(|id| self.note_inflight.get(&id).map(|r| r.value().clone())); + + if let Some(notify) = existing_notify { + // Another request is already fetching — wait for it, then re-check ndb + notify.notified().await; + let txn = match Transaction::new(&self.ndb) { + Ok(txn) => txn, + Err(err) => { + error!("failed to open transaction after inflight wait: {err}"); + return; + } + }; + if let Ok(new_rd) = render::get_render_data(&self.ndb, &txn, nip19) { + *render_data = new_rd; + } + } else { + // We're the first — register inflight and do the fetch + let notify = note_id.map(|id| { + let n = Arc::new(tokio::sync::Notify::new()); + self.note_inflight.insert(id, n.clone()); + (id, n) + }); + + if let Err(err) = render_data + .complete(self.ndb.clone(), self.relay_pool.clone(), nip19.clone()) + .await + { + error!("Error fetching completion data: {err}"); + } + + // Signal waiters and remove inflight entry + if let Some((id, n)) = notify { + self.note_inflight.remove(&id); + n.notify_waiters(); + } + } + } + + /// Spawn a debounced background task to fetch secondary note data + /// (unknowns, stats, reply profiles). Skips if a fetch already ran + /// recently for this note ID. + fn spawn_note_secondary_fetch( + &self, + nip19: &Nip19, + note_rd: &render::NoteAndProfileRenderData, + ) { + let Some(note_id) = nip19_note_id(nip19) else { + return; + }; + + let ndb = self.ndb.clone(); + let relay_pool = self.relay_pool.clone(); + let note_rd_bg = note_rd.note_rd.clone(); + let source_relays = note_rd.source_relays.clone(); + + try_spawn_debounced( + &self.note_refresh_state, + note_id, + NOTE_REFRESH_INTERVAL, + |state_map, key| { + tokio::spawn(async move { + if let Err(err) = + fetch_note_secondary_data(&relay_pool, &ndb, &note_rd_bg, &source_relays) + .await + { + tracing::warn!("background note secondary fetch failed: {err}"); + state_map.remove(&key); + return; + } + state_map.insert(key, RefreshState::Completed(Instant::now())); + }) + }, + ); + } + + /// Ensure profile feed data is available, fetching from relays if needed. + /// Uses debounced background refresh when cached data exists. + async fn ensure_profile_feed( + &self, + profile_opt: &Option<ProfileRenderData>, + ) -> Result<(), Error> { + let maybe_pubkey = { + let txn = Transaction::new(&self.ndb)?; + match profile_opt { + Some(ProfileRenderData::Profile(profile_key)) => { + if let Ok(profile_rec) = self.ndb.get_profile_by_key(&txn, *profile_key) { + let note_key = NoteKey::new(profile_rec.record().note_key()); + self.ndb + .get_note_by_key(&txn, note_key) + .ok() + .map(|note| *note.pubkey()) + } else { + None + } + } + Some(ProfileRenderData::Missing(pk)) => Some(*pk), + None => None, + } + }; + + let Some(pubkey) = maybe_pubkey else { + return Ok(()); + }; + + let has_cached_notes = { + let txn = Transaction::new(&self.ndb)?; + let notes_filter = Filter::new().authors([&pubkey]).kinds([1]).limit(1).build(); + self.ndb + .query(&txn, &[notes_filter], 1) + .map(|results| !results.is_empty()) + .unwrap_or(false) + }; + + let pool = self.relay_pool.clone(); + let ndb = self.ndb.clone(); + + if has_cached_notes { + try_spawn_debounced( + &self.profile_refresh_state, + pubkey, + PROFILE_REFRESH_INTERVAL, + |state_map, key| { + tokio::spawn(async move { + match render::fetch_profile_feed(pool, ndb, key).await { + Ok(()) => { + state_map.insert(key, RefreshState::Completed(Instant::now())); + } + Err(err) => { + error!("Background profile feed refresh failed: {err}"); + state_map.remove(&key); + } + } + }) + }, + ); + } else { + // No cached data: must wait for relay fetch before rendering + if let Err(err) = render::fetch_profile_feed(pool, ndb, pubkey).await { + error!("Error fetching profile feed: {err}"); + } + } + + Ok(()) + } +} + +/// Background task: fetch all secondary data for a note (unknowns, stats, reply profiles). +async fn fetch_note_secondary_data( + relay_pool: &Arc<RelayPool>, + ndb: &Ndb, + note_rd: &render::NoteRenderData, + source_relays: &[nostr::RelayUrl], +) -> crate::error::Result<()> { + // Fetch unknowns (author, mentions, quotes, reply chain) + if let Some(unknowns) = render::collect_note_unknowns(ndb, note_rd) { + tracing::debug!("fetching {} unknowns", unknowns.ids_len()); + render::fetch_unknowns(relay_pool, ndb, unknowns).await?; + } + + // Fetch note stats (reactions, replies, reposts) + render::fetch_note_stats(relay_pool, ndb, note_rd, source_relays).await?; + + // Fetch profiles for reply authors (now that replies are ingested) + if let Some(reply_unknowns) = render::collect_reply_unknowns(ndb, note_rd) { + tracing::debug!( + "fetching {} reply author profiles", + reply_unknowns.ids_len() + ); + if let Err(err) = render::fetch_unknowns(relay_pool, ndb, reply_unknowns).await { + tracing::warn!("failed to fetch reply author profiles: {err}"); + } + } + + Ok(()) +} + async fn serve( app: &Notecrumbs, r: Request<hyper::body::Incoming>, @@ -202,184 +473,19 @@ async fn serve( } }; - // fetch extra data if we are missing it + // Fetch missing note/profile data from relays (deduplicated across concurrent requests) if !render_data.is_complete() { - if let Err(err) = render_data - .complete(app.ndb.clone(), app.relay_pool.clone(), nip19.clone()) - .await - { - error!("Error fetching completion data: {err}"); - } + app.fetch_note_if_missing(&mut render_data, &nip19).await; } - // Fetch secondary data (unknowns, stats, reply profiles) in the background. - // Render immediately with whatever is in ndb; next load will have the full data. + // Spawn debounced background fetch for secondary note data (unknowns, stats, replies) if let RenderData::Note(note_rd) = &render_data { - let ndb = app.ndb.clone(); - let relay_pool = app.relay_pool.clone(); - let note_rd_bg = note_rd.note_rd.clone(); - let source_relays = note_rd.source_relays.clone(); - tokio::spawn(async move { - // Fetch unknowns (author, mentions, quotes, reply chain) - if let Some(unknowns) = render::collect_note_unknowns(&ndb, &note_rd_bg) { - tracing::debug!("fetching {} unknowns", unknowns.ids_len()); - if let Err(err) = render::fetch_unknowns(&relay_pool, &ndb, unknowns).await { - tracing::warn!("failed to fetch unknowns: {err}"); - } - } - - // Fetch note stats (reactions, replies, reposts) - if let Err(err) = - render::fetch_note_stats(&relay_pool, &ndb, &note_rd_bg, &source_relays).await - { - tracing::warn!("failed to fetch note stats: {err}"); - } - - // Fetch profiles for reply authors (now that replies are ingested) - if let Some(reply_unknowns) = render::collect_reply_unknowns(&ndb, &note_rd_bg) { - tracing::debug!( - "fetching {} reply author profiles", - reply_unknowns.ids_len() - ); - if let Err(err) = render::fetch_unknowns(&relay_pool, &ndb, reply_unknowns).await { - tracing::warn!("failed to fetch reply author profiles: {err}"); - } - } - }); + app.spawn_note_secondary_fetch(&nip19, note_rd); } + // Ensure profile feed data is available (debounced background refresh or blocking fetch) if let RenderData::Profile(profile_opt) = &render_data { - let maybe_pubkey = { - let txn = Transaction::new(&app.ndb)?; - match profile_opt { - Some(ProfileRenderData::Profile(profile_key)) => { - if let Ok(profile_rec) = app.ndb.get_profile_by_key(&txn, *profile_key) { - let note_key = NoteKey::new(profile_rec.record().note_key()); - if let Ok(profile_note) = app.ndb.get_note_by_key(&txn, note_key) { - Some(*profile_note.pubkey()) - } else { - None - } - } else { - None - } - } - Some(ProfileRenderData::Missing(pk)) => Some(*pk), - None => None, - } - }; - - if let Some(pubkey) = maybe_pubkey { - // Check if we have cached notes for this profile - let has_cached_notes = { - let txn = Transaction::new(&app.ndb)?; - let notes_filter = Filter::new().authors([&pubkey]).kinds([1]).limit(1).build(); - app.ndb - .query(&txn, &[notes_filter], 1) - .map(|results| !results.is_empty()) - .unwrap_or(false) - }; - - let pool = app.relay_pool.clone(); - let ndb = app.ndb.clone(); - - if has_cached_notes { - // Cached data exists: spawn background refresh so we don't block response. - // Rate-limit refreshes per pubkey to avoid hammering relays on hot profiles. - let now = Instant::now(); - let state_map = &app.profile_refresh_state; - - // Prune stale completed entries to bound memory growth - if state_map.len() > PROFILE_REFRESH_MAP_PRUNE_THRESHOLD { - state_map.retain(|_, state| match state { - ProfileRefreshState::InProgress { .. } => true, - ProfileRefreshState::Completed(t) => { - now.duration_since(*t) < PROFILE_REFRESH_INTERVAL - } - }); - } - - // Use entry API for atomic check-and-insert to prevent race conditions - // where concurrent requests could each spawn a refresh - use dashmap::mapref::entry::Entry; - match state_map.entry(pubkey) { - Entry::Occupied(mut occupied) => { - let should_refresh = match occupied.get() { - // Already refreshing - skip unless stuck (>10 min) - ProfileRefreshState::InProgress { started, .. } - if now.duration_since(*started) < Duration::from_secs(10 * 60) => - { - false - } - // Recently completed - skip refresh - ProfileRefreshState::Completed(t) - if now.duration_since(*t) < PROFILE_REFRESH_INTERVAL => - { - false - } - // Stuck fetch - abort and restart - ProfileRefreshState::InProgress { handle, .. } => { - handle.abort(); - true - } - // Stale completion - refresh - ProfileRefreshState::Completed(_) => true, - }; - - if should_refresh { - let refresh_state = app.profile_refresh_state.clone(); - let handle = tokio::spawn(async move { - let result = render::fetch_profile_feed(pool, ndb, pubkey).await; - match result { - Ok(()) => { - refresh_state.insert( - pubkey, - ProfileRefreshState::Completed(Instant::now()), - ); - } - Err(err) => { - error!("Background profile feed refresh failed: {err}"); - refresh_state.remove(&pubkey); - } - } - }); - occupied.insert(ProfileRefreshState::InProgress { - started: now, - handle: handle.abort_handle(), - }); - } - } - Entry::Vacant(vacant) => { - // No existing state - start refresh - let refresh_state = app.profile_refresh_state.clone(); - let handle = tokio::spawn(async move { - let result = render::fetch_profile_feed(pool, ndb, pubkey).await; - match result { - Ok(()) => { - refresh_state.insert( - pubkey, - ProfileRefreshState::Completed(Instant::now()), - ); - } - Err(err) => { - error!("Background profile feed refresh failed: {err}"); - refresh_state.remove(&pubkey); - } - } - }); - vacant.insert(ProfileRefreshState::InProgress { - started: now, - handle: handle.abort_handle(), - }); - } - } - } else { - // No cached data: must wait for relay fetch before rendering - if let Err(err) = render::fetch_profile_feed(pool, ndb, pubkey).await { - error!("Error fetching profile feed: {err}"); - } - } - } + app.ensure_profile_feed(profile_opt).await?; } if is_png { @@ -491,6 +597,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { default_pfp, prometheus_handle, profile_refresh_state: Arc::new(DashMap::new()), + note_refresh_state: Arc::new(DashMap::new()), + note_inflight: Arc::new(DashMap::new()), }; // We start a loop to continuously accept incoming connections