notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit 3c31e1a651ed0b1ca5cc2b01e1bf94296da24fe9
parent faa40bb6163e5c1883e861bb5fe717adef2060e7
Author: kernelkind <kernelkind@gmail.com>
Date:   Mon, 16 Jun 2025 17:21:50 -0400

add `ThreadSubs` for managing local & remote subscriptions

Signed-off-by: kernelkind <kernelkind@gmail.com>

Diffstat:
MCargo.lock | 1+
Mcrates/notedeck_columns/Cargo.toml | 1+
Mcrates/notedeck_columns/src/multi_subscriber.rs | 264++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
3 files changed, 265 insertions(+), 1 deletion(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -3303,6 +3303,7 @@ dependencies = [ "egui_virtual_list", "ehttp", "enostr", + "hashbrown", "hex", "human_format", "image", diff --git a/crates/notedeck_columns/Cargo.toml b/crates/notedeck_columns/Cargo.toml @@ -51,6 +51,7 @@ sha2 = { workspace = true } base64 = { workspace = true } egui-winit = { workspace = true } profiling = { workspace = true } +hashbrown = { workspace = true } human_format = "1.1.0" [target.'cfg(any(target_os = "windows", target_os = "macos", target_os = "linux"))'.dependencies] diff --git a/crates/notedeck_columns/src/multi_subscriber.rs b/crates/notedeck_columns/src/multi_subscriber.rs @@ -1,8 +1,12 @@ -use enostr::{Filter, RelayPool}; +use egui_nav::ReturnType; +use enostr::{Filter, NoteId, RelayPool}; +use hashbrown::HashMap; use nostrdb::{Ndb, Subscription}; use tracing::{error, info}; use uuid::Uuid; +use crate::timeline::ThreadSelection; + #[derive(Debug)] pub struct MultiSubscriber { pub filters: Vec<Filter>, @@ -143,3 +147,261 @@ impl MultiSubscriber { } } } + +type RootNoteId = NoteId; + +#[derive(Default)] +pub struct ThreadSubs { + pub remotes: HashMap<RootNoteId, Remote>, + scopes: HashMap<MetaId, Vec<Scope>>, +} + +// column id +type MetaId = usize; + +pub struct Remote { + pub filter: Vec<Filter>, + subid: String, + dependers: usize, +} + +impl std::fmt::Debug for Remote { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Remote") + .field("subid", &self.subid) + .field("dependers", &self.dependers) + .finish() + } +} + +struct Scope { + pub root_id: NoteId, + stack: Vec<Sub>, +} + +pub struct Sub { + pub selected_id: NoteId, + pub sub: Subscription, + pub filter: Vec<Filter>, +} + +impl ThreadSubs { + #[allow(clippy::too_many_arguments)] + pub fn subscribe( + &mut self, + ndb: &mut Ndb, + pool: &mut RelayPool, + meta_id: usize, + id: &ThreadSelection, + local_sub_filter: Vec<Filter>, + new_scope: bool, + remote_sub_filter: impl FnOnce() -> Vec<Filter>, + ) { + let cur_scopes = self.scopes.entry(meta_id).or_default(); + + let new_subs = if new_scope || cur_scopes.is_empty() { + local_sub_new_scope(ndb, id, local_sub_filter, cur_scopes) + } else { + let cur_scope = cur_scopes.last_mut().expect("can't be empty"); + sub_current_scope(ndb, id, local_sub_filter, cur_scope) + }; + + let remote = match self.remotes.raw_entry_mut().from_key(&id.root_id.bytes()) { + hashbrown::hash_map::RawEntryMut::Occupied(entry) => entry.into_mut(), + hashbrown::hash_map::RawEntryMut::Vacant(entry) => { + let (_, res) = entry.insert( + NoteId::new(*id.root_id.bytes()), + sub_remote(pool, remote_sub_filter, id), + ); + + res + } + }; + + remote.dependers = remote.dependers.saturating_add_signed(new_subs); + let num_dependers = remote.dependers; + tracing::info!( + "Sub stats: num remotes: {}, num locals: {}, num remote dependers: {:?}", + self.remotes.len(), + self.scopes.len(), + num_dependers, + ); + } + + pub fn unsubscribe( + &mut self, + ndb: &mut Ndb, + pool: &mut RelayPool, + meta_id: usize, + id: &ThreadSelection, + return_type: ReturnType, + ) { + let Some(scopes) = self.scopes.get_mut(&meta_id) else { + return; + }; + + let Some(remote) = self.remotes.get_mut(&id.root_id.bytes()) else { + tracing::error!("somehow we're unsubscribing but we don't have a remote"); + return; + }; + + match return_type { + ReturnType::Drag => { + if let Some(scope) = scopes.last_mut() { + let Some(cur_sub) = scope.stack.pop() else { + tracing::error!("expected a scope to be left"); + return; + }; + + if cur_sub.selected_id.bytes() != id.selected_or_root() { + tracing::error!("Somehow the current scope's root is not equal to the selected note's root"); + } + + if ndb_unsub(ndb, cur_sub.sub, id) { + remote.dependers = remote.dependers.saturating_sub(1); + } + + if scope.stack.is_empty() { + scopes.pop(); + } + } + } + ReturnType::Click => { + let Some(scope) = scopes.pop() else { + tracing::error!("called unsubscribe but there aren't any scopes left"); + return; + }; + + for sub in scope.stack { + if sub.selected_id.bytes() != id.selected_or_root() { + tracing::error!("Somehow the current scope's root is not equal to the selected note's root"); + } + + if ndb_unsub(ndb, sub.sub, id) { + remote.dependers = remote.dependers.saturating_sub(1); + } + } + } + } + + if scopes.is_empty() { + self.scopes.remove(&meta_id); + } + + let num_dependers = remote.dependers; + + if remote.dependers == 0 { + let remote = self + .remotes + .remove(&id.root_id.bytes()) + .expect("code above should guarentee existence"); + tracing::info!("Remotely unsubscribed: {}", remote.subid); + pool.unsubscribe(remote.subid); + } + + tracing::info!( + "unsub stats: num remotes: {}, num locals: {}, num remote dependers: {:?}", + self.remotes.len(), + self.scopes.len(), + num_dependers, + ); + } + + pub fn get_local(&self, meta_id: usize) -> Option<&Sub> { + self.scopes + .get(&meta_id) + .as_ref() + .and_then(|s| s.last()) + .and_then(|s| s.stack.last()) + } +} + +fn sub_current_scope( + ndb: &mut Ndb, + selection: &ThreadSelection, + local_sub_filter: Vec<Filter>, + cur_scope: &mut Scope, +) -> isize { + let mut new_subs = 0; + + if selection.root_id.bytes() != cur_scope.root_id.bytes() { + tracing::error!( + "Somehow the current scope's root is not equal to the selected note's root" + ); + } + + if let Some(sub) = ndb_sub(ndb, &local_sub_filter, selection) { + cur_scope.stack.push(Sub { + selected_id: NoteId::new(*selection.selected_or_root()), + sub, + filter: local_sub_filter, + }); + new_subs += 1; + } + + new_subs +} + +fn ndb_sub(ndb: &Ndb, filter: &[Filter], id: impl std::fmt::Debug) -> Option<Subscription> { + match ndb.subscribe(filter) { + Ok(s) => Some(s), + Err(e) => { + tracing::info!("Failed to get subscription for {:?}: {e}", id); + None + } + } +} + +fn ndb_unsub(ndb: &mut Ndb, sub: Subscription, id: impl std::fmt::Debug) -> bool { + match ndb.unsubscribe(sub) { + Ok(_) => true, + Err(e) => { + tracing::info!("Failed to unsub {:?}: {e}", id); + false + } + } +} + +fn sub_remote( + pool: &mut RelayPool, + remote_sub_filter: impl FnOnce() -> Vec<Filter>, + id: impl std::fmt::Debug, +) -> Remote { + let subid = Uuid::new_v4().to_string(); + + let filter = remote_sub_filter(); + + let remote = Remote { + filter: filter.clone(), + subid: subid.clone(), + dependers: 0, + }; + + tracing::info!("Remote subscribe for {:?}", id); + + pool.subscribe(subid, filter); + + remote +} + +fn local_sub_new_scope( + ndb: &mut Ndb, + id: &ThreadSelection, + local_sub_filter: Vec<Filter>, + scopes: &mut Vec<Scope>, +) -> isize { + let Some(sub) = ndb_sub(ndb, &local_sub_filter, id) else { + return 0; + }; + + scopes.push(Scope { + root_id: id.root_id.to_note_id(), + stack: vec![Sub { + selected_id: NoteId::new(*id.selected_or_root()), + sub, + filter: local_sub_filter, + }], + }); + + 1 +}