notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

commit 1ebe408f5b7778063de9057ad5ab6a76e91e9c2f
parent 18d0cff28692d0a79714d6e3f1076dee36f0074d
Author: kernelkind <kernelkind@gmail.com>
Date:   Wed, 25 Feb 2026 16:14:44 -0500

feat(outbox-int): move remote timeline ownership to scoped subs

Move timeline remote subscription lifetime management from legacy relay-pool/unified tracking into scoped subs while keeping the current timeline filter-state model intact for now.

This refactor:
- rewrites TimelineSub to track per-account local NostrDB subscriptions, dependers, and remote seeding state
- moves timeline remote open/pop ownership through ScopedSubApi and timeline owner/key helpers
- updates TimelineCache open/pop paths and timeline setup/readiness flows to use scoped subs
- threads scoped-sub plumbing through timeline open/close call sites (actionbar/nav/route/toolbar/decks/column/add-column/app)
- preserves legacy filter-state and subscriptions cleanup for follow-up commits

Also aligns account-aware reopen/init behavior so local NDB setup only runs for timelines with active dependers for the selected account.

Signed-off-by: kernelkind <kernelkind@gmail.com>

Diffstat:
Mcrates/notedeck_chrome/src/chrome.rs | 2--
Mcrates/notedeck_columns/src/actionbar.rs | 16+++++++---------
Mcrates/notedeck_columns/src/app.rs | 17+++++++++--------
Mcrates/notedeck_columns/src/column.rs | 10+++++-----
Mcrates/notedeck_columns/src/decks.rs | 28+++++++++++++---------------
Mcrates/notedeck_columns/src/nav.rs | 83++++++++++++++++++++++++++++++++++++++++++++-----------------------------------
Mcrates/notedeck_columns/src/route.rs | 7++-----
Mcrates/notedeck_columns/src/timeline/cache.rs | 74++++++++++++++++++++++++++++++++++++++++++++++++--------------------------
Mcrates/notedeck_columns/src/timeline/mod.rs | 103+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------------
Mcrates/notedeck_columns/src/timeline/sub/timeline_sub.rs | 415+++++++++++++++----------------------------------------------------------------
Mcrates/notedeck_columns/src/toolbar.rs | 1-
Mcrates/notedeck_columns/src/ui/add_column.rs | 9++++++++-
12 files changed, 301 insertions(+), 464 deletions(-)

