commit b59b455aa53d1061724452b4a347f95b9939c109
parent 3c41c014b971c8857dcf9ab708adf0a9c904c50c
Author: William Casarin <jb55@jb55.com>
Date: Wed, 18 Feb 2026 12:34:30 -0800
refactor: extract run_inflight_deduplicated and replace mock tests
Extract the inflight dedup pattern (check map, wait-or-work, cleanup,
notify) into a generic async function. Both fetch_if_missing and
ensure_profile_feed now use it. Replace the two tests that only
exercised Tokio Notify and DashMap primitives with five tests that
call the actual production function.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat:
| M | src/main.rs | | | 235 | ++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------- |
1 file changed, 166 insertions(+), 69 deletions(-)
diff --git a/src/main.rs b/src/main.rs
@@ -204,6 +204,43 @@ where
}
}
+/// Deduplicates concurrent async work for the same key.
+///
+/// Returns `true` if this call was the "fetcher" (ran the work),
+/// `false` if it was a "waiter" (another call was already in progress).
+///
+/// The fetcher inserts a Notify into the inflight map, runs the work closure,
+/// then removes the entry and wakes all waiters. Waiters just await the Notify.
+async fn run_inflight_deduplicated<F, Fut>(
+ inflight: &Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>>,
+ key: [u8; 32],
+ work: F,
+) -> bool
+where
+ F: FnOnce() -> Fut,
+ Fut: std::future::Future<Output = ()>,
+{
+ // Check if there's already an inflight fetch for this resource
+ let existing_notify = inflight.get(&key).map(|r| r.value().clone());
+
+ if let Some(notify) = existing_notify {
+ // Another request is already fetching — wait for it
+ notify.notified().await;
+ false
+ } else {
+ // We're the first — register inflight and do the work
+ let n = Arc::new(tokio::sync::Notify::new());
+ inflight.insert(key, n.clone());
+
+ work().await;
+
+ // Signal waiters and remove inflight entry
+ inflight.remove(&key);
+ n.notify_waiters();
+ true
+ }
+}
+
/// Fetch missing render data from relays, deduplicating concurrent requests
/// for the same nip19 so only one relay query fires at a time.
async fn fetch_if_missing(
@@ -215,12 +252,24 @@ async fn fetch_if_missing(
) {
let key = nip19_debounce_key(nip19);
- // Check if there's already an inflight fetch for this resource
- let existing_notify = inflight.get(&key).map(|r| r.value().clone());
+ let was_fetcher = run_inflight_deduplicated(inflight, key, || {
+ let ndb = ndb.clone();
+ let relay_pool = relay_pool.clone();
+ let nip19 = nip19.clone();
+ let render_data_ref = &mut *render_data;
+ async move {
+ if let Err(err) = render_data_ref
+ .complete(ndb, relay_pool, nip19)
+ .await
+ {
+ error!("Error fetching completion data: {err}");
+ }
+ }
+ })
+ .await;
- if let Some(notify) = existing_notify {
- // Another request is already fetching — wait for it, then re-check ndb
- notify.notified().await;
+ if !was_fetcher {
+ // We were a waiter — re-check ndb for updated data
let txn = match Transaction::new(ndb) {
Ok(txn) => txn,
Err(err) => {
@@ -231,21 +280,6 @@ async fn fetch_if_missing(
if let Ok(new_rd) = render::get_render_data(ndb, &txn, nip19) {
*render_data = new_rd;
}
- } else {
- // We're the first — register inflight and do the fetch
- let n = Arc::new(tokio::sync::Notify::new());
- inflight.insert(key, n.clone());
-
- if let Err(err) = render_data
- .complete(ndb.clone(), relay_pool.clone(), nip19.clone())
- .await
- {
- error!("Error fetching completion data: {err}");
- }
-
- // Signal waiters and remove inflight entry
- inflight.remove(&key);
- n.notify_waiters();
}
}
@@ -348,19 +382,12 @@ async fn ensure_profile_feed(
// No cached data: must wait for relay fetch before rendering.
// Use inflight dedup so concurrent requests for the same profile
// don't each fire their own relay queries.
- let existing_notify = inflight.get(&pubkey).map(|r| r.value().clone());
-
- if let Some(notify) = existing_notify {
- notify.notified().await;
- } else {
- let n = Arc::new(tokio::sync::Notify::new());
- inflight.insert(pubkey, n.clone());
+ run_inflight_deduplicated(inflight, pubkey, || async move {
if let Err(err) = render::fetch_profile_feed(pool, ndb, pubkey).await {
error!("Error fetching profile feed: {err}");
}
- inflight.remove(&pubkey);
- n.notify_waiters();
- }
+ })
+ .await;
}
Ok(())
@@ -940,69 +967,139 @@ mod tests {
}
// ---------------------------------------------------------------
- // Inflight deduplication tests (Notify-based)
+ // run_inflight_deduplicated tests
// ---------------------------------------------------------------
#[tokio::test]
- async fn inflight_dedup_concurrent_waiters_share_one_fetch() {
+ async fn inflight_first_caller_runs_work_and_returns_true() {
let inflight: Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>> = Arc::new(DashMap::new());
let key = test_key(0xCC);
- let fetch_count = Arc::new(AtomicUsize::new(0));
+ let work_count = Arc::new(AtomicUsize::new(0));
- // Simulate the "first request" pattern: insert Notify, do work, signal
- let n = Arc::new(tokio::sync::Notify::new());
- inflight.insert(key, n.clone());
+ let wc = work_count.clone();
+ let was_fetcher = run_inflight_deduplicated(&inflight, key, || async move {
+ wc.fetch_add(1, Ordering::SeqCst);
+ })
+ .await;
+
+ assert!(was_fetcher);
+ assert_eq!(work_count.load(Ordering::SeqCst), 1);
+ }
+
+ #[tokio::test]
+ async fn inflight_concurrent_callers_only_run_work_once() {
+ let inflight: Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>> = Arc::new(DashMap::new());
+ let key = test_key(0xDD);
+ let work_count = Arc::new(AtomicUsize::new(0));
+
+ // Use a channel to control when the fetcher's work completes,
+ // so we can launch waiters while it's in progress
+ let (tx, rx) = tokio::sync::oneshot::channel::<()>();
+
+ // Spawn the fetcher — it will block until we send on tx
+ let inflight_c = inflight.clone();
+ let wc = work_count.clone();
+ let fetcher = tokio::spawn(async move {
+ run_inflight_deduplicated(&inflight_c, key, || async move {
+ wc.fetch_add(1, Ordering::SeqCst);
+ rx.await.ok();
+ })
+ .await
+ });
- // Spawn 10 "waiter" tasks that find the existing Notify and wait
+ // Yield to let fetcher start and insert its Notify
+ tokio::task::yield_now().await;
+
+ // Spawn 10 concurrent waiters that call the same function
let mut waiters = Vec::new();
for _ in 0..10 {
- let inflight = inflight.clone();
- let fetch_count = fetch_count.clone();
- let key = key;
+ let inflight_c = inflight.clone();
+ let wc = work_count.clone();
waiters.push(tokio::spawn(async move {
- if let Some(notify) = inflight.get(&key).map(|r| r.value().clone()) {
- // This is the "waiter" path — no fetch
- notify.notified().await;
- } else {
- // This would be the "fetcher" path — shouldn't happen
- fetch_count.fetch_add(1, Ordering::SeqCst);
- }
+ run_inflight_deduplicated(&inflight_c, key, || async move {
+ wc.fetch_add(1, Ordering::SeqCst);
+ })
+ .await
}));
}
- // Give waiters a moment to subscribe
+ // Yield to let waiters register
tokio::task::yield_now().await;
- // Simulate the fetch completing
- fetch_count.fetch_add(1, Ordering::SeqCst);
- inflight.remove(&key);
- n.notify_waiters();
+ // Let the fetcher complete
+ tx.send(()).unwrap();
+
+ let fetcher_result = fetcher.await.unwrap();
+ assert!(fetcher_result, "first caller should be the fetcher");
- // All waiters should complete
for w in waiters {
- w.await.unwrap();
+ let was_fetcher = w.await.unwrap();
+ assert!(!was_fetcher, "waiters should not have run work");
}
- // Only 1 fetch should have happened (the original, not any waiters)
- assert_eq!(fetch_count.load(Ordering::SeqCst), 1);
+ // Work closure should have executed exactly once
+ assert_eq!(work_count.load(Ordering::SeqCst), 1);
}
#[tokio::test]
- async fn inflight_second_request_after_completion_can_fetch() {
+ async fn inflight_cleans_up_after_completion() {
let inflight: Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>> = Arc::new(DashMap::new());
- let key = test_key(0xDD);
+ let key = test_key(0xEE);
- // First "request" — insert, do work, remove, notify
- {
- let n = Arc::new(tokio::sync::Notify::new());
- inflight.insert(key, n.clone());
- // ... fetch happens ...
- inflight.remove(&key);
- n.notify_waiters();
- }
+ run_inflight_deduplicated(&inflight, key, || async {}).await;
+
+ // The inflight entry should have been removed
+ assert!(
+ !inflight.contains_key(&key),
+ "inflight entry should be cleaned up after work completes"
+ );
+ }
+
+ #[tokio::test]
+ async fn inflight_second_call_after_completion_runs_work_again() {
+ let inflight: Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>> = Arc::new(DashMap::new());
+ let key = test_key(0xFF);
+ let work_count = Arc::new(AtomicUsize::new(0));
+
+ // First call
+ let wc = work_count.clone();
+ run_inflight_deduplicated(&inflight, key, || async move {
+ wc.fetch_add(1, Ordering::SeqCst);
+ })
+ .await;
+
+ // Second call — should run work again since inflight was cleaned up
+ let wc = work_count.clone();
+ let was_fetcher = run_inflight_deduplicated(&inflight, key, || async move {
+ wc.fetch_add(1, Ordering::SeqCst);
+ })
+ .await;
+
+ assert!(was_fetcher, "second call should be a fetcher, not a waiter");
+ assert_eq!(work_count.load(Ordering::SeqCst), 2);
+ }
+
+ #[tokio::test]
+ async fn inflight_independent_keys_both_run_work() {
+ let inflight: Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>> = Arc::new(DashMap::new());
+ let key_a = test_key(0xAA);
+ let key_b = test_key(0xBB);
+ let work_count = Arc::new(AtomicUsize::new(0));
+
+ let wc = work_count.clone();
+ let a = run_inflight_deduplicated(&inflight, key_a, || async move {
+ wc.fetch_add(1, Ordering::SeqCst);
+ })
+ .await;
+
+ let wc = work_count.clone();
+ let b = run_inflight_deduplicated(&inflight, key_b, || async move {
+ wc.fetch_add(1, Ordering::SeqCst);
+ })
+ .await;
- // Second "request" — should NOT find an inflight entry
- let found = inflight.get(&key).is_some();
- assert!(!found, "inflight entry should have been cleaned up");
+ assert!(a);
+ assert!(b);
+ assert_eq!(work_count.load(Ordering::SeqCst), 2);
}
}