commit 3f201f63ea49f76dabe99051e2631f0c373a23a8
parent b59b455aa53d1061724452b4a347f95b9939c109
Author: William Casarin <jb55@jb55.com>
Date: Sun, 22 Feb 2026 11:46:18 -0800
fix: replace Notify with watch channel for inflight dedup, upgrade nostrdb
Switch run_inflight_deduplicated from Notify to watch channels with
DashMap::entry() for atomic check-and-insert. Fixes two bugs:
- TOCTOU race where two callers could both become fetchers
- Missed notification when fetcher completes before waiter subscribes
Also upgrades to local nostrdb 0.10.0 which fixes the poll_next
deadlock (subs mutex held during ndb_poll_for_notes FFI call).
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Diffstat:
4 files changed, 43 insertions(+), 34 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -1871,9 +1871,7 @@ dependencies = [
[[package]]
name = "nostrdb"
-version = "0.9.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "05b2685d093dca579807150b6fafab4dcb974fc6e31017d273ae32d42795d41b"
+version = "0.10.0"
dependencies = [
"bindgen 0.69.5",
"cc",
diff --git a/Cargo.toml b/Cargo.toml
@@ -13,8 +13,8 @@ hyper-util = { version = "0.1.1", features = ["full"] }
http-body-util = "0.1"
tracing = "0.1.41"
tracing-subscriber = "0.3.19"
-nostrdb = "0.9.0"
-#nostrdb = { path = "/home/jb55/src/rust/nostrdb-rs" }
+#nostrdb = "0.9.0"
+nostrdb = { path = "/home/jb55/dev/github/damus-io/nostrdb-rs" }
#nostrdb = "0.1.6"
#nostr-sdk = { git = "https://github.com/damus-io/nostr-sdk.git", rev = "fc0dc7b38f5060f171228b976b9700c0135245d3" }
#nostr-sdk = "0.37.0"
diff --git a/src/html.rs b/src/html.rs
@@ -1159,7 +1159,9 @@ fn build_note_stats_html(ndb: &Ndb, txn: &Transaction, note: &Note, is_root: boo
emojis.push((s.to_string(), count));
}
}
- NoteMetadataEntryVariant::Unknown(_) => {}
+ NoteMetadataEntryVariant::Unknown(_)
+ | NoteMetadataEntryVariant::Zap(_)
+ | NoteMetadataEntryVariant::ZapUnverified(_) => {}
}
}
diff --git a/src/main.rs b/src/main.rs
@@ -2,6 +2,7 @@ use std::net::SocketAddr;
use std::time::Instant;
use dashmap::DashMap;
+use tokio::sync::watch;
use tokio::task::AbortHandle;
use http_body_util::Full;
@@ -83,9 +84,10 @@ pub struct Notecrumbs {
note_refresh_state: Arc<DashMap<[u8; 32], RefreshState>>,
/// Inflight fetches - deduplicates concurrent relay queries for the same resource.
- /// Keyed by nip19 debounce key. Waiters subscribe to the Notify; the fetcher
- /// notifies on completion.
- inflight: Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>>,
+ /// Keyed by nip19 debounce key. Waiters clone the watch::Receiver and wait for
+ /// the fetcher to signal completion. Uses watch instead of Notify to avoid
+ /// missed-notification races.
+ inflight: Arc<DashMap<[u8; 32], watch::Receiver<bool>>>,
}
#[inline]
@@ -209,10 +211,10 @@ where
/// Returns `true` if this call was the "fetcher" (ran the work),
/// `false` if it was a "waiter" (another call was already in progress).
///
-/// The fetcher inserts a Notify into the inflight map, runs the work closure,
-/// then removes the entry and wakes all waiters. Waiters just await the Notify.
+/// Uses `DashMap::entry()` for atomic check-and-insert (no TOCTOU race)
+/// and `watch` channels so waiters can't miss the completion signal.
async fn run_inflight_deduplicated<F, Fut>(
- inflight: &Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>>,
+ inflight: &DashMap<[u8; 32], watch::Receiver<bool>>,
key: [u8; 32],
work: F,
) -> bool
@@ -220,24 +222,31 @@ where
F: FnOnce() -> Fut,
Fut: std::future::Future<Output = ()>,
{
- // Check if there's already an inflight fetch for this resource
- let existing_notify = inflight.get(&key).map(|r| r.value().clone());
+ use dashmap::mapref::entry::Entry;
- if let Some(notify) = existing_notify {
- // Another request is already fetching — wait for it
- notify.notified().await;
- false
- } else {
- // We're the first — register inflight and do the work
- let n = Arc::new(tokio::sync::Notify::new());
- inflight.insert(key, n.clone());
+ match inflight.entry(key) {
+ Entry::Occupied(entry) => {
+ // Another request is already fetching — clone receiver then
+ // release the shard lock before awaiting
+ let mut rx = entry.get().clone();
+ drop(entry);
+ // wait_for checks the current value first, so even if the
+ // fetcher already completed we won't miss it
+ let _ = rx.wait_for(|&done| done).await;
+ false
+ }
+ Entry::Vacant(entry) => {
+ // We're the first — insert a watch receiver so waiters can find it
+ let (tx, rx) = watch::channel(false);
+ entry.insert(rx);
- work().await;
+ work().await;
- // Signal waiters and remove inflight entry
- inflight.remove(&key);
- n.notify_waiters();
- true
+ // Clean up and signal all waiters
+ inflight.remove(&key);
+ let _ = tx.send(true);
+ true
+ }
}
}
@@ -246,7 +255,7 @@ where
async fn fetch_if_missing(
ndb: &Ndb,
relay_pool: &Arc<RelayPool>,
- inflight: &Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>>,
+ inflight: &DashMap<[u8; 32], watch::Receiver<bool>>,
render_data: &mut RenderData,
nip19: &Nip19,
) {
@@ -322,7 +331,7 @@ fn spawn_note_secondary_fetch(
async fn ensure_profile_feed(
ndb: &Ndb,
relay_pool: &Arc<RelayPool>,
- inflight: &Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>>,
+ inflight: &DashMap<[u8; 32], watch::Receiver<bool>>,
profile_refresh_state: &Arc<DashMap<[u8; 32], RefreshState>>,
profile_opt: &Option<ProfileRenderData>,
) -> Result<(), Error> {
@@ -972,7 +981,7 @@ mod tests {
#[tokio::test]
async fn inflight_first_caller_runs_work_and_returns_true() {
- let inflight: Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>> = Arc::new(DashMap::new());
+ let inflight: Arc<DashMap<[u8; 32], watch::Receiver<bool>>> = Arc::new(DashMap::new());
let key = test_key(0xCC);
let work_count = Arc::new(AtomicUsize::new(0));
@@ -988,7 +997,7 @@ mod tests {
#[tokio::test]
async fn inflight_concurrent_callers_only_run_work_once() {
- let inflight: Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>> = Arc::new(DashMap::new());
+ let inflight: Arc<DashMap<[u8; 32], watch::Receiver<bool>>> = Arc::new(DashMap::new());
let key = test_key(0xDD);
let work_count = Arc::new(AtomicUsize::new(0));
@@ -1043,7 +1052,7 @@ mod tests {
#[tokio::test]
async fn inflight_cleans_up_after_completion() {
- let inflight: Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>> = Arc::new(DashMap::new());
+ let inflight: Arc<DashMap<[u8; 32], watch::Receiver<bool>>> = Arc::new(DashMap::new());
let key = test_key(0xEE);
run_inflight_deduplicated(&inflight, key, || async {}).await;
@@ -1057,7 +1066,7 @@ mod tests {
#[tokio::test]
async fn inflight_second_call_after_completion_runs_work_again() {
- let inflight: Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>> = Arc::new(DashMap::new());
+ let inflight: Arc<DashMap<[u8; 32], watch::Receiver<bool>>> = Arc::new(DashMap::new());
let key = test_key(0xFF);
let work_count = Arc::new(AtomicUsize::new(0));
@@ -1081,7 +1090,7 @@ mod tests {
#[tokio::test]
async fn inflight_independent_keys_both_run_work() {
- let inflight: Arc<DashMap<[u8; 32], Arc<tokio::sync::Notify>>> = Arc::new(DashMap::new());
+ let inflight: Arc<DashMap<[u8; 32], watch::Receiver<bool>>> = Arc::new(DashMap::new());
let key_a = test_key(0xAA);
let key_b = test_key(0xBB);
let work_count = Arc::new(AtomicUsize::new(0));