diff --git a/crates/notedeck_chrome/src/chrome.rs b/crates/notedeck_chrome/src/chrome.rs @@ -574,7 +574,6 @@ fn chrome_handle_app_action( &mut columns.timeline_cache, &mut columns.threads, ctx.note_cache, - ctx.legacy_pool, &mut ctx.remote, &txn, ctx.unknown_ids, @@ -632,7 +631,6 @@ fn columns_route_to_profile( &mut columns.timeline_cache, &mut columns.threads, ctx.note_cache, - ctx.legacy_pool, &mut ctx.remote, &txn, ctx.unknown_ids, diff --git a/crates/notedeck_columns/src/actionbar.rs b/crates/notedeck_columns/src/actionbar.rs @@ -12,7 +12,7 @@ use crate::{ }; use egui_nav::Percent; -use enostr::{FilledKeypair, NoteId, Pubkey, RelayPool}; +use enostr::{FilledKeypair, NoteId, Pubkey}; use nostrdb::{IngestMetadata, Ndb, NoteBuilder, NoteKey, Transaction}; use notedeck::{ get_wallet_for, is_future_timestamp, @@ -52,7 +52,6 @@ fn execute_note_action( timeline_cache: &mut TimelineCache, threads: &mut Threads, note_cache: &mut NoteCache, - pool: &mut RelayPool, remote: &mut RemoteApi<'_>, txn: &Transaction, accounts: &mut Accounts, @@ -121,14 +120,15 @@ fn execute_note_action( NoteAction::Profile(pubkey) => { let kind = TimelineKind::Profile(pubkey); router_action = Some(RouterAction::route_to(Route::Timeline(kind.clone()))); + let mut scoped_subs = remote.scoped_subs(accounts); timeline_res = timeline_cache .open( ndb, note_cache, txn, - *accounts.selected_account_pubkey(), - pool, + &mut scoped_subs, &kind, + *accounts.selected_account_pubkey(), false, ) .map(NotesOpenResult::Timeline); @@ -144,7 +144,6 @@ fn execute_note_action( break 'ex; }; let mut scoped_subs = remote.scoped_subs(accounts); - timeline_res = threads .open( ndb, @@ -167,14 +166,15 @@ fn execute_note_action( NoteAction::Hashtag(htag) => { let kind = TimelineKind::Hashtag(vec![htag.clone()]); router_action = Some(RouterAction::route_to(Route::Timeline(kind.clone()))); + let mut scoped_subs = remote.scoped_subs(&*accounts); timeline_res = timeline_cache .open( ndb, note_cache, txn, - *accounts.selected_account_pubkey(), - pool, + &mut scoped_subs, &kind, + *accounts.selected_account_pubkey(), false, ) .map(NotesOpenResult::Timeline); @@ -277,7 +277,6 @@ pub fn execute_and_process_note_action( timeline_cache: &mut TimelineCache, threads: &mut Threads, note_cache: &mut NoteCache, - pool: &mut RelayPool, remote: &mut RemoteApi<'_>, txn: &Transaction, unknown_ids: &mut UnknownIds, @@ -305,7 +304,6 @@ pub fn execute_and_process_note_action( timeline_cache, threads, note_cache, - pool, remote, txn, accounts, diff --git a/crates/notedeck_columns/src/app.rs b/crates/notedeck_columns/src/app.rs @@ -187,6 +187,7 @@ fn try_process_event( try_process_events_core(app_ctx, ctx, |app_ctx, ev| match (&ev.event).into() { RelayEvent::Opened => { + let mut scoped_subs = app_ctx.remote.scoped_subs(app_ctx.accounts); timeline::send_initial_timeline_filters( damus.options.contains(AppOptions::SinceOptimize), &mut damus.timeline_cache, @@ -194,6 +195,7 @@ fn try_process_event( app_ctx.legacy_pool, &ev.relay, app_ctx.accounts, + &mut scoped_subs, ); } RelayEvent::Message(msg) => { @@ -204,12 +206,10 @@ fn try_process_event( for (kind, timeline) in &mut damus.timeline_cache { let selected_account_pk = *app_ctx.accounts.selected_account_pubkey(); - let is_ready = timeline::is_timeline_ready( - app_ctx.ndb, - app_ctx.legacy_pool, - timeline, - app_ctx.accounts, - ); + let is_ready = { + let mut scoped_subs = app_ctx.remote.scoped_subs(app_ctx.accounts); + timeline::is_timeline_ready(app_ctx.ndb, &mut scoped_subs, timeline, app_ctx.accounts) + }; if is_ready { schedule_timeline_load( @@ -607,14 +607,15 @@ impl Damus { let txn = Transaction::new(app_context.ndb).unwrap(); for col in &parsed_args.columns { let timeline_kind = col.clone().into_timeline_kind(); + let mut scoped_subs = app_context.remote.scoped_subs(app_context.accounts); if let Some(add_result) = columns.add_new_timeline_column( &mut timeline_cache, &txn, app_context.ndb, app_context.note_cache, - *app_context.accounts.selected_account_pubkey(), - app_context.legacy_pool, + &mut scoped_subs, &timeline_kind, + *app_context.accounts.selected_account_pubkey(), ) { add_result.process( app_context.ndb, diff --git a/crates/notedeck_columns/src/column.rs b/crates/notedeck_columns/src/column.rs @@ -3,9 +3,9 @@ use crate::{ route::{ColumnsRouter, Route, SingletonRouter}, timeline::{Timeline, TimelineCache, TimelineKind}, }; -use enostr::{Pubkey, RelayPool}; +use enostr::Pubkey; use nostrdb::{Ndb, Transaction}; -use notedeck::NoteCache; +use notedeck::{NoteCache, ScopedSubApi}; use std::iter::Iterator; use tracing::warn; @@ -108,13 +108,13 @@ impl Columns { txn: &Transaction, ndb: &Ndb, note_cache: &mut NoteCache, - account_pk: Pubkey, - pool: &mut RelayPool, + scoped_subs: &mut ScopedSubApi<'_, '_>, kind: &TimelineKind, + account_pk: Pubkey, ) -> Option<TimelineOpenResult> { self.columns .push(Column::new(vec![Route::timeline(kind.to_owned())])); - timeline_cache.open(ndb, note_cache, txn, account_pk, pool, kind, false) + timeline_cache.open(ndb, note_cache, txn, scoped_subs, kind, account_pk, false) } pub fn new_column_picker(&mut self) { diff --git a/crates/notedeck_columns/src/decks.rs b/crates/notedeck_columns/src/decks.rs @@ -1,8 +1,8 @@ use std::collections::{hash_map::ValuesMut, HashMap}; -use enostr::{Pubkey, RelayPool}; +use enostr::Pubkey; use nostrdb::Transaction; -use notedeck::{tr, AppContext, Localization, FALLBACK_PUBKEY}; +use notedeck::{tr, AppContext, Localization, ScopedSubApi, FALLBACK_PUBKEY}; use tracing::{error, info}; use crate::{ @@ -171,14 +171,14 @@ impl DecksCache { key: &Pubkey, timeline_cache: &mut TimelineCache, ndb: &mut nostrdb::Ndb, - pool: &mut RelayPool, + scoped_subs: &mut ScopedSubApi<'_, '_>, ) { let Some(decks) = self.account_to_decks.remove(key) else { return; }; info!("Removing decks for {:?}", key); - decks.unsubscribe_all(timeline_cache, ndb, *key, pool); + decks.unsubscribe_all(timeline_cache, ndb, scoped_subs); if !self.account_to_decks.contains_key(&self.fallback_pubkey) { self.account_to_decks @@ -294,14 +294,13 @@ impl Decks { index: usize, timeline_cache: &mut TimelineCache, ndb: &mut nostrdb::Ndb, - account_pk: Pubkey, - pool: &mut enostr::RelayPool, + scoped_subs: &mut ScopedSubApi<'_, '_>, ) { let Some(deck) = self.remove_deck_internal(index) else { return; }; - delete_deck(deck, timeline_cache, ndb, account_pk, pool); + delete_deck(deck, timeline_cache, ndb, scoped_subs); } fn remove_deck_internal(&mut self, index: usize) -> Option<Deck> { @@ -358,11 +357,10 @@ impl Decks { self, timeline_cache: &mut TimelineCache, ndb: &mut nostrdb::Ndb, - account_pk: Pubkey, - pool: &mut enostr::RelayPool, + scoped_subs: &mut ScopedSubApi<'_, '_>, ) { for deck in self.decks { - delete_deck(deck, timeline_cache, ndb, account_pk, pool); + delete_deck(deck, timeline_cache, ndb, scoped_subs); } } } @@ -371,8 +369,7 @@ fn delete_deck( mut deck: Deck, timeline_cache: &mut TimelineCache, ndb: &mut nostrdb::Ndb, - account_pk: Pubkey, - pool: &mut enostr::RelayPool, + scoped_subs: &mut ScopedSubApi<'_, '_>, ) { let cols = deck.columns_mut(); let num_cols = cols.num_columns(); @@ -380,7 +377,7 @@ fn delete_deck( let kinds_to_pop = cols.delete_column(i); for kind in &kinds_to_pop { - if let Err(err) = timeline_cache.pop(kind, account_pk, ndb, pool) { + if let Err(err) = timeline_cache.pop(kind, ndb, scoped_subs) { error!("error popping timeline: {err}"); } } @@ -459,14 +456,15 @@ pub fn add_demo_columns( let txn = Transaction::new(ctx.ndb).unwrap(); for kind in &timeline_kinds { + let mut scoped_subs = ctx.remote.scoped_subs(ctx.accounts); if let Some(results) = columns.add_new_timeline_column( timeline_cache, &txn, ctx.ndb, ctx.note_cache, - *ctx.accounts.selected_account_pubkey(), - ctx.legacy_pool, + &mut scoped_subs, kind, + pubkey, ) { results.process( ctx.ndb, diff --git a/crates/notedeck_columns/src/nav.rs b/crates/notedeck_columns/src/nav.rs @@ -35,7 +35,7 @@ use crate::{ use egui_nav::{ Nav, NavAction, NavResponse, NavUiType, PopupResponse, PopupSheet, RouteResponse, Split, }; -use enostr::{ProfileState, RelayPool}; +use enostr::ProfileState; use nostrdb::{Filter, Ndb, Transaction}; use notedeck::{ get_current_default_msats, nav::DragResponse, tr, ui::is_narrow, Accounts, AppContext, @@ -134,12 +134,13 @@ impl SwitchingAction { break 's; } + let mut scoped_subs = ctx.remote.scoped_subs(ctx.accounts); decks_cache.remove( ctx.i18n, to_remove, timeline_cache, ctx.ndb, - ctx.legacy_pool, + &mut scoped_subs, ); } }, @@ -147,11 +148,9 @@ impl SwitchingAction { ColumnsAction::Remove(index) => { let kinds_to_pop = get_active_columns_mut(ctx.i18n, ctx.accounts, decks_cache) .delete_column(index); - let selected_account_pk = *ctx.accounts.selected_account_pubkey(); for kind in &kinds_to_pop { - if let Err(err) = - timeline_cache.pop(kind, selected_account_pk, ctx.ndb, ctx.legacy_pool) - { + let mut scoped_subs = ctx.remote.scoped_subs(ctx.accounts); + if let Err(err) = timeline_cache.pop(kind, ctx.ndb, &mut scoped_subs) { error!("error popping timeline: {err}"); } } @@ -166,12 +165,12 @@ impl SwitchingAction { get_decks_mut(ctx.i18n, ctx.accounts, decks_cache).set_active(index) } DecksAction::Removing(index) => { + let mut scoped_subs = ctx.remote.scoped_subs(ctx.accounts); get_decks_mut(ctx.i18n, ctx.accounts, decks_cache).remove_deck( index, timeline_cache, ctx.ndb, - *ctx.accounts.selected_account_pubkey(), - ctx.legacy_pool, + &mut scoped_subs, ); } }, @@ -307,7 +306,6 @@ fn process_nav_resp( &mut app.threads, &mut app.view_state, ctx.ndb, - ctx.legacy_pool, &mut ctx.remote.scoped_subs(ctx.accounts), return_type, col, @@ -324,14 +322,17 @@ fn process_nav_resp( .data_mut(|d| d.insert_temp(toolbar_visible_id, true)); handle_navigating_edit_profile(ctx.ndb, ctx.accounts, app, col); - handle_navigating_timeline( - ctx.ndb, - ctx.note_cache, - ctx.legacy_pool, - ctx.accounts, - app, - col, - ); + { + let mut scoped_subs = ctx.remote.scoped_subs(ctx.accounts); + handle_navigating_timeline( + ctx.ndb, + ctx.note_cache, + &mut scoped_subs, + ctx.accounts, + app, + col, + ); + } let cur_router = app .columns_mut(ctx.i18n, ctx.accounts) @@ -355,14 +356,17 @@ fn process_nav_resp( .select_column(col as i32); handle_navigating_edit_profile(ctx.ndb, ctx.accounts, app, col); - handle_navigating_timeline( - ctx.ndb, - ctx.note_cache, - ctx.legacy_pool, - ctx.accounts, - app, - col, - ); + { + let mut scoped_subs = ctx.remote.scoped_subs(ctx.accounts); + handle_navigating_timeline( + ctx.ndb, + ctx.note_cache, + &mut scoped_subs, + ctx.accounts, + app, + col, + ); + } } } } @@ -411,27 +415,36 @@ fn handle_navigating_edit_profile(ndb: &Ndb, accounts: &Accounts, app: &mut Damu fn handle_navigating_timeline( ndb: &Ndb, note_cache: &mut NoteCache, - pool: &mut RelayPool, + scoped_subs: &mut notedeck::ScopedSubApi<'_, '_>, accounts: &Accounts, app: &mut Damus, col: usize, ) { - let account_pk = *accounts.selected_account_pubkey(); + let account_pk = accounts.selected_account_pubkey(); let kind = { let Route::Timeline(kind) = app.columns(accounts).column(col).router().top() else { return; }; - if app.timeline_cache.get(kind).is_some() { - return; + if let Some(timeline) = app.timeline_cache.get(kind) { + if timeline.subscription.dependers(account_pk) > 0 { + return; + } } kind.to_owned() }; let txn = Transaction::new(ndb).expect("txn"); - app.timeline_cache - .open(ndb, note_cache, &txn, account_pk, pool, &kind, false); + app.timeline_cache.open( + ndb, + note_cache, + &txn, + scoped_subs, + &kind, + *account_pk, + false, + ); } pub enum RouterAction { @@ -538,13 +551,10 @@ fn process_render_nav_action( RenderNavAction::PfpClicked => Some(RouterAction::PfpClicked), RenderNavAction::RemoveColumn => { let kinds_to_pop = app.columns_mut(ctx.i18n, ctx.accounts).delete_column(col); - let selected_account_pk = *ctx.accounts.selected_account_pubkey(); for kind in &kinds_to_pop { - if let Err(err) = - app.timeline_cache - .pop(kind, selected_account_pk, ctx.ndb, ctx.legacy_pool) - { + let mut scoped_subs = ctx.remote.scoped_subs(ctx.accounts); + if let Err(err) = app.timeline_cache.pop(kind, ctx.ndb, &mut scoped_subs) { error!("error popping timeline: {err}"); } } @@ -582,7 +592,6 @@ fn process_render_nav_action( &mut app.timeline_cache, &mut app.threads, ctx.note_cache, - ctx.legacy_pool, &mut ctx.remote, &txn, ctx.unknown_ids, diff --git a/crates/notedeck_columns/src/route.rs b/crates/notedeck_columns/src/route.rs @@ -1,5 +1,5 @@ use egui_nav::{Percent, ReturnType}; -use enostr::{NoteId, Pubkey, RelayPool}; +use enostr::{NoteId, Pubkey}; use nostrdb::Ndb; use notedeck::{ tr, Localization, NoteZapTargetOwned, ReplacementType, ReportTarget, RootNoteIdBuf, Router, @@ -799,16 +799,13 @@ pub fn cleanup_popped_route( threads: &mut Threads, view_state: &mut ViewState, ndb: &mut Ndb, - pool: &mut RelayPool, scoped_subs: &mut ScopedSubApi, return_type: ReturnType, col_index: usize, ) { match route { Route::Timeline(kind) => { - if let Err(err) = - timeline_cache.pop(kind, scoped_subs.selected_account_pubkey(), ndb, pool) - { + if let Err(err) = timeline_cache.pop(kind, ndb, scoped_subs) { tracing::error!("popping timeline had an error: {err} for {:?}", kind); } } diff --git a/crates/notedeck_columns/src/timeline/cache.rs b/crates/notedeck_columns/src/timeline/cache.rs @@ -1,12 +1,16 @@ use crate::{ actionbar::TimelineOpenResult, error::Error, - timeline::{Timeline, TimelineKind, UnknownPksOwned}, + timeline::{ + drop_timeline_remote_owner, ensure_remote_timeline_subscription, Timeline, TimelineKind, + UnknownPksOwned, + }, }; +use notedeck::ScopedSubApi; use notedeck::{filter, FilterState, NoteCache, NoteRef}; -use enostr::{Pubkey, RelayPool}; +use enostr::Pubkey; use nostrdb::{Filter, Ndb, Transaction}; use std::collections::HashMap; use tracing::{debug, error, info, warn}; @@ -53,9 +57,8 @@ impl TimelineCache { pub fn pop( &mut self, id: &TimelineKind, - account_pk: Pubkey, ndb: &mut Ndb, - pool: &mut RelayPool, + scoped_subs: &mut ScopedSubApi<'_, '_>, ) -> Result<(), Error> { let timeline = if let Some(timeline) = self.timelines.get_mut(id) { timeline @@ -63,9 +66,15 @@ impl TimelineCache { return Err(Error::TimelineNotFound); }; + let account_pk = scoped_subs.selected_account_pubkey(); timeline .subscription - .unsubscribe_or_decrement(account_pk, ndb, pool); + .unsubscribe_or_decrement(account_pk, ndb); + + if timeline.subscription.no_sub(&account_pk) { + timeline.subscription.clear_remote_seeded(account_pk); + drop_timeline_remote_owner(timeline, account_pk, scoped_subs); + } if !timeline.subscription.has_any_subs() { debug!( @@ -187,9 +196,9 @@ impl TimelineCache { ndb: &Ndb, note_cache: &mut NoteCache, txn: &Transaction, - account_pk: Pubkey, - pool: &mut RelayPool, + scoped_subs: &mut ScopedSubApi<'_, '_>, id: &TimelineKind, + account_pk: Pubkey, load_local: bool, ) -> Option<TimelineOpenResult> { if !load_local { @@ -207,9 +216,12 @@ impl TimelineCache { if let Some(filter) = timeline.filter.get_any_ready() { debug!("got open with subscription for {:?}", &timeline.kind); timeline.subscription.try_add_local(account_pk, ndb, filter); - timeline - .subscription - .try_add_remote(account_pk, pool, filter); + ensure_remote_timeline_subscription( + timeline, + account_pk, + filter.remote().to_vec(), + scoped_subs, + ); } else { debug!( "open skipped subscription; filter not ready for {:?}", @@ -221,23 +233,12 @@ impl TimelineCache { return None; } + let account_pk = scoped_subs.selected_account_pubkey(); let notes_resp = self.notes(ndb, note_cache, txn, id); let (mut open_result, timeline) = match notes_resp.vitality { Vitality::Stale(timeline) => { // The timeline cache is stale, let's update it - let notes = { - let mut notes = Vec::new(); - for package in timeline.subscription.get_filter()?.local().packages { - let cur_notes = find_new_notes( - timeline.all_or_any_entries().latest(), - package.filters, - txn, - ndb, - ); - notes.extend(cur_notes); - } - notes - }; + let notes = collect_stale_notes(timeline, txn, ndb); let open_result = if notes.is_empty() { None @@ -260,9 +261,12 @@ impl TimelineCache { if let Some(filter) = timeline.filter.get_any_ready() { debug!("got open with *new* subscription for {:?}", &timeline.kind); timeline.subscription.try_add_local(account_pk, ndb, filter); - timeline - .subscription - .try_add_remote(account_pk, pool, filter); + ensure_remote_timeline_subscription( + timeline, + account_pk, + filter.remote().to_vec(), + scoped_subs, + ); } else { // This should never happen reasoning, self.notes would have // failed above if the filter wasn't ready @@ -304,6 +308,24 @@ impl TimelineCache { } } +fn collect_stale_notes(timeline: &Timeline, txn: &Transaction, ndb: &Ndb) -> Vec<NoteRef> { + let Some(filter) = timeline.filter.get_any_ready() else { + return Vec::new(); + }; + + let mut notes = Vec::new(); + for package in filter.local().packages { + let cur_notes = find_new_notes( + timeline.all_or_any_entries().latest(), + package.filters, + txn, + ndb, + ); + notes.extend(cur_notes); + } + notes +} + pub struct GetNotesResponse<'a> { vitality: Vitality<'a, Timeline>, unknown_pks: Option<UnknownPksOwned>, diff --git a/crates/notedeck_columns/src/timeline/mod.rs b/crates/notedeck_columns/src/timeline/mod.rs @@ -1,5 +1,6 @@ use crate::{ error::Error, + scoped_sub_owner_keys::timeline_remote_owner_key, subscriptions::{self, SubKind, Subscriptions}, timeline::{ kind::{people_list_note_filter, AlgoTimeline, ListKind, PeopleListRef}, @@ -14,7 +15,8 @@ use notedeck::{ contacts::hybrid_contacts_filter, filter::{self, HybridFilter}, is_future_timestamp, tr, unix_time_secs, Accounts, CachedNote, ContactState, FilterError, - FilterState, FilterStates, Localization, NoteCache, NoteRef, UnknownIds, + FilterState, FilterStates, Localization, NoteCache, NoteRef, RelaySelection, ScopedSubApi, + ScopedSubIdentity, SubConfig, SubKey, UnknownIds, }; use egui_virtual_list::VirtualList; @@ -40,6 +42,60 @@ pub use note_units::{CompositeType, InsertionResponse, NoteUnits}; pub use timeline_units::{TimelineUnits, UnknownPks}; pub use unit::{CompositeUnit, NoteUnit, ReactionUnit, RepostUnit}; +#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] +enum TimelineScopedSub { + RemoteByKind, +} + +fn timeline_remote_sub_key(kind: &TimelineKind) -> SubKey { + SubKey::builder(TimelineScopedSub::RemoteByKind) + .with(kind) + .finish() +} + +fn timeline_remote_sub_config(remote_filters: Vec<Filter>) -> SubConfig { + SubConfig { + relays: RelaySelection::AccountsRead, + filters: remote_filters, + use_transparent: false, + } +} + +pub(crate) fn ensure_remote_timeline_subscription( + timeline: &mut Timeline, + account_pk: Pubkey, + remote_filters: Vec<Filter>, + scoped_subs: &mut ScopedSubApi<'_, '_>, +) { + let owner = timeline_remote_owner_key(account_pk, &timeline.kind); + let identity = ScopedSubIdentity::account(owner, timeline_remote_sub_key(&timeline.kind)); + let config = timeline_remote_sub_config(remote_filters); + let _ = scoped_subs.ensure_sub(identity, config); + timeline.subscription.mark_remote_seeded(account_pk); +} + +pub(crate) fn update_remote_timeline_subscription( + timeline: &mut Timeline, + account_pk: Pubkey, + remote_filters: Vec<Filter>, + scoped_subs: &mut ScopedSubApi<'_, '_>, +) { + let owner = timeline_remote_owner_key(account_pk, &timeline.kind); + let identity = ScopedSubIdentity::account(owner, timeline_remote_sub_key(&timeline.kind)); + let config = timeline_remote_sub_config(remote_filters); + let _ = scoped_subs.set_sub(identity, config); + timeline.subscription.mark_remote_seeded(account_pk); +} + +pub fn drop_timeline_remote_owner( + timeline: &Timeline, + account_pk: Pubkey, + scoped_subs: &mut ScopedSubApi<'_, '_>, +) { + let owner = timeline_remote_owner_key(account_pk, &timeline.kind); + let _ = scoped_subs.drop_owner(owner); +} + #[derive(Copy, Clone, Eq, PartialEq, Debug, Default, PartialOrd, Ord)] pub enum ViewFilter { MentionsOnly, @@ -628,6 +684,7 @@ pub fn setup_new_timeline( txn: &Transaction, subs: &mut Subscriptions, pool: &mut RelayPool, + scoped_subs: &mut ScopedSubApi<'_, '_>, note_cache: &mut NoteCache, since_optimize: bool, accounts: &Accounts, @@ -635,7 +692,7 @@ pub fn setup_new_timeline( ) { let account_pk = *accounts.selected_account_pubkey(); // if we're ready, setup local subs - if is_timeline_ready(ndb, pool, timeline, accounts) { + if is_timeline_ready(ndb, scoped_subs, timeline, accounts) { if let Err(err) = setup_timeline_nostrdb_sub(ndb, txn, note_cache, timeline, unknown_ids, account_pk) { @@ -644,7 +701,7 @@ pub fn setup_new_timeline( } for relay in &mut pool.relays { - send_initial_timeline_filter(since_optimize, subs, relay, timeline, accounts); + send_initial_timeline_filter(since_optimize, subs, relay, timeline, accounts, scoped_subs); } timeline.subscription.increment(account_pk); } @@ -661,12 +718,13 @@ pub fn send_initial_timeline_filters( pool: &mut RelayPool, relay_id: &str, accounts: &Accounts, + scoped_subs: &mut ScopedSubApi<'_, '_>, ) -> Option<()> { info!("Sending initial filters to {}", relay_id); let relay = &mut pool.relays.iter_mut().find(|r| r.url() == relay_id)?; for (_kind, timeline) in timeline_cache { - send_initial_timeline_filter(since_optimize, subs, relay, timeline, accounts); + send_initial_timeline_filter(since_optimize, subs, relay, timeline, accounts, scoped_subs); } Some(()) @@ -678,6 +736,7 @@ pub fn send_initial_timeline_filter( relay: &mut PoolRelay, timeline: &mut Timeline, accounts: &Accounts, + scoped_subs: &mut ScopedSubApi<'_, '_>, ) { let account_pk = *accounts.selected_account_pubkey(); let filter_state = timeline.filter.get_mut(relay.url()); @@ -726,15 +785,7 @@ pub fn send_initial_timeline_filter( filter }).collect(); - //let sub_id = damus.gen_subid(&SubKind::Initial); - let sub_id = subscriptions::new_sub_id(); - subs.subs.insert(sub_id.clone(), SubKind::Initial); - - if let Err(err) = relay.subscribe(sub_id.clone(), new_filters.clone()) { - error!("error subscribing: {err}"); - } else { - timeline.subscription.force_add_remote(account_pk, sub_id); - } + update_remote_timeline_subscription(timeline, account_pk, new_filters, scoped_subs); } // we need some data first @@ -874,6 +925,10 @@ pub fn setup_initial_nostrdb_subs( account_pk: Pubkey, ) -> Result<()> { for (_kind, timeline) in timeline_cache { + if timeline.subscription.dependers(&account_pk) == 0 { + continue; + } + let txn = Transaction::new(ndb).expect("txn"); if let Err(err) = setup_timeline_nostrdb_sub(ndb, &txn, note_cache, timeline, unknown_ids, account_pk) @@ -919,13 +974,24 @@ fn setup_timeline_nostrdb_sub( #[profiling::function] pub fn is_timeline_ready( ndb: &Ndb, - pool: &mut RelayPool, + scoped_subs: &mut ScopedSubApi<'_, '_>, timeline: &mut Timeline, accounts: &Accounts, ) -> bool { // TODO: we should debounce the filter states a bit to make sure we have // seen all of the different contact lists from each relay - if let Some(_f) = timeline.filter.get_any_ready() { + if let Some(filter) = timeline.filter.get_any_ready() { + let account_pk = *accounts.selected_account_pubkey(); + if timeline.subscription.dependers(&account_pk) > 0 + && !timeline.subscription.remote_seeded(&account_pk) + { + ensure_remote_timeline_subscription( + timeline, + account_pk, + filter.remote().to_vec(), + scoped_subs, + ); + } return true; } @@ -1029,10 +1095,11 @@ pub fn is_timeline_ready( //let ck = &timeline.kind; //let subid = damus.gen_subid(&SubKind::Column(ck.clone())); - timeline.subscription.try_add_remote( + update_remote_timeline_subscription( + timeline, *accounts.selected_account_pubkey(), - pool, - &filter, + filter.remote().to_vec(), + scoped_subs, ); true } diff --git a/crates/notedeck_columns/src/timeline/sub/timeline_sub.rs b/crates/notedeck_columns/src/timeline/sub/timeline_sub.rs @@ -1,370 +1,125 @@ -use notedeck::{filter::HybridFilter, UnifiedSubscription}; - use enostr::Pubkey; -use enostr::RelayPool; use hashbrown::HashMap; use nostrdb::{Ndb, Subscription}; +use notedeck::filter::HybridFilter; -use crate::{subscriptions, timeline::sub::ndb_sub}; - -fn unsubscribe_local(ndb: &mut Ndb, local: Subscription, context: &str) -> bool { - if let Err(e) = ndb.unsubscribe(local) { - tracing::error!("{context}: failed to unsubscribe from ndb: {e}"); - return false; - } +use crate::timeline::sub::ndb_sub; - true -} - -/// Per-account timeline subscription state with ref-counting. +/// Per-account local timeline subscription state with ref-counting. /// -/// This still manages legacy relay-pool remote subscriptions for now; scoped-sub -/// remote ownership is migrated in a follow-up refactor. +/// Remote timeline relay subscriptions are managed by scoped subs; this type +/// only tracks local NostrDB subscriptions and active dependers. #[derive(Debug, Default)] pub struct TimelineSub { - filter: Option<HybridFilter>, - by_account: HashMap<Pubkey, SubState>, + by_account: HashMap<Pubkey, AccountSubState>, +} + +#[derive(Debug, Clone, Copy, Default)] +struct AccountSubState { + local: Option<Subscription>, + dependers: usize, + remote_seeded: bool, } -#[derive(Debug, Clone)] -enum SubState { - NoSub { - dependers: usize, - }, - LocalOnly { - local: Subscription, - dependers: usize, - }, - RemoteOnly { - remote: String, - dependers: usize, - }, - Unified { - unified: UnifiedSubscription, - dependers: usize, - }, +fn should_remove_account_state(state: &AccountSubState) -> bool { + state.dependers == 0 && state.local.is_none() } -impl Default for SubState { - fn default() -> Self { - Self::NoSub { dependers: 0 } +fn unsubscribe_local_with_rollback(ndb: &mut Ndb, local: &mut Option<Subscription>, context: &str) { + let Some(local_sub) = local.take() else { + return; + }; + + if let Err(e) = ndb.unsubscribe(local_sub) { + tracing::error!("{context}: ndb unsubscribe failed: {e}"); + *local = Some(local_sub); } } impl TimelineSub { - fn state_for_account(&self, account_pk: &Pubkey) -> SubState { - self.by_account.get(account_pk).cloned().unwrap_or_default() + fn state_for_account(&self, account_pk: &Pubkey) -> AccountSubState { + self.by_account.get(account_pk).copied().unwrap_or_default() } - fn set_state_for_account(&mut self, account_pk: Pubkey, state: SubState) { - if matches!(state, SubState::NoSub { dependers: 0 }) { - self.by_account.remove(&account_pk); - return; - } - - self.by_account.insert(account_pk, state); + fn state_for_account_mut(&mut self, account_pk: Pubkey) -> &mut AccountSubState { + self.by_account.entry(account_pk).or_default() } - /// Reset one account's subscription state while preserving its depender count. - pub fn reset_for_account(&mut self, account_pk: Pubkey, ndb: &mut Ndb, pool: &mut RelayPool) { - let before = self.state_for_account(&account_pk); + /// Reset one account's local subscription state while preserving its depender count. + pub fn reset_for_account(&mut self, account_pk: Pubkey, ndb: &mut Ndb) { + let mut remove_account_state = false; - let next = match before.clone() { - SubState::NoSub { dependers } => SubState::NoSub { dependers }, - SubState::LocalOnly { local, dependers } => { - if !unsubscribe_local(ndb, local, "TimelineSub::reset_for_account") { - return; - } - SubState::NoSub { dependers } - } - SubState::RemoteOnly { remote, dependers } => { - pool.unsubscribe(remote); - SubState::NoSub { dependers } - } - SubState::Unified { unified, dependers } => { - pool.unsubscribe(unified.remote.clone()); - if !unsubscribe_local(ndb, unified.local, "TimelineSub::reset_for_account") { - self.set_state_for_account( - account_pk, - SubState::LocalOnly { - local: unified.local, - dependers, - }, - ); - return; - } - SubState::NoSub { dependers } - } - }; - - self.set_state_for_account(account_pk, next); - self.filter = None; + if let Some(state) = self.by_account.get_mut(&account_pk) { + unsubscribe_local_with_rollback( + ndb, + &mut state.local, + "TimelineSub::reset_for_account", + ); + remove_account_state = should_remove_account_state(state); + } - tracing::debug!( - "TimelineSub::reset_for_account({account_pk:?}): {:?} => {:?}", - before, - self.state_for_account(&account_pk) - ); + if remove_account_state { + self.by_account.remove(&account_pk); + } } pub fn try_add_local(&mut self, account_pk: Pubkey, ndb: &Ndb, filter: &HybridFilter) { - let before = self.state_for_account(&account_pk); - - let Some(next) = (match before.clone() { - SubState::NoSub { dependers } => { - let Some(sub) = ndb_sub(ndb, &filter.local().combined(), "") else { - return; - }; - self.filter = Some(filter.to_owned()); - Some(SubState::LocalOnly { - local: sub, - dependers, - }) - } - SubState::LocalOnly { .. } => None, - SubState::RemoteOnly { remote, dependers } => { - let Some(local) = ndb_sub(ndb, &filter.local().combined(), "") else { - return; - }; - Some(SubState::Unified { - unified: UnifiedSubscription { local, remote }, - dependers, - }) - } - SubState::Unified { .. } => None, - }) else { + let state = self.state_for_account_mut(account_pk); + if state.local.is_some() { return; - }; - - self.set_state_for_account(account_pk, next); + } - tracing::debug!( - "TimelineSub::try_add_local({account_pk:?}): {:?} => {:?}", - before, - self.state_for_account(&account_pk) - ); + if let Some(sub) = ndb_sub(ndb, &filter.local().combined(), "") { + state.local = Some(sub); + } } - pub fn force_add_remote(&mut self, account_pk: Pubkey, subid: String) { - let before = self.state_for_account(&account_pk); - - let next = match before.clone() { - SubState::NoSub { dependers } => SubState::RemoteOnly { - remote: subid, - dependers, - }, - SubState::LocalOnly { local, dependers } => SubState::Unified { - unified: UnifiedSubscription { - local, - remote: subid, - }, - dependers, - }, - SubState::RemoteOnly { .. } | SubState::Unified { .. } => return, - }; - - self.set_state_for_account(account_pk, next); - - tracing::debug!( - "TimelineSub::force_add_remote({account_pk:?}): {:?} => {:?}", - before, - self.state_for_account(&account_pk) - ); + pub fn increment(&mut self, account_pk: Pubkey) { + self.state_for_account_mut(account_pk).dependers += 1; } - pub fn try_add_remote( - &mut self, - account_pk: Pubkey, - pool: &mut RelayPool, - filter: &HybridFilter, - ) { - let before = self.state_for_account(&account_pk); - - let next = match before.clone() { - SubState::NoSub { dependers } => { - let subid = subscriptions::new_sub_id(); - pool.subscribe(subid.clone(), filter.remote().to_vec()); - self.filter = Some(filter.to_owned()); - SubState::RemoteOnly { - remote: subid, - dependers, - } - } - SubState::LocalOnly { local, dependers } => { - let subid = subscriptions::new_sub_id(); - pool.subscribe(subid.clone(), filter.remote().to_vec()); - self.filter = Some(filter.to_owned()); - SubState::Unified { - unified: UnifiedSubscription { - local, - remote: subid, - }, - dependers, - } - } - SubState::RemoteOnly { .. } | SubState::Unified { .. } => return, - }; - - self.set_state_for_account(account_pk, next); - - tracing::debug!( - "TimelineSub::try_add_remote({account_pk:?}): {:?} => {:?}", - before, - self.state_for_account(&account_pk) - ); + pub fn remote_seeded(&self, account_pk: &Pubkey) -> bool { + self.state_for_account(account_pk).remote_seeded } - pub fn increment(&mut self, account_pk: Pubkey) { - let before = self.state_for_account(&account_pk); - - let next = match before.clone() { - SubState::NoSub { dependers } => SubState::NoSub { - dependers: dependers + 1, - }, - SubState::LocalOnly { local, dependers } => SubState::LocalOnly { - local, - dependers: dependers + 1, - }, - SubState::RemoteOnly { remote, dependers } => SubState::RemoteOnly { - remote, - dependers: dependers + 1, - }, - SubState::Unified { unified, dependers } => SubState::Unified { - unified, - dependers: dependers + 1, - }, - }; - - self.set_state_for_account(account_pk, next); + pub fn mark_remote_seeded(&mut self, account_pk: Pubkey) { + self.state_for_account_mut(account_pk).remote_seeded = true; + } - tracing::debug!( - "TimelineSub::increment({account_pk:?}): {:?} => {:?}", - before, - self.state_for_account(&account_pk) - ); + pub fn clear_remote_seeded(&mut self, account_pk: Pubkey) { + self.state_for_account_mut(account_pk).remote_seeded = false; } pub fn get_local(&self, account_pk: &Pubkey) -> Option<Subscription> { - match self.state_for_account(account_pk) { - SubState::NoSub { dependers: _ } => None, - SubState::LocalOnly { - local, - dependers: _, - } => Some(local), - SubState::RemoteOnly { - remote: _, - dependers: _, - } => None, - SubState::Unified { - unified, - dependers: _, - } => Some(unified.local), - } + self.state_for_account(account_pk).local } - pub fn unsubscribe_or_decrement( - &mut self, - account_pk: Pubkey, - ndb: &mut Ndb, - pool: &mut RelayPool, - ) { - let before = self.state_for_account(&account_pk); - - let next = match before.clone() { - SubState::NoSub { dependers } => SubState::NoSub { - dependers: dependers.saturating_sub(1), - }, - SubState::LocalOnly { local, dependers } => { - if dependers > 1 { - return self.set_and_log_after_decrement( - account_pk, - before, - SubState::LocalOnly { - local, - dependers: dependers.saturating_sub(1), - }, - ); - } - - // Keep local state intact if NDB unsubscribe fails. - if !unsubscribe_local(ndb, local, "TimelineSub::unsubscribe_or_decrement") { - return; - } - - SubState::NoSub { dependers: 0 } - } - SubState::RemoteOnly { remote, dependers } => { - if dependers > 1 { - return self.set_and_log_after_decrement( - account_pk, - before, - SubState::RemoteOnly { - remote, - dependers: dependers.saturating_sub(1), - }, - ); - } - - pool.unsubscribe(remote); - SubState::NoSub { dependers: 0 } + pub fn unsubscribe_or_decrement(&mut self, account_pk: Pubkey, ndb: &mut Ndb) { + let mut remove_account_state = false; + if let Some(state) = self.by_account.get_mut(&account_pk) { + if state.dependers > 1 { + state.dependers = state.dependers.saturating_sub(1); + return; } - SubState::Unified { unified, dependers } => { - if dependers > 1 { - return self.set_and_log_after_decrement( - account_pk, - before, - SubState::Unified { - unified, - dependers: dependers.saturating_sub(1), - }, - ); - } - - pool.unsubscribe(unified.remote.clone()); - // Remote is already gone above; fall back to local-only on NDB failure. - if !unsubscribe_local(ndb, unified.local, "TimelineSub::unsubscribe_or_decrement") { - SubState::LocalOnly { - local: unified.local, - dependers, - } - } else { - SubState::NoSub { dependers: 0 } - } - } - }; - - self.set_state_for_account(account_pk, next); - tracing::debug!( - "TimelineSub::unsubscribe_or_decrement({account_pk:?}): {:?} => {:?}", - before, - self.state_for_account(&account_pk) - ); - } - - fn set_and_log_after_decrement( - &mut self, - account_pk: Pubkey, - before: SubState, - next: SubState, - ) { - self.set_state_for_account(account_pk, next); - tracing::debug!( - "TimelineSub::unsubscribe_or_decrement({account_pk:?}): {:?} => {:?}", - before, - self.state_for_account(&account_pk) - ); - } + state.dependers = state.dependers.saturating_sub(1); + state.remote_seeded = false; + unsubscribe_local_with_rollback( + ndb, + &mut state.local, + "TimelineSub::unsubscribe_or_decrement", + ); + remove_account_state = should_remove_account_state(state); + } - pub fn get_filter(&self) -> Option<&HybridFilter> { - self.filter.as_ref() + if remove_account_state { + self.by_account.remove(&account_pk); + } } pub fn no_sub(&self, account_pk: &Pubkey) -> bool { - matches!( - self.state_for_account(account_pk), - SubState::NoSub { dependers: _ } - ) + let state = self.state_for_account(account_pk); + state.dependers == 0 } pub fn has_any_subs(&self) -> bool { @@ -372,20 +127,6 @@ impl TimelineSub { } pub fn dependers(&self, account_pk: &Pubkey) -> usize { - match self.state_for_account(account_pk) { - SubState::NoSub { dependers } => dependers, - SubState::LocalOnly { - local: _, - dependers, - } => dependers, - SubState::RemoteOnly { - remote: _, - dependers, - } => dependers, - SubState::Unified { - unified: _, - dependers, - } => dependers, - } + self.state_for_account(account_pk).dependers } } diff --git a/crates/notedeck_columns/src/toolbar.rs b/crates/notedeck_columns/src/toolbar.rs @@ -102,7 +102,6 @@ fn pop_to_root(app: &mut Damus, ctx: &mut AppContext, col_index: usize) { &mut app.threads, &mut app.view_state, ctx.ndb, - ctx.legacy_pool, &mut ctx.remote.scoped_subs(ctx.accounts), ReturnType::Click, col_index, diff --git a/crates/notedeck_columns/src/ui/add_column.rs b/crates/notedeck_columns/src/ui/add_column.rs @@ -919,12 +919,14 @@ fn attach_timeline_column( return false; }; + let mut scoped_subs = ctx.remote.scoped_subs(ctx.accounts); crate::timeline::setup_new_timeline( &mut timeline, ctx.ndb, &txn, &mut app.subscriptions, ctx.legacy_pool, + &mut scoped_subs, ctx.note_cache, app.options.contains(AppOptions::SinceOptimize), ctx.accounts, @@ -936,7 +938,11 @@ fn attach_timeline_column( .column_mut(col) .router_mut() .route_to_replaced(Route::timeline(route_kind.clone())); - app.timeline_cache.insert(route_kind, account_pk, timeline); + app.timeline_cache.insert( + route_kind, + *ctx.accounts.selected_account_pubkey(), + timeline, + ); true } @@ -1146,6 +1152,7 @@ fn handle_create_people_list(app: &mut Damus, ctx: &mut AppContext<'_>, col: usi &txn, &mut app.subscriptions, ctx.legacy_pool, + &mut ctx.remote.scoped_subs(ctx.accounts), ctx.note_cache, app.options.contains(AppOptions::SinceOptimize), ctx.accounts,