commit 4922bcd48b52377ff8b1ef20ab292872f9c5a161
parent 86398fcd1263fce10f0c70de2f902ad9a5a0ee3d
Author: kernelkind <kernelkind@gmail.com>
Date: Wed, 25 Feb 2026 19:00:43 -0500
refactor: move legacy event processing to dave
everywhere else uses outbox. Dave depends on negentropy which depends on
the old RelayPool, and that is too big of a change to rebase it into
Outbox
Signed-off-by: kernelkind <kernelkind@gmail.com>
Diffstat:
7 files changed, 133 insertions(+), 119 deletions(-)
diff --git a/crates/notedeck/src/app.rs b/crates/notedeck/src/app.rs
@@ -15,10 +15,7 @@ use crate::{NotedeckOptions, ScopedSubsState};
use egui::Margin;
use egui::ThemePreference;
use egui_winit::clipboard::Clipboard;
-use enostr::{
- OutboxPool, OutboxSession, OutboxSessionHandler, PoolEventBuf, PoolRelay, RelayEvent,
- RelayMessage, RelayPool,
-};
+use enostr::{OutboxPool, OutboxSession, OutboxSessionHandler, RelayPool};
use nostrdb::{Config, Ndb, Transaction};
use std::cell::RefCell;
use std::collections::BTreeSet;
@@ -159,6 +156,14 @@ impl eframe::App for Notedeck {
app_ctx.remote.process_events(ctx, app_ctx.ndb);
+ {
+ profiling::scope!("unknown id");
+ if app_ctx.unknown_ids.ready_to_send() {
+ let mut oneshot = app_ctx.remote.oneshot(app_ctx.accounts);
+ unknown_id_send(app_ctx.unknown_ids, &mut oneshot);
+ }
+ }
+
render_notedeck(app, &mut app_ctx, ctx);
{
@@ -527,105 +532,6 @@ impl<'a> NotedeckInternals<'a> {
}
}
-#[profiling::function]
-pub fn try_process_events_core(
- app_ctx: &mut AppContext<'_>,
- ctx: &egui::Context,
- mut receive: impl FnMut(&mut AppContext, PoolEventBuf),
-) {
- let ctx2 = ctx.clone();
- let wakeup = move || {
- ctx2.request_repaint();
- };
-
- app_ctx.legacy_pool.keepalive_ping(wakeup);
-
- // NOTE: we don't use the while let loop due to borrow issues
- #[allow(clippy::while_let_loop)]
- loop {
- let ev = if let Some(ev) = app_ctx.legacy_pool.try_recv() {
- ev.into_owned()
- } else {
- break;
- };
-
- match (&ev.event).into() {
- RelayEvent::Opened => {
- tracing::trace!("Opened relay {}", ev.relay);
- app_ctx
- .accounts
- .send_initial_filters(app_ctx.legacy_pool, &ev.relay);
- }
- RelayEvent::Closed => tracing::warn!("{} connection closed", &ev.relay),
- RelayEvent::Other(msg) => {
- tracing::trace!("relay {} sent other event {:?}", ev.relay, &msg)
- }
- RelayEvent::Error(error) => error!("relay {} had error: {error:?}", &ev.relay),
- RelayEvent::Message(msg) => {
- process_message_core(app_ctx, &ev.relay, &msg);
- }
- }
-
- receive(app_ctx, ev);
- }
-
- if app_ctx.unknown_ids.ready_to_send() {
- unknown_id_send(app_ctx.unknown_ids, app_ctx.legacy_pool);
- }
-}
-
-#[profiling::function]
-fn process_message_core(ctx: &mut AppContext<'_>, relay: &str, msg: &RelayMessage) {
- match msg {
- RelayMessage::Event(_subid, ev) => {
- let relay =
- if let Some(relay) = ctx.legacy_pool.relays.iter().find(|r| r.url() == relay) {
- relay
- } else {
- error!("couldn't find relay {} for note processing!?", relay);
- return;
- };
-
- match relay {
- PoolRelay::Websocket(_) => {
- //info!("processing event {}", event);
- tracing::trace!("processing event {ev}");
- if let Err(err) = ctx.ndb.process_event_with(
- ev,
- nostrdb::IngestMetadata::new()
- .client(false)
- .relay(relay.url()),
- ) {
- error!("error processing event {ev}: {err}");
- }
- }
- PoolRelay::Multicast(_) => {
- // multicast events are client events
- if let Err(err) = ctx.ndb.process_event_with(
- ev,
- nostrdb::IngestMetadata::new()
- .client(true)
- .relay(relay.url()),
- ) {
- error!("error processing multicast event {ev}: {err}");
- }
- }
- }
- }
- RelayMessage::Notice(msg) => tracing::warn!("Notice from {}: {}", relay, msg),
- RelayMessage::OK(cr) => info!("OK {:?}", cr),
- RelayMessage::Eose(id) => {
- tracing::trace!("Relay {} received eose: {id}", relay)
- }
- RelayMessage::Closed(sid, reason) => {
- tracing::trace!(
- "Relay {} with sub {sid} received close because: {reason}",
- relay
- );
- }
- }
-}
-
/// If a compacted database exists at `{dbpath}/compact/`, swap it into place
/// before opening ndb. This replaces the main data.mdb with the compacted one.
fn try_swap_compacted_db(dbpath: &str) {
diff --git a/crates/notedeck/src/lib.rs b/crates/notedeck/src/lib.rs
@@ -57,7 +57,7 @@ pub use account::accounts::{AccountData, AccountSubs, Accounts};
pub use account::contacts::{ContactState, IsFollowing};
pub use account::relay::RelayAction;
pub use account::FALLBACK_PUBKEY;
-pub use app::{try_process_events_core, App, AppAction, AppResponse, Notedeck};
+pub use app::{App, AppAction, AppResponse, Notedeck};
pub use args::Args;
pub use async_loader::{worker_count, AsyncLoader};
pub use context::{AppContext, SoftKeyboardContext};
@@ -115,7 +115,9 @@ pub use time::{
is_future_timestamp, time_ago_since, time_format, unix_time_secs, MAX_FUTURE_NOTE_SKEW_SECS,
};
pub use timecache::TimeCached;
-pub use unknowns::{get_unknown_note_ids, NoteRefsUnkIdAction, SingleUnkIdAction, UnknownIds};
+pub use unknowns::{
+ get_unknown_note_ids, unknown_id_send, NoteRefsUnkIdAction, SingleUnkIdAction, UnknownIds,
+};
pub use urls::{supported_mime_hosted_at_url, SupportedMimeType, UrlMimes};
pub use user_account::UserAccount;
pub use wallet::{
diff --git a/crates/notedeck/src/unknowns.rs b/crates/notedeck/src/unknowns.rs
@@ -1,7 +1,7 @@
use crate::{
note::NoteRef,
notecache::{CachedNote, NoteCache},
- Result,
+ OneshotApi, Result,
};
use enostr::{Filter, NoteId, Pubkey};
@@ -385,14 +385,14 @@ fn get_unknown_ids_filter(ids: &[&UnknownId]) -> Option<Vec<Filter>> {
Some(filters)
}
-pub fn unknown_id_send(unknown_ids: &mut UnknownIds, pool: &mut enostr::RelayPool) {
+pub fn unknown_id_send(unknown_ids: &mut UnknownIds, oneshot: &mut OneshotApi<'_, '_>) {
tracing::debug!("unknown_id_send called on: {:?}", &unknown_ids);
let filter = unknown_ids.filter().expect("filter");
tracing::debug!(
"Getting {} unknown ids from relays",
unknown_ids.ids_iter().len()
);
- let msg = enostr::ClientMessage::req("unknownids".to_string(), filter);
+
+ oneshot.oneshot(filter);
unknown_ids.clear();
- pool.send(&msg);
}
diff --git a/crates/notedeck_dashboard/src/lib.rs b/crates/notedeck_dashboard/src/lib.rs
@@ -7,7 +7,7 @@ use std::time::{Duration, Instant};
use crossbeam_channel as chan;
use nostrdb::{Filter, Ndb, Transaction};
-use notedeck::{AppContext, AppResponse, try_process_events_core};
+use notedeck::{AppContext, AppResponse};
use chrono::{Datelike, TimeZone, Utc};
@@ -245,8 +245,6 @@ impl Default for Dashboard {
impl notedeck::App for Dashboard {
fn update(&mut self, ctx: &mut AppContext<'_>, ui: &mut egui::Ui) -> AppResponse {
- try_process_events_core(ctx, ui.ctx(), |_, _| {});
-
if !self.initialized {
self.initialized = true;
self.init(ui.ctx().clone(), ctx);
diff --git a/crates/notedeck_dave/src/events.rs b/crates/notedeck_dave/src/events.rs
@@ -0,0 +1,109 @@
+use enostr::{PoolEventBuf, PoolRelay, RelayEvent, RelayMessage};
+use notedeck::{AppContext, UnknownIds};
+use tracing::{error, info};
+
+pub fn try_process_events_core(
+ app_ctx: &mut AppContext<'_>,
+ ctx: &egui::Context,
+ mut receive: impl FnMut(&mut AppContext, PoolEventBuf),
+) {
+ let ctx2 = ctx.clone();
+ let wakeup = move || {
+ ctx2.request_repaint();
+ };
+
+ app_ctx.legacy_pool.keepalive_ping(wakeup);
+
+ // NOTE: we don't use the while let loop due to borrow issues
+ #[allow(clippy::while_let_loop)]
+ loop {
+ let ev = if let Some(ev) = app_ctx.legacy_pool.try_recv() {
+ ev.into_owned()
+ } else {
+ break;
+ };
+
+ match (&ev.event).into() {
+ RelayEvent::Opened => {
+ tracing::trace!("Opened relay {}", ev.relay);
+ }
+ RelayEvent::Closed => tracing::warn!("{} connection closed", &ev.relay),
+ RelayEvent::Other(msg) => {
+ tracing::trace!("relay {} sent other event {:?}", ev.relay, &msg)
+ }
+ RelayEvent::Error(error) => error!("relay {} had error: {error:?}", &ev.relay),
+ RelayEvent::Message(msg) => {
+ process_message_core(app_ctx, &ev.relay, &msg);
+ }
+ }
+
+ receive(app_ctx, ev);
+ }
+
+ if app_ctx.unknown_ids.ready_to_send() {
+ pool_unknown_id_send(app_ctx.unknown_ids, app_ctx.legacy_pool);
+ }
+}
+
+fn process_message_core(ctx: &mut AppContext<'_>, relay: &str, msg: &RelayMessage) {
+ match msg {
+ RelayMessage::Event(_subid, ev) => {
+ let relay =
+ if let Some(relay) = ctx.legacy_pool.relays.iter().find(|r| r.url() == relay) {
+ relay
+ } else {
+ error!("couldn't find relay {} for note processing!?", relay);
+ return;
+ };
+
+ match relay {
+ PoolRelay::Websocket(_) => {
+ //info!("processing event {}", event);
+ tracing::trace!("processing event {ev}");
+ if let Err(err) = ctx.ndb.process_event_with(
+ ev,
+ nostrdb::IngestMetadata::new()
+ .client(false)
+ .relay(relay.url()),
+ ) {
+ error!("error processing event {ev}: {err}");
+ }
+ }
+ PoolRelay::Multicast(_) => {
+ // multicast events are client events
+ if let Err(err) = ctx.ndb.process_event_with(
+ ev,
+ nostrdb::IngestMetadata::new()
+ .client(true)
+ .relay(relay.url()),
+ ) {
+ error!("error processing multicast event {ev}: {err}");
+ }
+ }
+ }
+ }
+ RelayMessage::Notice(msg) => tracing::warn!("Notice from {}: {}", relay, msg),
+ RelayMessage::OK(cr) => info!("OK {:?}", cr),
+ RelayMessage::Eose(id) => {
+ tracing::trace!("Relay {} received eose: {id}", relay)
+ }
+ RelayMessage::Closed(sid, reason) => {
+ tracing::trace!(
+ "Relay {} with sub {sid} received close because: {reason}",
+ relay
+ );
+ }
+ }
+}
+
+fn pool_unknown_id_send(unknown_ids: &mut UnknownIds, pool: &mut enostr::RelayPool) {
+ tracing::debug!("unknown_id_send called on: {:?}", &unknown_ids);
+ let filter = unknown_ids.filter().expect("filter");
+ tracing::debug!(
+ "Getting {} unknown ids from relays",
+ unknown_ids.ids_iter().len()
+ );
+ let msg = enostr::ClientMessage::req("unknownids".to_string(), filter);
+ unknown_ids.clear();
+ pool.send(&msg);
+}
diff --git a/crates/notedeck_dave/src/lib.rs b/crates/notedeck_dave/src/lib.rs
@@ -3,6 +3,7 @@ mod auto_accept;
mod avatar;
mod backend;
pub mod config;
+pub mod events;
pub mod file_update;
mod focus_queue;
pub(crate) mod git_status;
@@ -34,8 +35,8 @@ use enostr::KeypairUnowned;
use focus_queue::FocusQueue;
use nostrdb::{Subscription, Transaction};
use notedeck::{
- timed_serializer::TimedSerializer, try_process_events_core, ui::is_narrow, AppAction,
- AppContext, AppResponse, DataPath, DataPathType,
+ timed_serializer::TimedSerializer, ui::is_narrow, AppAction, AppContext, AppResponse, DataPath,
+ DataPathType,
};
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
@@ -207,6 +208,8 @@ pub struct Dave {
use update::PermissionPublish;
+use crate::events::try_process_events_core;
+
/// Info captured from a session before deletion, for publishing a "deleted" state event.
struct DeletedSessionInfo {
claude_session_id: String,
diff --git a/crates/notedeck_messages/src/lib.rs b/crates/notedeck_messages/src/lib.rs
@@ -9,9 +9,7 @@ use enostr::Pubkey;
use hashbrown::{HashMap, HashSet};
use nav::{process_messages_ui_response, Route};
use nostrdb::{Subscription, Transaction};
-use notedeck::{
- try_process_events_core, ui::is_narrow, Accounts, App, AppContext, AppResponse, Router,
-};
+use notedeck::{ui::is_narrow, Accounts, App, AppContext, AppResponse, Router};
use crate::{
cache::{ConversationCache, ConversationListState, ConversationStates},
@@ -53,8 +51,6 @@ impl Default for MessagesApp {
impl App for MessagesApp {
#[profiling::function]
fn update(&mut self, ctx: &mut AppContext<'_>, ui: &mut egui::Ui) -> AppResponse {
- try_process_events_core(ctx, ui.ctx(), |_, _| {});
-
let Some(cache) = self.messages.get_current_mut(ctx.accounts) else {
login_nsec_prompt(ui, ctx.i18n);
return AppResponse::none();