notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit 93468bb882e7b6f9dda730e45fd21b9eb2443f53
parent 50da963f76f5862c70b23c20a63b9575ed820f87
Author: William Casarin <jb55@jb55.com>
Date:   Wed, 25 Feb 2026 12:19:11 -0800

messages: load conversations via async ndb.fold

Diffstat:
MCargo.lock | 1+
Mcrates/notedeck_messages/Cargo.toml | 1+
Mcrates/notedeck_messages/src/cache/conversation.rs | 8++++++++
Mcrates/notedeck_messages/src/lib.rs | 177+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------
Acrates/notedeck_messages/src/loader.rs | 275+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcrates/notedeck_messages/src/nav.rs | 108+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------
6 files changed, 522 insertions(+), 48 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -4700,6 +4700,7 @@ name = "notedeck_messages" version = "0.7.1" dependencies = [ "chrono", + "crossbeam-channel", "egui", "egui_extras", "egui_nav", diff --git a/crates/notedeck_messages/Cargo.toml b/crates/notedeck_messages/Cargo.toml @@ -21,3 +21,4 @@ notedeck_ui = { workspace = true } chrono = { workspace = true } nostr = { workspace = true } egui_nav = { workspace = true } +crossbeam-channel = { workspace = true } diff --git a/crates/notedeck_messages/src/cache/conversation.rs b/crates/notedeck_messages/src/cache/conversation.rs @@ -351,9 +351,17 @@ impl ConversationMetadata { } } +/// Tracks the conversation list initialization and subscription lifecycle. #[derive(Default)] pub enum ConversationListState { + /// No loader request has been issued yet. #[default] Initializing, + /// Loader is streaming the initial conversation list. + Loading { + /// Optional live subscription for incoming conversation updates. + subscription: Option<Subscription>, + }, + /// Initial load completed; subscription remains active if available. Initialized(Option<Subscription>), // conversation list filter } diff --git a/crates/notedeck_messages/src/lib.rs b/crates/notedeck_messages/src/lib.rs @@ -1,11 +1,12 @@ pub mod cache; pub mod convo_renderable; +pub mod loader; pub mod nav; pub mod nip17; pub mod ui; use enostr::Pubkey; -use hashbrown::HashMap; +use hashbrown::{HashMap, HashSet}; use nav::{process_messages_ui_response, Route}; use nostrdb::{Subscription, Transaction}; use notedeck::{ @@ -14,14 +15,21 @@ use notedeck::{ use crate::{ cache::{ConversationCache, ConversationListState, ConversationStates}, + loader::{LoaderMsg, MessagesLoader}, nip17::conversation_filter, ui::{login_nsec_prompt, messages::messages_ui}, }; +/// Max loader messages to process per frame to avoid UI stalls. +const MAX_LOADER_MSGS_PER_FRAME: usize = 8; + +/// Messages application state and background loaders. pub struct MessagesApp { messages: ConversationsCtx, states: ConversationStates, router: Router<Route>, + loader: MessagesLoader, + inflight_messages: HashSet<cache::ConversationId>, } impl MessagesApp { @@ -30,6 +38,8 @@ impl MessagesApp { messages: ConversationsCtx::default(), states: ConversationStates::default(), router: Router::new(vec![Route::ConvoList]), + loader: MessagesLoader::new(), + inflight_messages: HashSet::new(), } } } @@ -50,6 +60,8 @@ impl App for MessagesApp { return AppResponse::none(); }; + self.loader.start(ui.ctx().clone(), ctx.ndb.clone()); + 's: { let Some(secret) = &ctx.accounts.get_selected_account().key.secret_key else { break 's; @@ -77,7 +89,14 @@ impl App for MessagesApp { } match cache.state { - ConversationListState::Initializing => initialize(ctx, cache, is_narrow(ui.ctx())), + ConversationListState::Initializing => { + initialize(ctx, cache, is_narrow(ui.ctx()), &self.loader); + } + ConversationListState::Loading { subscription } => { + if let Some(sub) = subscription { + update_initialized(ctx, cache, sub); + } + } ConversationListState::Initialized(subscription) => 's: { let Some(sub) = subscription else { break 's; @@ -86,6 +105,14 @@ impl App for MessagesApp { } } + handle_loader_messages( + ctx, + cache, + &self.loader, + &mut self.inflight_messages, + is_narrow(ui.ctx()), + ); + let selected_pubkey = ctx.accounts.selected_account_pubkey(); let contacts_state = ctx @@ -107,35 +134,28 @@ impl App for MessagesApp { contacts_state, ctx.i18n, ); - let action = - process_messages_ui_response(resp, ctx, cache, &mut self.router, is_narrow(ui.ctx())); + let action = process_messages_ui_response( + resp, + ctx, + cache, + &mut self.router, + is_narrow(ui.ctx()), + &self.loader, + &mut self.inflight_messages, + ); AppResponse::action(action) } } +/// Start the conversation list loader and subscription for the active account. #[profiling::function] -fn initialize(ctx: &mut AppContext, cache: &mut ConversationCache, is_narrow: bool) { - let txn = Transaction::new(ctx.ndb).expect("txn"); - cache.init_conversations( - ctx.ndb, - &txn, - ctx.accounts.selected_account_pubkey(), - &mut *ctx.note_cache, - &mut *ctx.unknown_ids, - ); - if !is_narrow { - if let Some(first) = cache.first_convo_id() { - cache.open_conversation( - ctx.ndb, - &txn, - first, - ctx.note_cache, - ctx.unknown_ids, - ctx.accounts.selected_account_pubkey(), - ); - } - } +fn initialize( + ctx: &mut AppContext, + cache: &mut ConversationCache, + is_narrow: bool, + loader: &MessagesLoader, +) { let sub = match ctx .ndb .subscribe(&conversation_filter(ctx.accounts.selected_account_pubkey())) @@ -147,9 +167,15 @@ fn initialize(ctx: &mut AppContext, cache: &mut ConversationCache, is_narrow: bo } }; - cache.state = ConversationListState::Initialized(sub); + loader.load_conversation_list(*ctx.accounts.selected_account_pubkey()); + cache.state = ConversationListState::Loading { subscription: sub }; + + if !is_narrow { + cache.active = None; + } } +/// Poll the live subscription for new conversation notes. #[profiling::function] fn update_initialized(ctx: &mut AppContext, cache: &mut ConversationCache, sub: Subscription) { let notes = ctx.ndb.poll_for_notes(sub, 10); @@ -166,6 +192,105 @@ fn update_initialized(ctx: &mut AppContext, cache: &mut ConversationCache, sub: } } +/// Drain loader messages and apply updates to the conversation cache. +#[profiling::function] +fn handle_loader_messages( + ctx: &mut AppContext<'_>, + cache: &mut ConversationCache, + loader: &MessagesLoader, + inflight_messages: &mut HashSet<cache::ConversationId>, + is_narrow: bool, +) { + let mut handled = 0; + while handled < MAX_LOADER_MSGS_PER_FRAME { + let Some(msg) = loader.try_recv() else { + break; + }; + handled += 1; + + match msg { + LoaderMsg::ConversationBatch(keys) => { + ingest_note_keys(ctx, cache, &keys); + } + LoaderMsg::ConversationFinished => { + let current = + std::mem::replace(&mut cache.state, ConversationListState::Initializing); + cache.state = match current { + ConversationListState::Loading { subscription } => { + ConversationListState::Initialized(subscription) + } + other => other, + }; + + if cache.active.is_none() && !is_narrow { + if let Some(first) = cache.first_convo_id() { + cache.active = Some(first); + request_conversation_messages( + cache, + ctx.accounts.selected_account_pubkey(), + first, + loader, + inflight_messages, + ); + } + } + } + LoaderMsg::ConversationMessagesBatch { keys, .. } => { + ingest_note_keys(ctx, cache, &keys); + } + LoaderMsg::ConversationMessagesFinished { conversation_id } => { + inflight_messages.remove(&conversation_id); + } + LoaderMsg::Failed(err) => { + tracing::error!("messages loader error: {err}"); + } + } + } +} + +/// Lookup note keys in NostrDB and ingest them into the conversation cache. +fn ingest_note_keys( + ctx: &mut AppContext<'_>, + cache: &mut ConversationCache, + keys: &[nostrdb::NoteKey], +) { + let txn = Transaction::new(ctx.ndb).expect("txn"); + for key in keys { + let note = match ctx.ndb.get_note_by_key(&txn, *key) { + Ok(n) => n, + Err(e) => { + tracing::error!("could not find note key: {e}"); + continue; + } + }; + cache.ingest_chatroom_msg(note, *key, ctx.ndb, &txn, ctx.note_cache, ctx.unknown_ids); + } +} + +/// Schedule a background load for a conversation's message history. +fn request_conversation_messages( + cache: &ConversationCache, + me: &Pubkey, + conversation_id: cache::ConversationId, + loader: &MessagesLoader, + inflight_messages: &mut HashSet<cache::ConversationId>, +) { + if inflight_messages.contains(&conversation_id) { + return; + } + + let Some(conversation) = cache.get(conversation_id) else { + return; + }; + + inflight_messages.insert(conversation_id); + loader.load_conversation_messages( + conversation_id, + conversation.metadata.participants.clone(), + *me, + ); +} + /// Storage for conversations per account. Account management is performed by `Accounts` #[derive(Default)] struct ConversationsCtx { diff --git a/crates/notedeck_messages/src/loader.rs b/crates/notedeck_messages/src/loader.rs @@ -0,0 +1,275 @@ +//! Background loader for Messages NostrDB queries. + +use std::thread; + +use crossbeam_channel as chan; +use enostr::Pubkey; +use nostrdb::{Filter, Ndb, NoteKey, Transaction}; + +use crate::{ + cache::ConversationId, + nip17::{chatroom_filter, conversation_filter}, +}; + +const FOLD_BATCH_SIZE: usize = 100; + +/// Commands sent to the messages loader worker thread. +pub enum LoaderCmd { + /// Load conversation list note keys for the current account. + LoadConversationList { account_pubkey: Pubkey }, + /// Load note keys for a specific conversation. + LoadConversationMessages { + conversation_id: ConversationId, + participants: Vec<Pubkey>, + me: Pubkey, + }, +} + +/// Messages emitted by the loader worker thread. +pub enum LoaderMsg { + /// Batch of conversation list note keys. + ConversationBatch(Vec<NoteKey>), + /// Conversation list load finished. + ConversationFinished, + /// Batch of note keys for a conversation. + ConversationMessagesBatch { + conversation_id: ConversationId, + keys: Vec<NoteKey>, + }, + /// Conversation messages load finished. + ConversationMessagesFinished { conversation_id: ConversationId }, + /// Loader error. + Failed(String), +} + +/// Handle for driving the messages loader worker thread. +pub struct MessagesLoader { + cmd_tx: Option<chan::Sender<LoaderCmd>>, + msg_rx: Option<chan::Receiver<LoaderMsg>>, +} + +impl MessagesLoader { + /// Create an uninitialized loader handle. + pub fn new() -> Self { + Self { + cmd_tx: None, + msg_rx: None, + } + } + + /// Start the loader thread if it has not been started yet. + pub fn start(&mut self, egui_ctx: egui::Context, ndb: Ndb) { + if self.cmd_tx.is_some() { + return; + } + + let (cmd_tx, cmd_rx) = chan::unbounded::<LoaderCmd>(); + let (msg_tx, msg_rx) = chan::unbounded::<LoaderMsg>(); + + self.cmd_tx = Some(cmd_tx.clone()); + self.msg_rx = Some(msg_rx); + + spawn_worker(egui_ctx, ndb, cmd_rx, msg_tx); + } + + /// Request a conversation list load for the given account. + pub fn load_conversation_list(&self, account_pubkey: Pubkey) { + let Some(tx) = &self.cmd_tx else { + return; + }; + + let _ = tx.send(LoaderCmd::LoadConversationList { account_pubkey }); + } + + /// Request a conversation message load for the given conversation. + pub fn load_conversation_messages( + &self, + conversation_id: ConversationId, + participants: Vec<Pubkey>, + me: Pubkey, + ) { + let Some(tx) = &self.cmd_tx else { + return; + }; + + let _ = tx.send(LoaderCmd::LoadConversationMessages { + conversation_id, + participants, + me, + }); + } + + /// Try to receive the next loader message without blocking. + pub fn try_recv(&self) -> Option<LoaderMsg> { + let Some(rx) = &self.msg_rx else { + return None; + }; + + rx.try_recv().ok() + } +} + +impl Default for MessagesLoader { + fn default() -> Self { + Self::new() + } +} + +#[derive(Clone, Copy)] +/// Internal fold target kind for note key batches. +enum FoldKind { + ConversationList, + ConversationMessages { conversation_id: ConversationId }, +} + +/// Fold accumulator for batching note keys. +struct FoldAcc { + batch: Vec<NoteKey>, + msg_tx: chan::Sender<LoaderMsg>, + egui_ctx: egui::Context, + kind: FoldKind, +} + +impl FoldAcc { + fn push_key(&mut self, key: NoteKey) -> Result<(), String> { + self.batch.push(key); + if self.batch.len() >= FOLD_BATCH_SIZE { + self.flush()?; + } + Ok(()) + } + + fn flush(&mut self) -> Result<(), String> { + if self.batch.is_empty() { + return Ok(()); + } + + let keys = std::mem::take(&mut self.batch); + let msg = match self.kind { + FoldKind::ConversationList => LoaderMsg::ConversationBatch(keys), + FoldKind::ConversationMessages { conversation_id } => { + LoaderMsg::ConversationMessagesBatch { + conversation_id, + keys, + } + } + }; + + self.msg_tx + .send(msg) + .map_err(|_| "messages loader channel closed".to_string())?; + self.egui_ctx.request_repaint(); + Ok(()) + } +} + +/// Spawn the background loader worker thread. +fn spawn_worker( + egui_ctx: egui::Context, + ndb: Ndb, + cmd_rx: chan::Receiver<LoaderCmd>, + msg_tx: chan::Sender<LoaderMsg>, +) { + thread::Builder::new() + .name("messages-loader".into()) + .spawn(move || { + while let Ok(cmd) = cmd_rx.recv() { + let result = match cmd { + LoaderCmd::LoadConversationList { account_pubkey } => { + load_conversation_list(&egui_ctx, &ndb, &msg_tx, account_pubkey) + } + LoaderCmd::LoadConversationMessages { + conversation_id, + participants, + me, + } => load_conversation_messages( + &egui_ctx, + &ndb, + &msg_tx, + conversation_id, + participants, + me, + ), + }; + + if let Err(err) = result { + let _ = msg_tx.send(LoaderMsg::Failed(err)); + egui_ctx.request_repaint(); + } + } + }) + .expect("failed to spawn messages loader thread"); +} + +/// Run a conversation list load and stream note keys. +fn load_conversation_list( + egui_ctx: &egui::Context, + ndb: &Ndb, + msg_tx: &chan::Sender<LoaderMsg>, + account_pubkey: Pubkey, +) -> Result<(), String> { + let filters = conversation_filter(&account_pubkey); + fold_note_keys(egui_ctx, ndb, msg_tx, &filters, FoldKind::ConversationList)?; + let _ = msg_tx.send(LoaderMsg::ConversationFinished); + egui_ctx.request_repaint(); + Ok(()) +} + +/// Run a conversation messages load and stream note keys. +fn load_conversation_messages( + egui_ctx: &egui::Context, + ndb: &Ndb, + msg_tx: &chan::Sender<LoaderMsg>, + conversation_id: ConversationId, + participants: Vec<Pubkey>, + me: Pubkey, +) -> Result<(), String> { + let participant_bytes: Vec<[u8; 32]> = participants.iter().map(|p| *p.bytes()).collect(); + let participant_refs: Vec<&[u8; 32]> = participant_bytes.iter().collect(); + let filters = chatroom_filter(participant_refs, me.bytes()); + + fold_note_keys( + egui_ctx, + ndb, + msg_tx, + &filters, + FoldKind::ConversationMessages { conversation_id }, + )?; + + let _ = msg_tx.send(LoaderMsg::ConversationMessagesFinished { conversation_id }); + egui_ctx.request_repaint(); + Ok(()) +} + +/// Fold over NostrDB results and emit note key batches. +fn fold_note_keys( + egui_ctx: &egui::Context, + ndb: &Ndb, + msg_tx: &chan::Sender<LoaderMsg>, + filters: &[Filter], + kind: FoldKind, +) -> Result<(), String> { + let txn = Transaction::new(ndb).map_err(|e| e.to_string())?; + + let acc = FoldAcc { + batch: Vec::with_capacity(FOLD_BATCH_SIZE), + msg_tx: msg_tx.clone(), + egui_ctx: egui_ctx.clone(), + kind, + }; + + let acc = ndb + .fold(&txn, filters, acc, |mut acc, note| { + if let Some(key) = note.key() { + if let Err(err) = acc.push_key(key) { + tracing::error!("messages loader flush error: {err}"); + } + } + acc + }) + .map_err(|e| e.to_string())?; + + let mut acc = acc; + acc.flush()?; + Ok(()) +} diff --git a/crates/notedeck_messages/src/nav.rs b/crates/notedeck_messages/src/nav.rs @@ -1,12 +1,13 @@ use egui_nav::{NavAction, NavResponse}; use enostr::Pubkey; -use nostrdb::Transaction; +use hashbrown::HashSet; use notedeck::{AppAction, AppContext, ReplacementType, Router}; use crate::{ cache::{ ConversationCache, ConversationId, ConversationIdentifierUnowned, ParticipantSetUnowned, }, + loader::MessagesLoader, nip17::send_conversation_message, }; @@ -37,35 +38,65 @@ pub struct MessagesUiResponse { pub conversation_panel_response: Option<MessagesAction>, } +/// Apply UI responses and navigation actions to the messages router. pub fn process_messages_ui_response( resp: MessagesUiResponse, ctx: &mut AppContext, cache: &mut ConversationCache, router: &mut Router<Route>, is_narrow: bool, + loader: &MessagesLoader, + inflight_messages: &mut HashSet<ConversationId>, ) -> Option<AppAction> { let mut action = None; if let Some(convo_resp) = resp.conversation_panel_response { - action = handle_messages_action(convo_resp, ctx, cache, router, is_narrow); + action = handle_messages_action( + convo_resp, + ctx, + cache, + router, + is_narrow, + loader, + inflight_messages, + ); } let Some(nav) = resp.nav_response else { return action; }; - action.or(process_nav_resp(nav, ctx, cache, router, is_narrow)) + action.or(process_nav_resp( + nav, + ctx, + cache, + router, + is_narrow, + loader, + inflight_messages, + )) } +/// Handle navigation responses emitted by the messages UI. fn process_nav_resp( nav: NavResponse<Option<MessagesAction>>, ctx: &mut AppContext, cache: &mut ConversationCache, router: &mut Router<Route>, is_narrow: bool, + loader: &MessagesLoader, + inflight_messages: &mut HashSet<ConversationId>, ) -> Option<AppAction> { let mut app_action = None; if let Some(action) = nav.response.or(nav.title_response) { - app_action = handle_messages_action(action, ctx, cache, router, is_narrow); + app_action = handle_messages_action( + action, + ctx, + cache, + router, + is_narrow, + loader, + inflight_messages, + ); } let Some(action) = nav.action else { @@ -94,12 +125,15 @@ fn process_nav_resp( app_action } +/// Execute a messages action and return an optional app action. fn handle_messages_action( action: MessagesAction, ctx: &mut AppContext<'_>, cache: &mut ConversationCache, router: &mut Router<Route>, is_narrow: bool, + loader: &MessagesLoader, + inflight_messages: &mut HashSet<ConversationId>, ) -> Option<AppAction> { let mut app_action = None; match action { @@ -107,9 +141,15 @@ fn handle_messages_action( conversation_id, content, } => send_conversation_message(conversation_id, content, cache, ctx), - MessagesAction::Open(conversation_id) => { - open_coversation_action(conversation_id, ctx, cache, router, is_narrow); - } + MessagesAction::Open(conversation_id) => open_coversation_action( + conversation_id, + ctx, + cache, + router, + is_narrow, + loader, + inflight_messages, + ), MessagesAction::Create { recipient } => { let selected = ctx.accounts.selected_account_pubkey(); let participants = vec![recipient.bytes(), selected.bytes()]; @@ -120,15 +160,13 @@ fn handle_messages_action( )); cache.initialize_conversation(id, vec![recipient, *selected]); - - let txn = Transaction::new(ctx.ndb).expect("txn"); - cache.open_conversation( - ctx.ndb, - &txn, - id, - ctx.note_cache, - ctx.unknown_ids, + cache.active = Some(id); + request_conversation_messages( + cache, ctx.accounts.selected_account_pubkey(), + id, + loader, + inflight_messages, ); if is_narrow { @@ -149,24 +187,50 @@ fn handle_messages_action( app_action } +/// Activate a conversation and request its message history. fn open_coversation_action( id: ConversationId, ctx: &mut AppContext<'_>, cache: &mut ConversationCache, router: &mut Router<Route>, is_narrow: bool, + loader: &MessagesLoader, + inflight_messages: &mut HashSet<ConversationId>, ) { - let txn = Transaction::new(ctx.ndb).expect("txn"); - cache.open_conversation( - ctx.ndb, - &txn, - id, - ctx.note_cache, - ctx.unknown_ids, + cache.active = Some(id); + request_conversation_messages( + cache, ctx.accounts.selected_account_pubkey(), + id, + loader, + inflight_messages, ); if is_narrow { router.route_to(Route::Conversation); } } + +/// Schedule a background load for a conversation if it is not already in flight. +fn request_conversation_messages( + cache: &ConversationCache, + me: &Pubkey, + conversation_id: ConversationId, + loader: &MessagesLoader, + inflight_messages: &mut HashSet<ConversationId>, +) { + if inflight_messages.contains(&conversation_id) { + return; + } + + let Some(conversation) = cache.get(conversation_id) else { + return; + }; + + inflight_messages.insert(conversation_id); + loader.load_conversation_messages( + conversation_id, + conversation.metadata.participants.clone(), + *me, + ); +}