notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit 18d0cff28692d0a79714d6e3f1076dee36f0074d
parent 89ab059eba6ad3ea792112b457c2b6fa7d9b3994
Author: kernelkind <kernelkind@gmail.com>
Date:   Wed, 25 Feb 2026 15:20:25 -0500

refactor(timeline-subs): key local timeline subs by account

Convert TimelineSub bookkeeping from a single shared state machine to per-account state while preserving the existing RelayPool-backed remote subscription behavior.

This makes local subscription lifetimes, dependers, and timeline cache open/pop/insert flows account-aware, and threads the selected account key through timeline polling/setup call paths.

Scoped-sub remote timeline ownership remains a follow-up refactor.

Signed-off-by: kernelkind <kernelkind@gmail.com>

Diffstat:
Mcrates/notedeck_columns/src/actionbar.rs | 20++++++++++++++++++--
Mcrates/notedeck_columns/src/app.rs | 19++++++++++++++++++-
Mcrates/notedeck_columns/src/column.rs | 12+++++++-----
Mcrates/notedeck_columns/src/decks.rs | 12++++++++----
Mcrates/notedeck_columns/src/nav.rs | 15++++++++++++---
Mcrates/notedeck_columns/src/route.rs | 4+++-
Mcrates/notedeck_columns/src/storage/decks.rs | 2+-
Mcrates/notedeck_columns/src/timeline/cache.rs | 32+++++++++++++++++++++-----------
Mcrates/notedeck_columns/src/timeline/mod.rs | 40++++++++++++++++++++++++++++++++--------
Mcrates/notedeck_columns/src/timeline/sub/timeline_sub.rs | 423++++++++++++++++++++++++++++++++++++++++++++-----------------------------------
Mcrates/notedeck_columns/src/ui/add_column.rs | 13+++++++++----
Mcrates/notedeck_columns/src/ui/profile/mod.rs | 1+
12 files changed, 365 insertions(+), 228 deletions(-)

