notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit bd2517c597e151e045623c48aec9ef1b1bf34d3a
parent a11d1ac0ef5b254a80e2b51547f38cd12307a0e3
Author: William Casarin <jb55@jb55.com>
Date:   Wed, 25 Feb 2026 12:42:49 -0800

columns: async initial timeline loading to avoid UI stalls

Move initial timeline NostrDB queries off the UI thread by introducing
a generic AsyncLoader<Cmd, Msg> worker pool and a TimelineLoader that
streams NoteRef batches via ndb fold. Results are drained each frame
(capped at 8 per frame) to keep rendering smooth.

This also refactors MessagesLoader to reuse the new AsyncLoader,
eliminating the duplicated worker thread spawning code, and adds
a load_local flag to TimelineCache::open() so startup paths can
skip blocking local queries.

Diffstat:
M Cargo.lock | 1+
A crates/notedeck/src/async_loader.rs | 109+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
M crates/notedeck/src/lib.rs | 2++
M crates/notedeck_columns/Cargo.toml | 2+-
M crates/notedeck_columns/src/actionbar.rs | 4++--
M crates/notedeck_columns/src/app.rs | 110++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------
M crates/notedeck_columns/src/column.rs | 2+-
M crates/notedeck_columns/src/lib.rs | 1+
M crates/notedeck_columns/src/nav.rs | 3++-
M crates/notedeck_columns/src/timeline/cache.rs | 37+++++++++++++++++++++++++++++++++----
M crates/notedeck_columns/src/timeline/kind.rs | 56++++++++++++++++++++++++++++++++++++++------------------
M crates/notedeck_columns/src/timeline/mod.rs | 17++++++++---------
A crates/notedeck_columns/src/timeline_loader.rs | 203+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
M crates/notedeck_messages/src/loader.rs | 107+++++++++++++++++++++++++++----------------------------------------------------
14 files changed, 530 insertions(+), 124 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -4593,6 +4593,7 @@ dependencies = [ "base64 0.22.1", "bech32", "bitflags 2.9.1", + "crossbeam-channel", "dirs", "eframe", "egui", diff --git a/crates/notedeck/src/async_loader.rs b/crates/notedeck/src/async_loader.rs @@ -0,0 +1,109 @@ +//! Async loader helpers for background NostrDB work. + +use crossbeam_channel as chan; +use nostrdb::Ndb; +use std::sync::Arc; +use std::thread; + +/// Handle for a background async loader that processes commands on worker threads. +pub struct AsyncLoader<Cmd, Msg> { + cmd_tx: Option<chan::Sender<Cmd>>, + msg_rx: Option<chan::Receiver<Msg>>, +} + +impl<Cmd, Msg> AsyncLoader<Cmd, Msg> +where + Cmd: Send + 'static, + Msg: Send + 'static, +{ + /// Create an uninitialized loader handle. + pub fn new() -> Self { + Self { + cmd_tx: None, + msg_rx: None, + } + } + + /// Start the loader workers if they have not been started yet. + pub fn start( + &mut self, + egui_ctx: egui::Context, + ndb: Ndb, + workers: usize, + worker_name: &str, + handler: impl Fn(Cmd, &egui::Context, &Ndb, &chan::Sender<Msg>) + Send + Sync + 'static, + ) -> bool { + if self.cmd_tx.is_some() { + return false; + } + + let (cmd_tx, cmd_rx) = chan::unbounded::<Cmd>(); + let (msg_tx, msg_rx) = chan::unbounded::<Msg>(); + + self.cmd_tx = Some(cmd_tx.clone()); + self.msg_rx = Some(msg_rx); + + let handler = Arc::new(handler); + let workers = workers.max(1); + for idx in 0..workers { + let cmd_rx = cmd_rx.clone(); + let msg_tx = msg_tx.clone(); + let egui_ctx = egui_ctx.clone(); + let ndb = ndb.clone(); + let handler = handler.clone(); + let name = if workers == 1 { + worker_name.to_string() + } else { + format!("{worker_name}-{idx}") + }; + + thread::Builder::new() + .name(name) + .spawn(move || { + while let Ok(cmd) = cmd_rx.recv() { + (handler)(cmd, &egui_ctx, &ndb, &msg_tx); + } + }) + .expect("failed to spawn async loader worker"); + } + + true + } + + /// Send a loader command if the worker pool is available. 
+ pub fn send(&self, cmd: Cmd) { + let Some(tx) = &self.cmd_tx else { + return; + }; + + let _ = tx.send(cmd); + } + + /// Try to receive the next loader message without blocking. + pub fn try_recv(&self) -> Option<Msg> { + let Some(rx) = &self.msg_rx else { + return None; + }; + + rx.try_recv().ok() + } +} + +impl<Cmd, Msg> Default for AsyncLoader<Cmd, Msg> +where + Cmd: Send + 'static, + Msg: Send + 'static, +{ + fn default() -> Self { + Self::new() + } +} + +/// Compute a worker count based on available parallelism, clamped to a max. +pub fn worker_count(max_workers: usize) -> usize { + let available = thread::available_parallelism() + .map(|count| count.get()) + .unwrap_or(1); + let max_workers = max_workers.max(1); + available.clamp(1, max_workers) +} diff --git a/crates/notedeck/src/lib.rs b/crates/notedeck/src/lib.rs @@ -4,6 +4,7 @@ pub mod abbrev; mod account; mod app; mod args; +pub mod async_loader; pub mod compact; pub mod contacts; mod context; @@ -51,6 +52,7 @@ pub use account::relay::RelayAction; pub use account::FALLBACK_PUBKEY; pub use app::{try_process_events_core, App, AppAction, AppResponse, Notedeck}; pub use args::Args; +pub use async_loader::{worker_count, AsyncLoader}; pub use context::{AppContext, SoftKeyboardContext}; pub use error::{show_one_error_message, Error, FilterError, ZapError}; pub use filter::{FilterState, FilterStates, UnifiedSubscription}; diff --git a/crates/notedeck_columns/Cargo.toml b/crates/notedeck_columns/Cargo.toml @@ -59,6 +59,7 @@ profiling = { workspace = true } hashbrown = { workspace = true } oot_bitset = { workspace = true } human_format = "1.1.0" +crossbeam-channel = { workspace = true } [target.'cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))'.dependencies] rfd = { workspace = true } @@ -74,4 +75,3 @@ security-framework = "2.11.0" default = [] puffin = ["dep:puffin", "profiling/profile-with-puffin"] tracy = ["profiling/profile-with-tracy"] - diff --git 
a/crates/notedeck_columns/src/actionbar.rs b/crates/notedeck_columns/src/actionbar.rs @@ -117,7 +117,7 @@ fn execute_note_action( let kind = TimelineKind::Profile(pubkey); router_action = Some(RouterAction::route_to(Route::Timeline(kind.clone()))); timeline_res = timeline_cache - .open(ndb, note_cache, txn, pool, &kind) + .open(ndb, note_cache, txn, pool, &kind, false) .map(NotesOpenResult::Timeline); } NoteAction::Note { @@ -154,7 +154,7 @@ fn execute_note_action( let kind = TimelineKind::Hashtag(vec![htag.clone()]); router_action = Some(RouterAction::route_to(Route::Timeline(kind.clone()))); timeline_res = timeline_cache - .open(ndb, note_cache, txn, pool, &kind) + .open(ndb, note_cache, txn, pool, &kind, false) .map(NotesOpenResult::Timeline); } NoteAction::Repost(note_id) => { diff --git a/crates/notedeck_columns/src/app.rs b/crates/notedeck_columns/src/app.rs @@ -11,6 +11,7 @@ use crate::{ subscriptions::{SubKind, Subscriptions}, support::Support, timeline::{self, kind::ListKind, thread::Threads, TimelineCache, TimelineKind}, + timeline_loader::{TimelineLoader, TimelineLoaderMsg}, toolbar::unseen_notification, ui::{self, toolbar::toolbar, DesktopSidePanel, SidePanelAction}, view_state::ViewState, @@ -28,11 +29,14 @@ use notedeck_ui::{ media::{MediaViewer, MediaViewerFlags, MediaViewerState}, NoteOptions, }; -use std::collections::{BTreeSet, HashMap}; +use std::collections::{BTreeSet, HashMap, HashSet}; use std::path::Path; use tracing::{error, info, warn}; use uuid::Uuid; +/// Max timeline loader messages to process per frame to avoid UI stalls. +const MAX_TIMELINE_LOADER_MSGS_PER_FRAME: usize = 8; + #[derive(Debug, Eq, PartialEq, Clone)] pub enum DamusState { Initializing, @@ -50,6 +54,12 @@ pub struct Damus { pub subscriptions: Subscriptions, pub support: Support, pub threads: Threads, + /// Background loader for initial timeline scans. + timeline_loader: TimelineLoader, + /// Timelines currently loading initial notes. 
+ inflight_timeline_loads: HashSet<TimelineKind>, + /// Timelines that have completed their initial load. + loaded_timeline_loads: HashSet<TimelineKind>, //frame_history: crate::frame_history::FrameHistory, @@ -192,16 +202,18 @@ fn try_process_event( }); for (kind, timeline) in &mut damus.timeline_cache { - let is_ready = timeline::is_timeline_ready( - app_ctx.ndb, - app_ctx.pool, - app_ctx.note_cache, - timeline, - app_ctx.accounts, - app_ctx.unknown_ids, - ); + let is_ready = + timeline::is_timeline_ready(app_ctx.ndb, app_ctx.pool, timeline, app_ctx.accounts); if is_ready { + schedule_timeline_load( + &damus.timeline_loader, + &mut damus.inflight_timeline_loads, + &damus.loaded_timeline_loads, + app_ctx.ndb, + kind, + timeline, + ); let txn = Transaction::new(app_ctx.ndb).expect("txn"); // only thread timelines are reversed let reversed = false; @@ -230,10 +242,74 @@ fn try_process_event( Ok(()) } +/// Schedule an initial timeline load if it is not already in-flight or complete. +fn schedule_timeline_load( + loader: &TimelineLoader, + inflight: &mut HashSet<TimelineKind>, + loaded: &HashSet<TimelineKind>, + ndb: &nostrdb::Ndb, + kind: &TimelineKind, + timeline: &mut timeline::Timeline, +) { + if loaded.contains(kind) || inflight.contains(kind) { + return; + } + + let Some(filter) = timeline.filter.get_any_ready().cloned() else { + return; + }; + + if timeline.kind.should_subscribe_locally() { + timeline.subscription.try_add_local(ndb, &filter); + } + + loader.load_timeline(kind.clone()); + inflight.insert(kind.clone()); +} + +/// Drain timeline loader messages and apply them to the timeline cache. 
+#[profiling::function] +fn handle_timeline_loader_messages(damus: &mut Damus, app_ctx: &mut AppContext<'_>) { + let mut handled = 0; + while handled < MAX_TIMELINE_LOADER_MSGS_PER_FRAME { + let Some(msg) = damus.timeline_loader.try_recv() else { + break; + }; + handled += 1; + + match msg { + TimelineLoaderMsg::TimelineBatch { kind, notes } => { + let Some(timeline) = damus.timeline_cache.get_mut(&kind) else { + warn!("timeline loader batch for missing timeline {:?}", kind); + continue; + }; + let txn = Transaction::new(app_ctx.ndb).expect("txn"); + if let Some(pks) = + timeline.insert_new(&txn, app_ctx.ndb, app_ctx.note_cache, &notes) + { + pks.process(app_ctx.ndb, &txn, app_ctx.unknown_ids); + } + } + TimelineLoaderMsg::TimelineFinished { kind } => { + damus.inflight_timeline_loads.remove(&kind); + damus.loaded_timeline_loads.insert(kind); + } + TimelineLoaderMsg::Failed { kind, error } => { + warn!("timeline loader failed for {:?}: {}", kind, error); + damus.inflight_timeline_loads.remove(&kind); + } + } + } +} + #[profiling::function] fn update_damus(damus: &mut Damus, app_ctx: &mut AppContext<'_>, ctx: &egui::Context) { app_ctx.img_cache.urls.cache.handle_io(); + damus + .timeline_loader + .start(ctx.clone(), app_ctx.ndb.clone()); + if damus.columns(app_ctx.accounts).columns().is_empty() { damus .columns_mut(app_ctx.i18n, app_ctx.accounts) @@ -247,14 +323,6 @@ fn update_damus(damus: &mut Damus, app_ctx: &mut AppContext<'_>, ctx: &egui::Con damus .subscriptions() .insert("unknownids".to_string(), SubKind::OneShot); - if let Err(err) = timeline::setup_initial_nostrdb_subs( - app_ctx.ndb, - app_ctx.note_cache, - &mut damus.timeline_cache, - app_ctx.unknown_ids, - ) { - warn!("update_damus init: {err}"); - } if !app_ctx.settings.welcome_completed() { let split = @@ -276,6 +344,8 @@ fn update_damus(damus: &mut Damus, app_ctx: &mut AppContext<'_>, ctx: &egui::Con DamusState::Initialized => (), }; + handle_timeline_loader_messages(damus, app_ctx); + if let Err(err) 
= try_process_event(damus, app_ctx, ctx) { error!("error processing event: {}", err); } @@ -546,6 +616,9 @@ impl Damus { threads, onboarding: Onboarding::default(), hovered_column: None, + timeline_loader: TimelineLoader::default(), + inflight_timeline_loads: HashSet::new(), + loaded_timeline_loads: HashSet::new(), } } @@ -597,6 +670,9 @@ impl Damus { threads: Threads::default(), onboarding: Onboarding::default(), hovered_column: None, + timeline_loader: TimelineLoader::default(), + inflight_timeline_loads: HashSet::new(), + loaded_timeline_loads: HashSet::new(), } } diff --git a/crates/notedeck_columns/src/column.rs b/crates/notedeck_columns/src/column.rs @@ -112,7 +112,7 @@ impl Columns { ) -> Option<TimelineOpenResult> { self.columns .push(Column::new(vec![Route::timeline(kind.to_owned())])); - timeline_cache.open(ndb, note_cache, txn, pool, kind) + timeline_cache.open(ndb, note_cache, txn, pool, kind, false) } pub fn new_column_picker(&mut self) { diff --git a/crates/notedeck_columns/src/lib.rs b/crates/notedeck_columns/src/lib.rs @@ -28,6 +28,7 @@ mod subscriptions; mod support; mod test_data; pub mod timeline; +mod timeline_loader; mod toolbar; pub mod ui; mod view_state; diff --git a/crates/notedeck_columns/src/nav.rs b/crates/notedeck_columns/src/nav.rs @@ -417,7 +417,8 @@ fn handle_navigating_timeline( }; let txn = Transaction::new(ndb).expect("txn"); - app.timeline_cache.open(ndb, note_cache, &txn, pool, &kind); + app.timeline_cache + .open(ndb, note_cache, &txn, pool, &kind, false); } pub enum RouterAction { diff --git a/crates/notedeck_columns/src/timeline/cache.rs b/crates/notedeck_columns/src/timeline/cache.rs @@ -169,10 +169,11 @@ impl TimelineCache { } } - /// Open a timeline, this is another way of saying insert a timeline - /// into the timeline cache. If there exists a timeline already, we - /// bump its subscription reference count. If it's new we start a new - /// subscription + /// Open a timeline, optionally loading local notes. 
+ /// + /// When `load_local` is false, the timeline is created and subscribed + /// without running a blocking local query. Use this for startup paths + /// where initial notes are loaded asynchronously. pub fn open( &mut self, ndb: &Ndb, @@ -180,7 +181,35 @@ impl TimelineCache { txn: &Transaction, pool: &mut RelayPool, id: &TimelineKind, + load_local: bool, ) -> Option<TimelineOpenResult> { + if !load_local { + let timeline = if let Some(timeline) = self.timelines.get_mut(id) { + timeline + } else { + let Some(timeline) = id.clone().into_timeline(txn, ndb) else { + error!("Error creating timeline from {:?}", id); + return None; + }; + self.timelines.insert(id.clone(), timeline); + self.timelines.get_mut(id).expect("timeline inserted") + }; + + if let Some(filter) = timeline.filter.get_any_ready() { + debug!("got open with subscription for {:?}", &timeline.kind); + timeline.subscription.try_add_local(ndb, filter); + timeline.subscription.try_add_remote(pool, filter); + } else { + debug!( + "open skipped subscription; filter not ready for {:?}", + &timeline.kind + ); + } + + timeline.subscription.increment(); + return None; + } + let notes_resp = self.notes(ndb, note_cache, txn, id); let (mut open_result, timeline) = match notes_resp.vitality { Vitality::Stale(timeline) => { diff --git a/crates/notedeck_columns/src/timeline/kind.rs b/crates/notedeck_columns/src/timeline/kind.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::hash::{Hash, Hasher}; use tokenator::{ParseError, TokenParser, TokenSerializable, TokenWriter}; -use tracing::{error, warn}; +use tracing::{debug, error, warn}; #[derive(Clone, Hash, Copy, Default, Debug, PartialEq, Eq, Serialize, Deserialize)] pub enum PubkeySource { @@ -461,6 +461,7 @@ impl TimelineKind { } // TODO: probably should set default limit here + /// Build the filter state for this timeline kind. 
pub fn filters(&self, txn: &Transaction, ndb: &Ndb) -> FilterState { match self { TimelineKind::Search(s) => FilterState::ready(search_filter(s)), @@ -477,24 +478,34 @@ impl TimelineKind { } TimelineKind::Hashtag(hashtag) => { - let filters = hashtag - .iter() - .filter(|tag| !tag.is_empty()) - .map(|tag| { + let mut filters = Vec::new(); + for tag in hashtag.iter().filter(|tag| !tag.is_empty()) { + let tag_lower = tag.to_lowercase(); + filters.push( Filter::new() .kinds([1]) .limit(filter::default_limit()) - .tags([tag.to_lowercase().as_str()], 't') - .build() - }) - .collect::<Vec<_>>(); + .tags([tag_lower.as_str()], 't') + .build(), + ); + } + + if filters.is_empty() { + warn!(?hashtag, "hashtag timeline has no usable tags"); + } else if filters.len() != hashtag.len() { + debug!( + ?hashtag, + usable_tags = filters.len(), + "hashtag timeline dropped empty tags" + ); + } FilterState::ready(filters) } TimelineKind::Algo(algo_timeline) => match algo_timeline { AlgoTimeline::LastPerPubkey(list_k) => match list_k { - ListKind::Contact(pubkey) => last_per_pubkey_filter_state(ndb, pubkey), + ListKind::Contact(pubkey) => last_per_pubkey_filter_state(txn, ndb, pubkey), }, }, @@ -685,12 +696,17 @@ impl<'a> ColumnTitle<'a> { } } +/// Build the filter state for a contact list timeline. 
fn contact_filter_state(txn: &Transaction, ndb: &Ndb, pk: &Pubkey) -> FilterState { let contact_filter = contacts_filter(pk); - let results = ndb - .query(txn, std::slice::from_ref(&contact_filter), 1) - .expect("contact query failed?"); + let results = match ndb.query(txn, std::slice::from_ref(&contact_filter), 1) { + Ok(results) => results, + Err(err) => { + error!("contact query failed: {err}"); + return FilterState::Broken(FilterError::EmptyContactList); + } + }; if results.is_empty() { FilterState::needs_remote() @@ -709,13 +725,17 @@ fn contact_filter_state(txn: &Transaction, ndb: &Ndb, pk: &Pubkey) -> FilterStat } } -fn last_per_pubkey_filter_state(ndb: &Ndb, pk: &Pubkey) -> FilterState { +/// Build the filter state for a last-per-pubkey timeline. +fn last_per_pubkey_filter_state(txn: &Transaction, ndb: &Ndb, pk: &Pubkey) -> FilterState { let contact_filter = contacts_filter(pk.bytes()); - let txn = Transaction::new(ndb).expect("txn"); - let results = ndb - .query(&txn, std::slice::from_ref(&contact_filter), 1) - .expect("contact query failed?"); + let results = match ndb.query(txn, std::slice::from_ref(&contact_filter), 1) { + Ok(results) => results, + Err(err) => { + error!("contact query failed: {err}"); + return FilterState::Broken(FilterError::EmptyContactList); + } + }; if results.is_empty() { FilterState::needs_remote() diff --git a/crates/notedeck_columns/src/timeline/mod.rs b/crates/notedeck_columns/src/timeline/mod.rs @@ -285,6 +285,7 @@ impl Timeline { )) } + /// Create a hashtag timeline with ready filters. 
pub fn hashtag(hashtag: Vec<String>) -> Self { let filters = hashtag .iter() @@ -298,6 +299,10 @@ impl Timeline { }) .collect::<Vec<_>>(); + if filters.is_empty() { + warn!(?hashtag, "hashtag timeline created with no usable tags"); + } + Timeline::new( TimelineKind::Hashtag(hashtag), FilterState::ready(filters), @@ -617,7 +622,7 @@ pub fn setup_new_timeline( unknown_ids: &mut UnknownIds, ) { // if we're ready, setup local subs - if is_timeline_ready(ndb, pool, note_cache, timeline, accounts, unknown_ids) { + if is_timeline_ready(ndb, pool, timeline, accounts) { if let Err(err) = setup_timeline_nostrdb_sub(ndb, txn, note_cache, timeline, unknown_ids) { error!("setup_new_timeline: {err}"); } @@ -843,10 +848,8 @@ fn setup_timeline_nostrdb_sub( pub fn is_timeline_ready( ndb: &Ndb, pool: &mut RelayPool, - note_cache: &mut NoteCache, timeline: &mut Timeline, accounts: &Accounts, - unknown_ids: &mut UnknownIds, ) -> bool { // TODO: we should debounce the filter states a bit to make sure we have // seen all of the different contact lists from each relay @@ -916,12 +919,8 @@ pub fn is_timeline_ready( false } Ok(filter) => { - // we just switched to the ready state, we should send initial - // queries and setup the local subscription - info!("Found contact list! Setting up local and remote contact list query"); - let txn = Transaction::new(ndb).expect("txn"); - setup_initial_timeline(ndb, &txn, timeline, note_cache, unknown_ids, &filter) - .expect("setup init"); + // We just switched to the ready state; remote subscriptions can start now. + info!("Found contact list! Setting up remote contact list query"); timeline .filter .set_relay_state(relay_id, FilterState::ready_hybrid(filter.clone())); diff --git a/crates/notedeck_columns/src/timeline_loader.rs b/crates/notedeck_columns/src/timeline_loader.rs @@ -0,0 +1,203 @@ +//! Background loader for initial timeline NostrDB queries. 
+ +use crossbeam_channel as chan; +use nostrdb::{Ndb, Transaction}; +use notedeck::{worker_count, AsyncLoader, FilterState, NoteRef}; + +use crate::timeline::kind::AlgoTimeline; +use crate::timeline::TimelineKind; + +use tracing::{info, warn}; + +const FOLD_BATCH_SIZE: usize = 200; +const MAX_TIMELINE_LOADER_WORKERS: usize = 4; + +/// Commands sent to the timeline loader worker thread. +pub enum TimelineLoaderCmd { + /// Load initial note refs for a timeline. + LoadTimeline { + /// Timeline identifier to apply batches to. + kind: TimelineKind, + }, +} + +/// Messages emitted by the timeline loader worker thread. +pub enum TimelineLoaderMsg { + /// Batch of note refs for a timeline. + TimelineBatch { + /// Timeline identifier to apply batches to. + kind: TimelineKind, + /// Note refs discovered by NostrDB fold. + notes: Vec<NoteRef>, + }, + /// Timeline initial load finished. + TimelineFinished { kind: TimelineKind }, + /// Loader error for a timeline. + Failed { kind: TimelineKind, error: String }, +} + +/// Handle for driving the timeline loader worker thread. +pub struct TimelineLoader { + loader: AsyncLoader<TimelineLoaderCmd, TimelineLoaderMsg>, +} + +impl TimelineLoader { + /// Create an uninitialized loader handle. + pub fn new() -> Self { + Self { + loader: AsyncLoader::new(), + } + } + + /// Start the loader workers if they have not been started yet. + pub fn start(&mut self, egui_ctx: egui::Context, ndb: Ndb) { + let workers = worker_count(MAX_TIMELINE_LOADER_WORKERS); + let started = self.loader.start( + egui_ctx, + ndb, + workers, + "columns-timeline-loader", + handle_cmd, + ); + if started { + info!(workers, "starting timeline loader workers"); + } + } + + /// Request an initial load for a timeline. + pub fn load_timeline(&self, kind: TimelineKind) { + self.loader.send(TimelineLoaderCmd::LoadTimeline { kind }); + } + + /// Try to receive the next loader message without blocking. 
+ pub fn try_recv(&self) -> Option<TimelineLoaderMsg> { + self.loader.try_recv() + } +} + +impl Default for TimelineLoader { + fn default() -> Self { + Self::new() + } +} + +/// Handle loader commands on a worker thread. +fn handle_cmd( + cmd: TimelineLoaderCmd, + egui_ctx: &egui::Context, + ndb: &Ndb, + msg_tx: &chan::Sender<TimelineLoaderMsg>, +) { + let result = match cmd { + TimelineLoaderCmd::LoadTimeline { kind } => load_timeline(egui_ctx, ndb, msg_tx, kind), + }; + + if let Err((kind, err)) = result { + let _ = msg_tx.send(TimelineLoaderMsg::Failed { kind, error: err }); + egui_ctx.request_repaint(); + } +} + +/// Fold accumulator for batching note refs. +struct FoldAcc { + batch: Vec<NoteRef>, + msg_tx: chan::Sender<TimelineLoaderMsg>, + egui_ctx: egui::Context, + kind: TimelineKind, +} + +impl FoldAcc { + fn push_note(&mut self, note: NoteRef) -> Result<(), String> { + self.batch.push(note); + if self.batch.len() >= FOLD_BATCH_SIZE { + self.flush()?; + } + Ok(()) + } + + fn flush(&mut self) -> Result<(), String> { + if self.batch.is_empty() { + return Ok(()); + } + + let notes = std::mem::take(&mut self.batch); + self.msg_tx + .send(TimelineLoaderMsg::TimelineBatch { + kind: self.kind.clone(), + notes, + }) + .map_err(|_| "timeline loader channel closed".to_string())?; + self.egui_ctx.request_repaint(); + Ok(()) + } +} + +/// Run an initial timeline load and stream note ref batches. 
+fn load_timeline( + egui_ctx: &egui::Context, + ndb: &Ndb, + msg_tx: &chan::Sender<TimelineLoaderMsg>, + kind: TimelineKind, +) -> Result<(), (TimelineKind, String)> { + let txn = Transaction::new(ndb).map_err(|e| (kind.clone(), e.to_string()))?; + let filter_state = kind.filters(&txn, ndb); + let FilterState::Ready(filters) = filter_state else { + warn!(?kind, "timeline loader filter not ready"); + return Err((kind, "timeline filter not ready".to_string())); + }; + + let mut acc = FoldAcc { + batch: Vec::with_capacity(FOLD_BATCH_SIZE), + msg_tx: msg_tx.clone(), + egui_ctx: egui_ctx.clone(), + kind: kind.clone(), + }; + + let use_query = matches!(kind, TimelineKind::Algo(AlgoTimeline::LastPerPubkey(_))); + + for package in filters.local().packages { + if package.filters.is_empty() { + warn!(?kind, "timeline loader received empty filter package"); + } + + if use_query { + let mut lim = 0i32; + for filter in package.filters { + lim += filter.limit().unwrap_or(1) as i32; + } + + let cur_notes: Vec<NoteRef> = ndb + .query(&txn, package.filters, lim) + .map_err(|e| (kind.clone(), e.to_string()))? + .into_iter() + .map(NoteRef::from_query_result) + .collect(); + for note_ref in cur_notes { + if let Err(err) = acc.push_note(note_ref) { + tracing::error!("timeline loader push error: {err}"); + } + } + continue; + } + + let fold_result = ndb.fold(&txn, package.filters, acc, |mut acc, note| { + if let Some(key) = note.key() { + let note_ref = NoteRef { + key, + created_at: note.created_at(), + }; + if let Err(err) = acc.push_note(note_ref) { + tracing::error!("timeline loader flush error: {err}"); + } + } + acc + }); + + acc = fold_result.map_err(|e| (kind.clone(), e.to_string()))?; + } + + acc.flush().map_err(|e| (kind.clone(), e))?; + let _ = msg_tx.send(TimelineLoaderMsg::TimelineFinished { kind }); + egui_ctx.request_repaint(); + Ok(()) +} diff --git a/crates/notedeck_messages/src/loader.rs b/crates/notedeck_messages/src/loader.rs @@ -1,11 +1,11 @@ //! 
Background loader for Messages NostrDB queries. -use std::thread; - use crossbeam_channel as chan; use enostr::Pubkey; use nostrdb::{Filter, Ndb, NoteKey, Transaction}; +use notedeck::AsyncLoader; + use crate::{ cache::ConversationId, nip17::{chatroom_filter, conversation_filter}, @@ -44,41 +44,28 @@ pub enum LoaderMsg { /// Handle for driving the messages loader worker thread. pub struct MessagesLoader { - cmd_tx: Option<chan::Sender<LoaderCmd>>, - msg_rx: Option<chan::Receiver<LoaderMsg>>, + loader: AsyncLoader<LoaderCmd, LoaderMsg>, } impl MessagesLoader { /// Create an uninitialized loader handle. pub fn new() -> Self { Self { - cmd_tx: None, - msg_rx: None, + loader: AsyncLoader::new(), } } - /// Start the loader thread if it has not been started yet. + /// Start the loader workers if they have not been started yet. pub fn start(&mut self, egui_ctx: egui::Context, ndb: Ndb) { - if self.cmd_tx.is_some() { - return; - } - - let (cmd_tx, cmd_rx) = chan::unbounded::<LoaderCmd>(); - let (msg_tx, msg_rx) = chan::unbounded::<LoaderMsg>(); - - self.cmd_tx = Some(cmd_tx.clone()); - self.msg_rx = Some(msg_rx); - - spawn_worker(egui_ctx, ndb, cmd_rx, msg_tx); + let _ = self + .loader + .start(egui_ctx, ndb, 1, "messages-loader", handle_cmd); } /// Request a conversation list load for the given account. pub fn load_conversation_list(&self, account_pubkey: Pubkey) { - let Some(tx) = &self.cmd_tx else { - return; - }; - - let _ = tx.send(LoaderCmd::LoadConversationList { account_pubkey }); + self.loader + .send(LoaderCmd::LoadConversationList { account_pubkey }); } /// Request a conversation message load for the given conversation. 
@@ -88,11 +75,7 @@ impl MessagesLoader { participants: Vec<Pubkey>, me: Pubkey, ) { - let Some(tx) = &self.cmd_tx else { - return; - }; - - let _ = tx.send(LoaderCmd::LoadConversationMessages { + self.loader.send(LoaderCmd::LoadConversationMessages { conversation_id, participants, me, @@ -101,11 +84,7 @@ impl MessagesLoader { /// Try to receive the next loader message without blocking. pub fn try_recv(&self) -> Option<LoaderMsg> { - let Some(rx) = &self.msg_rx else { - return None; - }; - - rx.try_recv().ok() + self.loader.try_recv() } } @@ -163,44 +142,6 @@ impl FoldAcc { } } -/// Spawn the background loader worker thread. -fn spawn_worker( - egui_ctx: egui::Context, - ndb: Ndb, - cmd_rx: chan::Receiver<LoaderCmd>, - msg_tx: chan::Sender<LoaderMsg>, -) { - thread::Builder::new() - .name("messages-loader".into()) - .spawn(move || { - while let Ok(cmd) = cmd_rx.recv() { - let result = match cmd { - LoaderCmd::LoadConversationList { account_pubkey } => { - load_conversation_list(&egui_ctx, &ndb, &msg_tx, account_pubkey) - } - LoaderCmd::LoadConversationMessages { - conversation_id, - participants, - me, - } => load_conversation_messages( - &egui_ctx, - &ndb, - &msg_tx, - conversation_id, - participants, - me, - ), - }; - - if let Err(err) = result { - let _ = msg_tx.send(LoaderMsg::Failed(err)); - egui_ctx.request_repaint(); - } - } - }) - .expect("failed to spawn messages loader thread"); -} - /// Run a conversation list load and stream note keys. fn load_conversation_list( egui_ctx: &egui::Context, @@ -215,6 +156,30 @@ fn load_conversation_list( Ok(()) } +/// Handle loader commands on a worker thread. 
+fn handle_cmd( + cmd: LoaderCmd, + egui_ctx: &egui::Context, + ndb: &Ndb, + msg_tx: &chan::Sender<LoaderMsg>, +) { + let result = match cmd { + LoaderCmd::LoadConversationList { account_pubkey } => { + load_conversation_list(egui_ctx, ndb, msg_tx, account_pubkey) + } + LoaderCmd::LoadConversationMessages { + conversation_id, + participants, + me, + } => load_conversation_messages(egui_ctx, ndb, msg_tx, conversation_id, participants, me), + }; + + if let Err(err) = result { + let _ = msg_tx.send(LoaderMsg::Failed(err)); + egui_ctx.request_repaint(); + } +} + /// Run a conversation messages load and stream note keys. fn load_conversation_messages( egui_ctx: &egui::Context,