notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit d3ddba250a669bfbcb84e39b14d9ac5308745dd0
parent 7372860e2daade9cf49e620c0f1a1b33e796cbd8
Author: kernelkind <kernelkind@gmail.com>
Date:   Tue, 16 Dec 2025 14:33:16 -0500

refactor(process-event): extract shared logic to notedeck crate

Signed-off-by: kernelkind <kernelkind@gmail.com>

Diffstat:
Mcrates/enostr/src/lib.rs | 2+-
Mcrates/notedeck/src/app.rs | 94++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Mcrates/notedeck/src/lib.rs | 2+-
Mcrates/notedeck/src/unknowns.rs | 12++++++++++++
Mcrates/notedeck_columns/src/app.rs | 138++++++++++++++++++-------------------------------------------------------------
5 files changed, 138 insertions(+), 110 deletions(-)

diff --git a/crates/enostr/src/lib.rs b/crates/enostr/src/lib.rs @@ -17,7 +17,7 @@ pub use note::{Note, NoteId}; pub use profile::ProfileState; pub use pubkey::{Pubkey, PubkeyRef}; pub use relay::message::{RelayEvent, RelayMessage}; -pub use relay::pool::{PoolEvent, PoolRelay, RelayPool}; +pub use relay::pool::{PoolEvent, PoolEventBuf, PoolRelay, RelayPool}; pub use relay::subs_debug::{OwnedRelayEvent, RelayLogEvent, SubsDebug, TransferStats}; pub use relay::{Relay, RelayStatus}; diff --git a/crates/notedeck/src/app.rs b/crates/notedeck/src/app.rs @@ -1,6 +1,7 @@ use crate::account::FALLBACK_PUBKEY; use crate::i18n::Localization; use crate::persist::{AppSizeHandler, SettingsHandler}; +use crate::unknowns::unknown_id_send; use crate::wallet::GlobalWallet; use crate::zaps::Zaps; use crate::NotedeckOptions; @@ -13,7 +14,7 @@ use crate::{JobPool, MediaJobs}; use egui::Margin; use egui::ThemePreference; use egui_winit::clipboard::Clipboard; -use enostr::RelayPool; +use enostr::{PoolEventBuf, PoolRelay, RelayEvent, RelayMessage, RelayPool}; use nostrdb::{Config, Ndb, Transaction}; use std::cell::RefCell; use std::collections::BTreeSet; @@ -426,3 +427,94 @@ pub fn install_crypto() { let provider = rustls::crypto::aws_lc_rs::default_provider(); let _ = provider.install_default(); } + +pub fn try_process_events_core( + app_ctx: &mut AppContext<'_>, + ctx: &egui::Context, + mut receive: impl FnMut(&mut AppContext, PoolEventBuf), +) { + let ctx2 = ctx.clone(); + let wakeup = move || { + ctx2.request_repaint(); + }; + + app_ctx.pool.keepalive_ping(wakeup); + + // NOTE: we don't use the while let loop due to borrow issues + #[allow(clippy::while_let_loop)] + loop { + profiling::scope!("receiving events"); + let ev = if let Some(ev) = app_ctx.pool.try_recv() { + ev.into_owned() + } else { + break; + }; + + match (&ev.event).into() { + RelayEvent::Opened => { + tracing::trace!("Opened relay {}", ev.relay); + app_ctx + .accounts + .send_initial_filters(app_ctx.pool, &ev.relay); + } + RelayEvent::Closed => tracing::warn!("{} connection closed", &ev.relay), + RelayEvent::Other(msg) => { + tracing::trace!("relay {} sent other event {:?}", ev.relay, &msg) + } + RelayEvent::Error(error) => error!("relay {} had error: {error:?}", &ev.relay), + RelayEvent::Message(msg) => { + process_message_core(app_ctx, &ev.relay, &msg); + } + } + + receive(app_ctx, ev); + } + + if app_ctx.unknown_ids.ready_to_send() { + unknown_id_send(app_ctx.unknown_ids, app_ctx.pool); + } +} + +fn process_message_core(ctx: &mut AppContext<'_>, relay: &str, msg: &RelayMessage) { + match msg { + RelayMessage::Event(_subid, ev) => { + let relay = if let Some(relay) = ctx.pool.relays.iter().find(|r| r.url() == relay) { + relay + } else { + error!("couldn't find relay {} for note processing!?", relay); + return; + }; + + match relay { + PoolRelay::Websocket(_) => { + //info!("processing event {}", event); + tracing::trace!("processing event {ev}"); + if let Err(err) = ctx.ndb.process_event_with( + ev, + nostrdb::IngestMetadata::new() + .client(false) + .relay(relay.url()), + ) { + error!("error processing event {ev}: {err}"); + } + } + PoolRelay::Multicast(_) => { + // multicast events are client events + if let Err(err) = ctx.ndb.process_event_with( + ev, + nostrdb::IngestMetadata::new() + .client(true) + .relay(relay.url()), + ) { + error!("error processing multicast event {ev}: {err}"); + } + } + } + } + RelayMessage::Notice(msg) => tracing::warn!("Notice from {}: {}", relay, msg), + RelayMessage::OK(cr) => info!("OK {:?}", cr), + RelayMessage::Eose(id) => { + tracing::trace!("Relay {} received eose: {id}", relay) + } + } +} diff --git a/crates/notedeck/src/lib.rs b/crates/notedeck/src/lib.rs @@ -47,7 +47,7 @@ pub use account::accounts::{AccountData, AccountSubs, Accounts}; pub use account::contacts::{ContactState, IsFollowing}; pub use account::relay::RelayAction; pub use account::FALLBACK_PUBKEY; -pub use app::{App, AppAction, AppResponse, Notedeck}; +pub use app::{try_process_events_core, App, AppAction, AppResponse, Notedeck}; pub use args::Args; pub use context::{AppContext, SoftKeyboardContext}; pub use error::{show_one_error_message, Error, FilterError, ZapError}; diff --git a/crates/notedeck/src/unknowns.rs b/crates/notedeck/src/unknowns.rs @@ -383,3 +383,15 @@ fn get_unknown_ids_filter(ids: &[&UnknownId]) -> Option<Vec<Filter>> { Some(filters) } + +pub fn unknown_id_send(unknown_ids: &mut UnknownIds, pool: &mut enostr::RelayPool) { + tracing::debug!("unknown_id_send called on: {:?}", &unknown_ids); + let filter = unknown_ids.filter().expect("filter"); + tracing::debug!( + "Getting {} unknown ids from relays", + unknown_ids.ids_iter().len() + ); + let msg = enostr::ClientMessage::req("unknownids".to_string(), filter); + unknown_ids.clear(); + pool.send(&msg); +} diff --git a/crates/notedeck_columns/src/app.rs b/crates/notedeck_columns/src/app.rs @@ -17,12 +17,12 @@ use crate::{ Result, }; use egui_extras::{Size, StripBuilder}; -use enostr::{ClientMessage, PoolRelay, Pubkey, RelayEvent, RelayMessage, RelayPool}; +use enostr::{ClientMessage, Pubkey, RelayEvent, RelayMessage}; use nostrdb::Transaction; use notedeck::{ - tr, ui::is_narrow, Accounts, AppAction, AppContext, AppResponse, DataPath, DataPathType, - FilterState, Images, Localization, MediaJobSender, NotedeckOptions, SettingsHandler, - UnknownIds, + tr, try_process_events_core, ui::is_narrow, Accounts, AppAction, AppContext, AppResponse, + DataPath, DataPathType, FilterState, Images, Localization, MediaJobSender, NotedeckOptions, + SettingsHandler, }; use notedeck_ui::{ media::{MediaViewer, MediaViewerFlags, MediaViewerState}, @@ -30,7 +30,7 @@ use notedeck_ui::{ }; use std::collections::{BTreeSet, HashMap}; use std::path::Path; -use tracing::{debug, error, info, trace, warn}; +use tracing::{error, info, warn}; use uuid::Uuid; #[derive(Debug, Eq, PartialEq, Clone)] @@ -161,47 +161,22 @@ fn try_process_event( get_active_columns_mut(app_ctx.i18n, app_ctx.accounts, &mut damus.decks_cache); ctx.input(|i| handle_egui_events(i, current_columns, damus.hovered_column)); - let ctx2 = ctx.clone(); - let wakeup = move || { - ctx2.request_repaint(); - }; - - app_ctx.pool.keepalive_ping(wakeup); - - // NOTE: we don't use the while let loop due to borrow issues - #[allow(clippy::while_let_loop)] - loop { - profiling::scope!("receiving events"); - let ev = if let Some(ev) = app_ctx.pool.try_recv() { - ev.into_owned() - } else { - break; - }; - - match (&ev.event).into() { - RelayEvent::Opened => { - app_ctx - .accounts - .send_initial_filters(app_ctx.pool, &ev.relay); - - timeline::send_initial_timeline_filters( - damus.options.contains(AppOptions::SinceOptimize), - &mut damus.timeline_cache, - &mut damus.subscriptions, - app_ctx.pool, - &ev.relay, - app_ctx.accounts, - ); - } - // TODO: handle reconnects - RelayEvent::Closed => warn!("{} connection closed", &ev.relay), - RelayEvent::Error(e) => error!("{}: {}", &ev.relay, e), - RelayEvent::Other(msg) => trace!("other event {:?}", &msg), - RelayEvent::Message(msg) => { - process_message(damus, app_ctx, &ev.relay, &msg); - } + try_process_events_core(app_ctx, ctx, |app_ctx, ev| match (&ev.event).into() { + RelayEvent::Opened => { + timeline::send_initial_timeline_filters( + damus.options.contains(AppOptions::SinceOptimize), + &mut damus.timeline_cache, + &mut damus.subscriptions, + app_ctx.pool, + &ev.relay, + app_ctx.accounts, + ); } - } + RelayEvent::Message(msg) => { + process_message(damus, app_ctx, &ev.relay, &msg); + } + _ => {} + }); for (kind, timeline) in &mut damus.timeline_cache { let is_ready = timeline::is_timeline_ready( @@ -239,25 +214,9 @@ fn try_process_event( follow_packs.poll_for_notes(app_ctx.ndb, app_ctx.unknown_ids); } - if app_ctx.unknown_ids.ready_to_send() { - unknown_id_send(app_ctx.unknown_ids, app_ctx.pool); - } - Ok(()) } -fn unknown_id_send(unknown_ids: &mut UnknownIds, pool: &mut RelayPool) { - debug!("unknown_id_send called on: {:?}", &unknown_ids); - let filter = unknown_ids.filter().expect("filter"); - debug!( - "Getting {} unknown ids from relays", - unknown_ids.ids_iter().len() - ); - let msg = ClientMessage::req("unknownids".to_string(), filter); - unknown_ids.clear(); - pool.send(&msg); -} - #[profiling::function] fn update_damus(damus: &mut Damus, app_ctx: &mut AppContext<'_>, ctx: &egui::Context) { app_ctx.img_cache.urls.cache.handle_io(); @@ -381,53 +340,18 @@ fn handle_eose( } fn process_message(damus: &mut Damus, ctx: &mut AppContext<'_>, relay: &str, msg: &RelayMessage) { - match msg { - RelayMessage::Event(_subid, ev) => { - let relay = if let Some(relay) = ctx.pool.relays.iter().find(|r| r.url() == relay) { - relay - } else { - error!("couldn't find relay {} for note processing!?", relay); - return; - }; + let RelayMessage::Eose(sid) = msg else { + return; + }; - match relay { - PoolRelay::Websocket(_) => { - //info!("processing event {}", event); - if let Err(err) = ctx.ndb.process_event_with( - ev, - nostrdb::IngestMetadata::new() - .client(false) - .relay(relay.url()), - ) { - error!("error processing event {ev}: {err}"); - } - } - PoolRelay::Multicast(_) => { - // multicast events are client events - if let Err(err) = ctx.ndb.process_event_with( - ev, - nostrdb::IngestMetadata::new() - .client(true) - .relay(relay.url()), - ) { - error!("error processing multicast event {ev}: {err}"); - } - } - } - } - RelayMessage::Notice(msg) => warn!("Notice from {}: {}", relay, msg), - RelayMessage::OK(cr) => info!("OK {:?}", cr), - RelayMessage::Eose(sid) => { - if let Err(err) = handle_eose( - &damus.subscriptions, - &mut damus.timeline_cache, - ctx, - sid, - relay, - ) { - error!("error handling eose: {}", err); - } - } + if let Err(err) = handle_eose( + &damus.subscriptions, + &mut damus.timeline_cache, + ctx, + sid, + relay, + ) { + error!("error handling eose: {}", err); } }