diff --git a/crates/notedeck_columns/src/actionbar.rs b/crates/notedeck_columns/src/actionbar.rs @@ -122,7 +122,15 @@ fn execute_note_action( let kind = TimelineKind::Profile(pubkey); router_action = Some(RouterAction::route_to(Route::Timeline(kind.clone()))); timeline_res = timeline_cache - .open(ndb, note_cache, txn, pool, &kind, false) + .open( + ndb, + note_cache, + txn, + *accounts.selected_account_pubkey(), + pool, + &kind, + false, + ) .map(NotesOpenResult::Timeline); } NoteAction::Note { @@ -160,7 +168,15 @@ fn execute_note_action( let kind = TimelineKind::Hashtag(vec![htag.clone()]); router_action = Some(RouterAction::route_to(Route::Timeline(kind.clone()))); timeline_res = timeline_cache - .open(ndb, note_cache, txn, pool, &kind, false) + .open( + ndb, + note_cache, + txn, + *accounts.selected_account_pubkey(), + pool, + &kind, + false, + ) .map(NotesOpenResult::Timeline); } NoteAction::Repost(note_id) => { diff --git a/crates/notedeck_columns/src/app.rs b/crates/notedeck_columns/src/app.rs @@ -203,6 +203,7 @@ fn try_process_event( }); for (kind, timeline) in &mut damus.timeline_cache { + let selected_account_pk = *app_ctx.accounts.selected_account_pubkey(); let is_ready = timeline::is_timeline_ready( app_ctx.ndb, app_ctx.legacy_pool, @@ -218,12 +219,14 @@ fn try_process_event( app_ctx.ndb, kind, timeline, + app_ctx.accounts.selected_account_pubkey(), ); let txn = Transaction::new(app_ctx.ndb).expect("txn"); // only thread timelines are reversed let reversed = false; if let Err(err) = timeline.poll_notes_into_view( + &selected_account_pk, app_ctx.ndb, &txn, app_ctx.unknown_ids, @@ -279,6 +282,7 @@ fn schedule_timeline_load( ndb: &nostrdb::Ndb, kind: &TimelineKind, timeline: &mut timeline::Timeline, + account_pk: &Pubkey, ) { if loaded.contains(kind) || inflight.contains(kind) { return; @@ -289,7 +293,9 @@ fn schedule_timeline_load( }; if timeline.kind.should_subscribe_locally() { - timeline.subscription.try_add_local(ndb, &filter); + timeline + .subscription + .try_add_local(*account_pk, ndb, &filter); } loader.load_timeline(kind.clone()); @@ -353,6 +359,16 @@ fn update_damus(damus: &mut Damus, app_ctx: &mut AppContext<'_>, ctx: &egui::Con .subscriptions() .insert("unknownids".to_string(), SubKind::OneShot); + if let Err(err) = timeline::setup_initial_nostrdb_subs( + app_ctx.ndb, + app_ctx.note_cache, + &mut damus.timeline_cache, + app_ctx.unknown_ids, + *app_ctx.accounts.selected_account_pubkey(), + ) { + warn!("update_damus init: {err}"); + } + if !app_ctx.settings.welcome_completed() { let split = egui_nav::Split::PercentFromTop(egui_nav::Percent::new(40).expect("40 <= 100")); @@ -596,6 +612,7 @@ impl Damus { &txn, app_context.ndb, app_context.note_cache, + *app_context.accounts.selected_account_pubkey(), app_context.legacy_pool, &timeline_kind, ) { diff --git a/crates/notedeck_columns/src/column.rs b/crates/notedeck_columns/src/column.rs @@ -3,7 +3,7 @@ use crate::{ route::{ColumnsRouter, Route, SingletonRouter}, timeline::{Timeline, TimelineCache, TimelineKind}, }; -use enostr::RelayPool; +use enostr::{Pubkey, RelayPool}; use nostrdb::{Ndb, Transaction}; use notedeck::NoteCache; use std::iter::Iterator; @@ -101,18 +101,20 @@ impl Columns { SelectionResult::NewSelection(selected_index) } + #[allow(clippy::too_many_arguments)] pub fn add_new_timeline_column( &mut self, timeline_cache: &mut TimelineCache, txn: &Transaction, ndb: &Ndb, note_cache: &mut NoteCache, + account_pk: Pubkey, pool: &mut RelayPool, kind: &TimelineKind, ) -> Option<TimelineOpenResult> { self.columns .push(Column::new(vec![Route::timeline(kind.to_owned())])); - timeline_cache.open(ndb, note_cache, txn, pool, kind, false) + timeline_cache.open(ndb, note_cache, txn, account_pk, pool, kind, false) } pub fn new_column_picker(&mut self) { @@ -124,15 +126,15 @@ impl Columns { pub fn insert_intermediary_routes( &mut self, timeline_cache: &mut TimelineCache, + account_pk: Pubkey, intermediary_routes: Vec<IntermediaryRoute>, ) { let routes = intermediary_routes .into_iter() .map(|r| match r { - IntermediaryRoute::Timeline(mut timeline) => { + IntermediaryRoute::Timeline(timeline) => { let route = Route::timeline(timeline.kind.clone()); - timeline.subscription.increment(); - timeline_cache.insert(timeline.kind.clone(), *timeline); + timeline_cache.insert(timeline.kind.clone(), account_pk, *timeline); route } IntermediaryRoute::Route(route) => route, diff --git a/crates/notedeck_columns/src/decks.rs b/crates/notedeck_columns/src/decks.rs @@ -178,7 +178,7 @@ impl DecksCache { }; info!("Removing decks for {:?}", key); - decks.unsubscribe_all(timeline_cache, ndb, pool); + decks.unsubscribe_all(timeline_cache, ndb, *key, pool); if !self.account_to_decks.contains_key(&self.fallback_pubkey) { self.account_to_decks @@ -294,13 +294,14 @@ impl Decks { index: usize, timeline_cache: &mut TimelineCache, ndb: &mut nostrdb::Ndb, + account_pk: Pubkey, pool: &mut enostr::RelayPool, ) { let Some(deck) = self.remove_deck_internal(index) else { return; }; - delete_deck(deck, timeline_cache, ndb, pool); + delete_deck(deck, timeline_cache, ndb, account_pk, pool); } fn remove_deck_internal(&mut self, index: usize) -> Option<Deck> { @@ -357,10 +358,11 @@ impl Decks { self, timeline_cache: &mut TimelineCache, ndb: &mut nostrdb::Ndb, + account_pk: Pubkey, pool: &mut enostr::RelayPool, ) { for deck in self.decks { - delete_deck(deck, timeline_cache, ndb, pool); + delete_deck(deck, timeline_cache, ndb, account_pk, pool); } } } @@ -369,6 +371,7 @@ fn delete_deck( mut deck: Deck, timeline_cache: &mut TimelineCache, ndb: &mut nostrdb::Ndb, + account_pk: Pubkey, pool: &mut enostr::RelayPool, ) { let cols = deck.columns_mut(); @@ -377,7 +380,7 @@ fn delete_deck( let kinds_to_pop = cols.delete_column(i); for kind in &kinds_to_pop { - if let Err(err) = timeline_cache.pop(kind, ndb, pool) { + if let Err(err) = timeline_cache.pop(kind, account_pk, ndb, pool) { error!("error popping timeline: {err}"); } } @@ -461,6 +464,7 @@ pub fn add_demo_columns( &txn, ctx.ndb, ctx.note_cache, + *ctx.accounts.selected_account_pubkey(), ctx.legacy_pool, kind, ) { diff --git a/crates/notedeck_columns/src/nav.rs b/crates/notedeck_columns/src/nav.rs @@ -147,8 +147,11 @@ impl SwitchingAction { ColumnsAction::Remove(index) => { let kinds_to_pop = get_active_columns_mut(ctx.i18n, ctx.accounts, decks_cache) .delete_column(index); + let selected_account_pk = *ctx.accounts.selected_account_pubkey(); for kind in &kinds_to_pop { - if let Err(err) = timeline_cache.pop(kind, ctx.ndb, ctx.legacy_pool) { + if let Err(err) = + timeline_cache.pop(kind, selected_account_pk, ctx.ndb, ctx.legacy_pool) + { error!("error popping timeline: {err}"); } } @@ -167,6 +170,7 @@ impl SwitchingAction { index, timeline_cache, ctx.ndb, + *ctx.accounts.selected_account_pubkey(), ctx.legacy_pool, ); } @@ -412,6 +416,7 @@ fn handle_navigating_timeline( app: &mut Damus, col: usize, ) { + let account_pk = *accounts.selected_account_pubkey(); let kind = { let Route::Timeline(kind) = app.columns(accounts).column(col).router().top() else { return; @@ -426,7 +431,7 @@ fn handle_navigating_timeline( let txn = Transaction::new(ndb).expect("txn"); app.timeline_cache - .open(ndb, note_cache, &txn, pool, &kind, false); + .open(ndb, note_cache, &txn, account_pk, pool, &kind, false); } pub enum RouterAction { @@ -533,9 +538,13 @@ fn process_render_nav_action( RenderNavAction::PfpClicked => Some(RouterAction::PfpClicked), RenderNavAction::RemoveColumn => { let kinds_to_pop = app.columns_mut(ctx.i18n, ctx.accounts).delete_column(col); + let selected_account_pk = *ctx.accounts.selected_account_pubkey(); for kind in &kinds_to_pop { - if let Err(err) = app.timeline_cache.pop(kind, ctx.ndb, ctx.legacy_pool) { + if let Err(err) = + app.timeline_cache + .pop(kind, selected_account_pk, ctx.ndb, ctx.legacy_pool) + { error!("error popping timeline: {err}"); } } diff --git a/crates/notedeck_columns/src/route.rs b/crates/notedeck_columns/src/route.rs @@ -806,7 +806,9 @@ pub fn cleanup_popped_route( ) { match route { Route::Timeline(kind) => { - if let Err(err) = timeline_cache.pop(kind, ndb, pool) { + if let Err(err) = + timeline_cache.pop(kind, scoped_subs.selected_account_pubkey(), ndb, pool) + { tracing::error!("popping timeline had an error: {err} for {:?}", kind); } } diff --git a/crates/notedeck_columns/src/storage/decks.rs b/crates/notedeck_columns/src/storage/decks.rs @@ -331,7 +331,7 @@ fn deserialize_columns( match CleanIntermediaryRoute::parse(&mut parser, deck_user) { Ok(route_intermediary) => { if let Some(ir) = route_intermediary.into_intermediary_route(ndb) { - cols.insert_intermediary_routes(timeline_cache, vec![ir]); + cols.insert_intermediary_routes(timeline_cache, *deck_user, vec![ir]); } } Err(err) => { diff --git a/crates/notedeck_columns/src/timeline/cache.rs b/crates/notedeck_columns/src/timeline/cache.rs @@ -6,7 +6,7 @@ use crate::{ use notedeck::{filter, FilterState, NoteCache, NoteRef}; -use enostr::RelayPool; +use enostr::{Pubkey, RelayPool}; use nostrdb::{Filter, Ndb, Transaction}; use std::collections::HashMap; use tracing::{debug, error, info, warn}; @@ -53,6 +53,7 @@ impl TimelineCache { pub fn pop( &mut self, id: &TimelineKind, + account_pk: Pubkey, ndb: &mut Ndb, pool: &mut RelayPool, ) -> Result<(), Error> { @@ -62,9 +63,11 @@ impl TimelineCache { return Err(Error::TimelineNotFound); }; - timeline.subscription.unsubscribe_or_decrement(ndb, pool); + timeline + .subscription + .unsubscribe_or_decrement(account_pk, ndb, pool); - if timeline.subscription.no_sub() { + if !timeline.subscription.has_any_subs() { debug!( "popped last timeline {:?}, removing from timeline cache", id @@ -105,12 +108,13 @@ impl TimelineCache { res } - pub fn insert(&mut self, id: TimelineKind, timeline: Timeline) { + pub fn insert(&mut self, id: TimelineKind, account_pk: Pubkey, mut timeline: Timeline) { if let Some(cur_timeline) = self.timelines.get_mut(&id) { - cur_timeline.subscription.increment(); + cur_timeline.subscription.increment(account_pk); return; }; + timeline.subscription.increment(account_pk); self.timelines.insert(id, timeline); } @@ -177,11 +181,13 @@ impl TimelineCache { /// without running a blocking local query. Use this for startup paths /// where initial notes are loaded asynchronously. #[profiling::function] + #[allow(clippy::too_many_arguments)] pub fn open( &mut self, ndb: &Ndb, note_cache: &mut NoteCache, txn: &Transaction, + account_pk: Pubkey, pool: &mut RelayPool, id: &TimelineKind, load_local: bool, @@ -200,8 +206,10 @@ impl TimelineCache { if let Some(filter) = timeline.filter.get_any_ready() { debug!("got open with subscription for {:?}", &timeline.kind); - timeline.subscription.try_add_local(ndb, filter); - timeline.subscription.try_add_remote(pool, filter); + timeline.subscription.try_add_local(account_pk, ndb, filter); + timeline + .subscription + .try_add_remote(account_pk, pool, filter); } else { debug!( "open skipped subscription; filter not ready for {:?}", @@ -209,7 +217,7 @@ impl TimelineCache { ); } - timeline.subscription.increment(); + timeline.subscription.increment(account_pk); return None; } @@ -251,8 +259,10 @@ impl TimelineCache { if let Some(filter) = timeline.filter.get_any_ready() { debug!("got open with *new* subscription for {:?}", &timeline.kind); - timeline.subscription.try_add_local(ndb, filter); - timeline.subscription.try_add_remote(pool, filter); + timeline.subscription.try_add_local(account_pk, ndb, filter); + timeline + .subscription + .try_add_remote(account_pk, pool, filter); } else { // This should never happen reasoning, self.notes would have // failed above if the filter wasn't ready @@ -261,7 +271,7 @@ impl TimelineCache { ); }; - timeline.subscription.increment(); + timeline.subscription.increment(account_pk); if let Some(unknowns) = notes_resp.unknown_pks { match &mut open_result { diff --git a/crates/notedeck_columns/src/timeline/mod.rs b/crates/notedeck_columns/src/timeline/mod.rs @@ -516,6 +516,7 @@ impl Timeline { #[profiling::function] pub fn poll_notes_into_view( &mut self, + account_pk: &Pubkey, ndb: &Ndb, txn: &Transaction, unknown_ids: &mut UnknownIds, @@ -529,7 +530,7 @@ impl Timeline { let sub = self .subscription - .get_local() + .get_local(account_pk) .ok_or(Error::App(notedeck::Error::no_active_sub()))?; let new_note_ids = { @@ -632,9 +633,12 @@ pub fn setup_new_timeline( accounts: &Accounts, unknown_ids: &mut UnknownIds, ) { + let account_pk = *accounts.selected_account_pubkey(); // if we're ready, setup local subs if is_timeline_ready(ndb, pool, timeline, accounts) { - if let Err(err) = setup_timeline_nostrdb_sub(ndb, txn, note_cache, timeline, unknown_ids) { + if let Err(err) = + setup_timeline_nostrdb_sub(ndb, txn, note_cache, timeline, unknown_ids, account_pk) + { error!("setup_new_timeline: {err}"); } } @@ -642,7 +646,7 @@ pub fn setup_new_timeline( for relay in &mut pool.relays { send_initial_timeline_filter(since_optimize, subs, relay, timeline, accounts); } - timeline.subscription.increment(); + timeline.subscription.increment(account_pk); } /// Send initial filters for a specific relay. This typically gets called @@ -675,6 +679,7 @@ pub fn send_initial_timeline_filter( timeline: &mut Timeline, accounts: &Accounts, ) { + let account_pk = *accounts.selected_account_pubkey(); let filter_state = timeline.filter.get_mut(relay.url()); match filter_state { @@ -728,7 +733,7 @@ pub fn send_initial_timeline_filter( if let Err(err) = relay.subscribe(sub_id.clone(), new_filters.clone()) { error!("error subscribing: {err}"); } else { - timeline.subscription.force_add_remote(sub_id); + timeline.subscription.force_add_remote(account_pk, sub_id); } } @@ -812,10 +817,13 @@ fn setup_initial_timeline( note_cache: &mut NoteCache, unknown_ids: &mut UnknownIds, filters: &HybridFilter, + account_pk: Pubkey, ) -> Result<()> { // some timelines are one-shot and a refreshed, like last_per_pubkey algo feed if timeline.kind.should_subscribe_locally() { - timeline.subscription.try_add_local(ndb, filters); + timeline + .subscription + .try_add_local(account_pk, ndb, filters); } debug!( @@ -863,10 +871,13 @@ pub fn setup_initial_nostrdb_subs( note_cache: &mut NoteCache, timeline_cache: &mut TimelineCache, unknown_ids: &mut UnknownIds, + account_pk: Pubkey, ) -> Result<()> { for (_kind, timeline) in timeline_cache { let txn = Transaction::new(ndb).expect("txn"); - if let Err(err) = setup_timeline_nostrdb_sub(ndb, &txn, note_cache, timeline, unknown_ids) { + if let Err(err) = + setup_timeline_nostrdb_sub(ndb, &txn, note_cache, timeline, unknown_ids, account_pk) + { error!("setup_initial_nostrdb_subs: {err}"); } } @@ -880,6 +891,7 @@ fn setup_timeline_nostrdb_sub( note_cache: &mut NoteCache, timeline: &mut Timeline, unknown_ids: &mut UnknownIds, + account_pk: Pubkey, ) -> Result<()> { let filter_state = timeline .filter @@ -887,7 +899,15 @@ fn setup_timeline_nostrdb_sub( .ok_or(Error::App(notedeck::Error::empty_contact_list()))? .to_owned(); - setup_initial_timeline(ndb, txn, timeline, note_cache, unknown_ids, &filter_state)?; + setup_initial_timeline( + ndb, + txn, + timeline, + note_cache, + unknown_ids, + &filter_state, + account_pk, + )?; Ok(()) } @@ -1009,7 +1029,11 @@ pub fn is_timeline_ready( //let ck = &timeline.kind; //let subid = damus.gen_subid(&SubKind::Column(ck.clone())); - timeline.subscription.try_add_remote(pool, &filter); + timeline.subscription.try_add_remote( + *accounts.selected_account_pubkey(), + pool, + &filter, + ); true } } diff --git a/crates/notedeck_columns/src/timeline/sub/timeline_sub.rs b/crates/notedeck_columns/src/timeline/sub/timeline_sub.rs @@ -1,6 +1,8 @@ use notedeck::{filter::HybridFilter, UnifiedSubscription}; +use enostr::Pubkey; use enostr::RelayPool; +use hashbrown::HashMap; use nostrdb::{Ndb, Subscription}; use crate::{subscriptions, timeline::sub::ndb_sub}; @@ -14,10 +16,14 @@ fn unsubscribe_local(ndb: &mut Ndb, local: Subscription, context: &str) -> bool true } -#[derive(Debug)] +/// Per-account timeline subscription state with ref-counting. +/// +/// This still manages legacy relay-pool remote subscriptions for now; scoped-sub +/// remote ownership is migrated in a follow-up refactor. +#[derive(Debug, Default)] pub struct TimelineSub { filter: Option<HybridFilter>, - state: SubState, + by_account: HashMap<Pubkey, SubState>, } #[derive(Debug, Clone)] @@ -39,213 +45,212 @@ enum SubState { }, } -impl Default for TimelineSub { +impl Default for SubState { fn default() -> Self { - Self { - state: SubState::NoSub { dependers: 0 }, - filter: None, - } + Self::NoSub { dependers: 0 } } } impl TimelineSub { - /// Reset the subscription state, properly unsubscribing from ndb and - /// relay pool before clearing. - /// - /// Used when the contact list changes and we need to rebuild the - /// timeline with a new filter. Preserves the depender count so that - /// shared subscription reference counting remains correct. - pub fn reset(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) { - let before = self.state.clone(); - - let Some(dependers) = (match &self.state { - SubState::NoSub { dependers } => Some(*dependers), + fn state_for_account(&self, account_pk: &Pubkey) -> SubState { + self.by_account.get(account_pk).cloned().unwrap_or_default() + } + + fn set_state_for_account(&mut self, account_pk: Pubkey, state: SubState) { + if matches!(state, SubState::NoSub { dependers: 0 }) { + self.by_account.remove(&account_pk); + return; + } + + self.by_account.insert(account_pk, state); + } + + /// Reset one account's subscription state while preserving its depender count. + pub fn reset_for_account(&mut self, account_pk: Pubkey, ndb: &mut Ndb, pool: &mut RelayPool) { + let before = self.state_for_account(&account_pk); + let next = match before.clone() { + SubState::NoSub { dependers } => SubState::NoSub { dependers }, SubState::LocalOnly { local, dependers } => { - if !unsubscribe_local(ndb, *local, "TimelineSub::reset") { + if !unsubscribe_local(ndb, local, "TimelineSub::reset_for_account") { return; } - Some(*dependers) + SubState::NoSub { dependers } } - SubState::RemoteOnly { remote, dependers } => { - pool.unsubscribe(remote.to_owned()); - Some(*dependers) + pool.unsubscribe(remote); + SubState::NoSub { dependers } } - SubState::Unified { unified, dependers } => { - pool.unsubscribe(unified.remote.to_owned()); - if !unsubscribe_local(ndb, unified.local, "TimelineSub::reset") { - self.state = SubState::LocalOnly { - local: unified.local, - dependers: *dependers, - }; + pool.unsubscribe(unified.remote.clone()); + if !unsubscribe_local(ndb, unified.local, "TimelineSub::reset_for_account") { + self.set_state_for_account( + account_pk, + SubState::LocalOnly { + local: unified.local, + dependers, + }, + ); return; } - Some(*dependers) + SubState::NoSub { dependers } } - }) else { - return; }; - self.state = SubState::NoSub { dependers }; + self.set_state_for_account(account_pk, next); self.filter = None; - tracing::debug!("TimelineSub::reset: {:?} => {:?}", before, self.state); + tracing::debug!( + "TimelineSub::reset_for_account({account_pk:?}): {:?} => {:?}", + before, + self.state_for_account(&account_pk) + ); } - pub fn try_add_local(&mut self, ndb: &Ndb, filter: &HybridFilter) { - let before = self.state.clone(); - match &mut self.state { + pub fn try_add_local(&mut self, account_pk: Pubkey, ndb: &Ndb, filter: &HybridFilter) { + let before = self.state_for_account(&account_pk); + + let Some(next) = (match before.clone() { SubState::NoSub { dependers } => { let Some(sub) = ndb_sub(ndb, &filter.local().combined(), "") else { return; }; - self.filter = Some(filter.to_owned()); - self.state = SubState::LocalOnly { + Some(SubState::LocalOnly { local: sub, - dependers: *dependers, - } + dependers, + }) } - SubState::LocalOnly { - local: _, - dependers: _, - } => {} + SubState::LocalOnly { .. } => None, SubState::RemoteOnly { remote, dependers } => { let Some(local) = ndb_sub(ndb, &filter.local().combined(), "") else { return; }; - self.state = SubState::Unified { - unified: UnifiedSubscription { - local, - remote: remote.to_owned(), - }, - dependers: *dependers, - }; + Some(SubState::Unified { + unified: UnifiedSubscription { local, remote }, + dependers, + }) } - SubState::Unified { - unified: _, - dependers: _, - } => {} - } + SubState::Unified { .. } => None, + }) else { + return; + }; + + self.set_state_for_account(account_pk, next); + tracing::debug!( - "TimelineSub::try_add_local: {:?} => {:?}", + "TimelineSub::try_add_local({account_pk:?}): {:?} => {:?}", before, - self.state + self.state_for_account(&account_pk) ); } - pub fn force_add_remote(&mut self, subid: String) { - let before = self.state.clone(); - match &mut self.state { - SubState::NoSub { dependers } => { - self.state = SubState::RemoteOnly { + pub fn force_add_remote(&mut self, account_pk: Pubkey, subid: String) { + let before = self.state_for_account(&account_pk); + + let next = match before.clone() { + SubState::NoSub { dependers } => SubState::RemoteOnly { + remote: subid, + dependers, + }, + SubState::LocalOnly { local, dependers } => SubState::Unified { + unified: UnifiedSubscription { + local, remote: subid, - dependers: *dependers, - } - } - SubState::LocalOnly { local, dependers } => { - self.state = SubState::Unified { - unified: UnifiedSubscription { - local: *local, - remote: subid, - }, - dependers: *dependers, - } - } - SubState::RemoteOnly { - remote: _, - dependers: _, - } => {} - SubState::Unified { - unified: _, - dependers: _, - } => {} - } + }, + dependers, + }, + SubState::RemoteOnly { .. } | SubState::Unified { .. } => return, + }; + + self.set_state_for_account(account_pk, next); + tracing::debug!( - "TimelineSub::force_add_remote: {:?} => {:?}", + "TimelineSub::force_add_remote({account_pk:?}): {:?} => {:?}", before, - self.state + self.state_for_account(&account_pk) ); } - pub fn try_add_remote(&mut self, pool: &mut RelayPool, filter: &HybridFilter) { - let before = self.state.clone(); - match &mut self.state { + pub fn try_add_remote( + &mut self, + account_pk: Pubkey, + pool: &mut RelayPool, + filter: &HybridFilter, + ) { + let before = self.state_for_account(&account_pk); + + let next = match before.clone() { SubState::NoSub { dependers } => { let subid = subscriptions::new_sub_id(); pool.subscribe(subid.clone(), filter.remote().to_vec()); self.filter = Some(filter.to_owned()); - self.state = SubState::RemoteOnly { + SubState::RemoteOnly { remote: subid, - dependers: *dependers, - }; + dependers, + } } SubState::LocalOnly { local, dependers } => { let subid = subscriptions::new_sub_id(); pool.subscribe(subid.clone(), filter.remote().to_vec()); self.filter = Some(filter.to_owned()); - self.state = SubState::Unified { + SubState::Unified { unified: UnifiedSubscription { - local: *local, + local, remote: subid, }, - dependers: *dependers, + dependers, } } - SubState::RemoteOnly { - remote: _, - dependers: _, - } => {} - SubState::Unified { - unified: _, - dependers: _, - } => {} - } + SubState::RemoteOnly { .. } | SubState::Unified { .. } => return, + }; + + self.set_state_for_account(account_pk, next); + tracing::debug!( - "TimelineSub::try_add_remote: {:?} => {:?}", + "TimelineSub::try_add_remote({account_pk:?}): {:?} => {:?}", before, - self.state + self.state_for_account(&account_pk) ); } - pub fn increment(&mut self) { - let before = self.state.clone(); - match &mut self.state { - SubState::NoSub { dependers } => { - *dependers += 1; - } - SubState::LocalOnly { - local: _, - dependers, - } => { - *dependers += 1; - } - SubState::RemoteOnly { - remote: _, - dependers, - } => { - *dependers += 1; - } - SubState::Unified { - unified: _, - dependers, - } => { - *dependers += 1; - } - } + pub fn increment(&mut self, account_pk: Pubkey) { + let before = self.state_for_account(&account_pk); - tracing::debug!("TimelineSub::increment: {:?} => {:?}", before, self.state); + let next = match before.clone() { + SubState::NoSub { dependers } => SubState::NoSub { + dependers: dependers + 1, + }, + SubState::LocalOnly { local, dependers } => SubState::LocalOnly { + local, + dependers: dependers + 1, + }, + SubState::RemoteOnly { remote, dependers } => SubState::RemoteOnly { + remote, + dependers: dependers + 1, + }, + SubState::Unified { unified, dependers } => SubState::Unified { + unified, + dependers: dependers + 1, + }, + }; + + self.set_state_for_account(account_pk, next); + + tracing::debug!( + "TimelineSub::increment({account_pk:?}): {:?} => {:?}", + before, + self.state_for_account(&account_pk) + ); } - pub fn get_local(&self) -> Option<Subscription> { - match &self.state { + pub fn get_local(&self, account_pk: &Pubkey) -> Option<Subscription> { + match self.state_for_account(account_pk) { SubState::NoSub { dependers: _ } => None, SubState::LocalOnly { local, dependers: _, - } => Some(*local), + } => Some(local), SubState::RemoteOnly { remote: _, dependers: _, @@ -257,62 +262,97 @@ impl TimelineSub { } } - pub fn unsubscribe_or_decrement(&mut self, ndb: &mut Ndb, pool: &mut RelayPool) { - let before = self.state.clone(); - 's: { - match &mut self.state { - SubState::NoSub { dependers } => *dependers = dependers.saturating_sub(1), - SubState::LocalOnly { local, dependers } => { - if *dependers > 1 { - *dependers = dependers.saturating_sub(1); - break 's; - } + pub fn unsubscribe_or_decrement( + &mut self, + account_pk: Pubkey, + ndb: &mut Ndb, + pool: &mut RelayPool, + ) { + let before = self.state_for_account(&account_pk); - // Keep local state intact if NDB unsubscribe fails. - if !unsubscribe_local(ndb, *local, "TimelineSub::unsubscribe_or_decrement") { - break 's; - } + let next = match before.clone() { + SubState::NoSub { dependers } => SubState::NoSub { + dependers: dependers.saturating_sub(1), + }, + SubState::LocalOnly { local, dependers } => { + if dependers > 1 { + return self.set_and_log_after_decrement( + account_pk, + before, + SubState::LocalOnly { + local, + dependers: dependers.saturating_sub(1), + }, + ); + } - self.state = SubState::NoSub { dependers: 0 }; + // Keep local state intact if NDB unsubscribe fails. + if !unsubscribe_local(ndb, local, "TimelineSub::unsubscribe_or_decrement") { + return; } - SubState::RemoteOnly { remote, dependers } => { - if *dependers > 1 { - *dependers = dependers.saturating_sub(1); - break 's; - } - pool.unsubscribe(remote.to_owned()); + SubState::NoSub { dependers: 0 } + } + SubState::RemoteOnly { remote, dependers } => { + if dependers > 1 { + return self.set_and_log_after_decrement( + account_pk, + before, + SubState::RemoteOnly { + remote, + dependers: dependers.saturating_sub(1), + }, + ); + } - self.state = SubState::NoSub { dependers: 0 }; + pool.unsubscribe(remote); + SubState::NoSub { dependers: 0 } + } + SubState::Unified { unified, dependers } => { + if dependers > 1 { + return self.set_and_log_after_decrement( + account_pk, + before, + SubState::Unified { + unified, + dependers: dependers.saturating_sub(1), + }, + ); } - SubState::Unified { unified, dependers } => { - if *dependers > 1 { - *dependers = dependers.saturating_sub(1); - break 's; - } - pool.unsubscribe(unified.remote.to_owned()); + pool.unsubscribe(unified.remote.clone()); - // Remote is already gone above; fall back to local-only on NDB failure. - if !unsubscribe_local( - ndb, - unified.local, - "TimelineSub::unsubscribe_or_decrement", - ) { - self.state = SubState::LocalOnly { - local: unified.local, - dependers: *dependers, - } - } else { - self.state = SubState::NoSub { dependers: 0 }; + // Remote is already gone above; fall back to local-only on NDB failure. + if !unsubscribe_local(ndb, unified.local, "TimelineSub::unsubscribe_or_decrement") { + SubState::LocalOnly { + local: unified.local, + dependers, } + } else { + SubState::NoSub { dependers: 0 } } } - } + }; + + self.set_state_for_account(account_pk, next); tracing::debug!( - "TimelineSub::unsubscribe_or_decrement: {:?} => {:?}", + "TimelineSub::unsubscribe_or_decrement({account_pk:?}): {:?} => {:?}", before, - self.state + self.state_for_account(&account_pk) + ); + } + + fn set_and_log_after_decrement( + &mut self, + account_pk: Pubkey, + before: SubState, + next: SubState, + ) { + self.set_state_for_account(account_pk, next); + tracing::debug!( + "TimelineSub::unsubscribe_or_decrement({account_pk:?}): {:?} => {:?}", + before, + self.state_for_account(&account_pk) ); } @@ -320,25 +360,32 @@ impl TimelineSub { self.filter.as_ref() } - pub fn no_sub(&self) -> bool { - matches!(self.state, SubState::NoSub { dependers: _ }) + pub fn no_sub(&self, account_pk: &Pubkey) -> bool { + matches!( + self.state_for_account(account_pk), + SubState::NoSub { dependers: _ } + ) + } + + pub fn has_any_subs(&self) -> bool { + !self.by_account.is_empty() } - pub fn dependers(&self) -> usize { - match &self.state { - SubState::NoSub { dependers } => *dependers, + pub fn dependers(&self, account_pk: &Pubkey) -> usize { + match self.state_for_account(account_pk) { + SubState::NoSub { dependers } => dependers, SubState::LocalOnly { local: _, dependers, - } => *dependers, + } => dependers, SubState::RemoteOnly { remote: _, dependers, - } => *dependers, + } => dependers, SubState::Unified { unified: _, dependers, - } => *dependers, + } => dependers, } } } diff --git a/crates/notedeck_columns/src/ui/add_column.rs b/crates/notedeck_columns/src/ui/add_column.rs @@ -893,14 +893,15 @@ fn attach_timeline_column( col: usize, timeline_kind: TimelineKind, ) -> bool { + let account_pk = *ctx.accounts.selected_account_pubkey(); let already_open_for_account = app .timeline_cache .get(&timeline_kind) - .is_some_and(|timeline| timeline.subscription.dependers() > 0); + .is_some_and(|timeline| timeline.subscription.dependers(&account_pk) > 0); if already_open_for_account { if let Some(timeline) = app.timeline_cache.get_mut(&timeline_kind) { - timeline.subscription.increment(); + timeline.subscription.increment(account_pk); } app.columns_mut(ctx.i18n, ctx.accounts) @@ -935,7 +936,7 @@ fn attach_timeline_column( .column_mut(col) .router_mut() .route_to_replaced(Route::timeline(route_kind.clone())); - app.timeline_cache.insert(route_kind, timeline); + app.timeline_cache.insert(route_kind, account_pk, timeline); true } @@ -1156,7 +1157,11 @@ fn handle_create_people_list(app: &mut Damus, ctx: &mut AppContext<'_>, col: usi .router_mut() .route_to_replaced(Route::timeline(timeline.kind.clone())); - app.timeline_cache.insert(timeline.kind.clone(), timeline); + app.timeline_cache.insert( + timeline.kind.clone(), + *ctx.accounts.selected_account_pubkey(), + timeline, + ); } pub fn hashtag_ui( diff --git a/crates/notedeck_columns/src/ui/profile/mod.rs b/crates/notedeck_columns/src/ui/profile/mod.rs @@ -105,6 +105,7 @@ impl<'a, 'd> ProfileView<'a, 'd> { let reversed = false; // poll for new notes and insert them into our existing notes if let Err(e) = profile_timeline.poll_notes_into_view( + self.note_context.accounts.selected_account_pubkey(), self.note_context.ndb, &txn, self.note_context.unknown_ids,