notecrumbs

a nostr opengraph server built on nostrdb and egui
git clone git://jb55.com/notecrumbs
Log | Files | Refs | README | LICENSE

commit b368da75fff0d62c618167e31f6ffd3dba91bb46
parent 839fe5e6e0b00b9db69ef0542a8922b5292aea8b
Author: alltheseas <64376233+alltheseas@users.noreply.github.com>
Date:   Thu, 18 Dec 2025 12:43:32 -0600

perf: add concurrency safety with DashMap and atomic operations

Replace std::sync::Mutex<HashMap> with dashmap::DashMap for lock-free
concurrent access per pubkey, eliminating cross-profile contention.

Key improvements:
- Use entry() API for atomic check-and-insert, preventing race conditions
  where concurrent requests could each spawn a refresh before seeing
  each other's state
- Track InProgress state with AbortHandle to cancel stuck fetches (>10 min)
  before starting new ones, preventing overlapping long-running fetches
- Completed state tracks last success time for rate limiting

The atomic entry pattern requires some nesting but ensures correctness
under concurrent load.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>

Diffstat:
M Cargo.lock | 15 +++++++++++++++
M Cargo.toml | 1 +
M src/main.rs | 132 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------
3 files changed, 110 insertions(+), 38 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -572,6 +572,20 @@ dependencies = [ ] [[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + +[[package]] name = "data-encoding" version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1909,6 +1923,7 @@ version = "0.1.0" dependencies = [ "ammonia", "bytes", + "dashmap", "egui", "egui_extras", "egui_skia", diff --git a/Cargo.toml b/Cargo.toml @@ -34,3 +34,4 @@ pulldown-cmark = "0.9" serde_json = "*" metrics = "0.21" metrics-exporter-prometheus = "0.13" +dashmap = "6" diff --git a/src/main.rs b/src/main.rs @@ -1,8 +1,9 @@ -use std::collections::HashMap; use std::net::SocketAddr; -use std::sync::Mutex; use std::time::Instant; +use dashmap::DashMap; +use tokio::task::AbortHandle; + use http_body_util::Full; use hyper::body::Bytes; use hyper::header; @@ -46,6 +47,14 @@ const PROFILE_REFRESH_INTERVAL: Duration = Duration::from_secs(5 * 60); /// Prune refresh tracking map when it exceeds this size (deliberate limit, ~40KB max memory) const PROFILE_REFRESH_MAP_PRUNE_THRESHOLD: usize = 1000; +/// Tracks the state of a background profile refresh +enum ProfileRefreshState { + /// Refresh currently in progress with handle to abort if stuck + InProgress { started: Instant, handle: AbortHandle }, + /// Last successful refresh completed at this time + Completed(Instant), +} + #[derive(Clone)] pub struct Notecrumbs { pub ndb: Ndb, @@ -59,8 +68,8 @@ pub struct Notecrumbs { /// How long do we wait for remote note requests _timeout: Duration, - /// Tracks last successful refresh time per pubkey to rate-limit background fetches - profile_last_refresh: Arc<Mutex<HashMap<[u8; 32], Instant>>>, + /// Tracks refresh state per pubkey - prevents excessive 
relay queries and concurrent fetches + profile_refresh_state: Arc<DashMap<[u8; 32], ProfileRefreshState>>, } #[inline] @@ -217,45 +226,92 @@ async fn serve( if has_cached_notes { // Cached data exists: spawn background refresh so we don't block response. // Rate-limit refreshes per pubkey to avoid hammering relays on hot profiles. - let should_refresh = { - let mut last_refresh = app.profile_last_refresh.lock().unwrap(); - let now = Instant::now(); - - // Prune stale entries to bound memory growth - if last_refresh.len() > PROFILE_REFRESH_MAP_PRUNE_THRESHOLD { - last_refresh - .retain(|_, t| now.duration_since(*t) < PROFILE_REFRESH_INTERVAL); - } + let now = Instant::now(); + let state_map = &app.profile_refresh_state; + + // Prune stale completed entries to bound memory growth + if state_map.len() > PROFILE_REFRESH_MAP_PRUNE_THRESHOLD { + state_map.retain(|_, state| match state { + ProfileRefreshState::InProgress { .. } => true, + ProfileRefreshState::Completed(t) => { + now.duration_since(*t) < PROFILE_REFRESH_INTERVAL + } + }); + } - match last_refresh.get(&pubkey) { - Some(&last) if now.duration_since(last) < PROFILE_REFRESH_INTERVAL => false, - _ => { - last_refresh.insert(pubkey, now); - true + // Use entry API for atomic check-and-insert to prevent race conditions + // where concurrent requests could each spawn a refresh + use dashmap::mapref::entry::Entry; + match state_map.entry(pubkey) { + Entry::Occupied(mut occupied) => { + let should_refresh = match occupied.get() { + // Already refreshing - skip unless stuck (>10 min) + ProfileRefreshState::InProgress { started, .. } + if now.duration_since(*started) < Duration::from_secs(10 * 60) => + { + false + } + // Recently completed - skip refresh + ProfileRefreshState::Completed(t) + if now.duration_since(*t) < PROFILE_REFRESH_INTERVAL => + { + false + } + // Stuck fetch - abort and restart + ProfileRefreshState::InProgress { handle, .. 
} => { + handle.abort(); + true + } + // Stale completion - refresh + ProfileRefreshState::Completed(_) => true, + }; + + if should_refresh { + let refresh_state = app.profile_refresh_state.clone(); + let handle = tokio::spawn(async move { + let result = render::fetch_profile_feed(pool, ndb, pubkey).await; + match result { + Ok(()) => { + refresh_state.insert( + pubkey, + ProfileRefreshState::Completed(Instant::now()), + ); + } + Err(err) => { + error!("Background profile feed refresh failed: {err}"); + refresh_state.remove(&pubkey); + } + } + }); + occupied.insert(ProfileRefreshState::InProgress { + started: now, + handle: handle.abort_handle(), + }); } } - }; - - if should_refresh { - let last_refresh_map = app.profile_last_refresh.clone(); - tokio::spawn(async move { - let result = render::fetch_profile_feed(pool, ndb, pubkey).await; - match result { - Ok(()) => { - // Update timestamp on success - if let Ok(mut map) = last_refresh_map.lock() { - map.insert(pubkey, Instant::now()); + Entry::Vacant(vacant) => { + // No existing state - start refresh + let refresh_state = app.profile_refresh_state.clone(); + let handle = tokio::spawn(async move { + let result = render::fetch_profile_feed(pool, ndb, pubkey).await; + match result { + Ok(()) => { + refresh_state.insert( + pubkey, + ProfileRefreshState::Completed(Instant::now()), + ); } - } - Err(err) => { - error!("Background profile feed refresh failed: {err}"); - // Clear on failure so next request retries immediately - if let Ok(mut map) = last_refresh_map.lock() { - map.remove(&pubkey); + Err(err) => { + error!("Background profile feed refresh failed: {err}"); + refresh_state.remove(&pubkey); } } - } - }); + }); + vacant.insert(ProfileRefreshState::InProgress { + started: now, + handle: handle.abort_handle(), + }); + } } } else { // No cached data: must wait for relay fetch before rendering @@ -377,7 +433,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> { font_data, default_pfp, 
prometheus_handle, - profile_last_refresh: Arc::new(Mutex::new(HashMap::new())), + profile_refresh_state: Arc::new(DashMap::new()), }; // We start a loop to continuously accept incoming connections