timeline_sub.rs (4178B)
1 use enostr::Pubkey; 2 use hashbrown::HashMap; 3 use nostrdb::{Ndb, Subscription}; 4 use notedeck::filter::HybridFilter; 5 6 use crate::timeline::sub::ndb_sub; 7 8 /// Per-account local timeline subscription state with ref-counting. 9 /// 10 /// Remote timeline relay subscriptions are managed by scoped subs; this type 11 /// only tracks local NostrDB subscriptions and active dependers. 12 #[derive(Debug, Default)] 13 pub struct TimelineSub { 14 by_account: HashMap<Pubkey, AccountSubState>, 15 } 16 17 #[derive(Debug, Clone, Copy, Default)] 18 struct AccountSubState { 19 local: Option<Subscription>, 20 dependers: usize, 21 remote_seeded: bool, 22 } 23 24 fn should_remove_account_state(state: &AccountSubState) -> bool { 25 state.dependers == 0 && state.local.is_none() 26 } 27 28 fn unsubscribe_local_with_rollback(ndb: &mut Ndb, local: &mut Option<Subscription>, context: &str) { 29 let Some(local_sub) = local.take() else { 30 return; 31 }; 32 33 if let Err(e) = ndb.unsubscribe(local_sub) { 34 tracing::error!("{context}: ndb unsubscribe failed: {e}"); 35 *local = Some(local_sub); 36 } 37 } 38 39 impl TimelineSub { 40 fn state_for_account(&self, account_pk: &Pubkey) -> AccountSubState { 41 self.by_account.get(account_pk).copied().unwrap_or_default() 42 } 43 44 fn state_for_account_mut(&mut self, account_pk: Pubkey) -> &mut AccountSubState { 45 self.by_account.entry(account_pk).or_default() 46 } 47 48 /// Reset one account's local subscription state while preserving its depender count. 49 pub fn reset_for_account(&mut self, account_pk: Pubkey, ndb: &mut Ndb) { 50 let mut remove_account_state = false; 51 52 if let Some(state) = self.by_account.get_mut(&account_pk) { 53 unsubscribe_local_with_rollback( 54 ndb, 55 &mut state.local, 56 "TimelineSub::reset_for_account", 57 ); 58 remove_account_state = should_remove_account_state(state); 59 } 60 61 if remove_account_state { 62 self.by_account.remove(&account_pk); 63 } 64 } 65 66 pub fn try_add_local(&mut self, account_pk: Pubkey, ndb: &Ndb, filter: &HybridFilter) { 67 let state = self.state_for_account_mut(account_pk); 68 if state.local.is_some() { 69 return; 70 } 71 72 if let Some(sub) = ndb_sub(ndb, &filter.local().combined(), "") { 73 state.local = Some(sub); 74 } 75 } 76 77 pub fn increment(&mut self, account_pk: Pubkey) { 78 self.state_for_account_mut(account_pk).dependers += 1; 79 } 80 81 pub fn remote_seeded(&self, account_pk: &Pubkey) -> bool { 82 self.state_for_account(account_pk).remote_seeded 83 } 84 85 pub fn mark_remote_seeded(&mut self, account_pk: Pubkey) { 86 self.state_for_account_mut(account_pk).remote_seeded = true; 87 } 88 89 pub fn clear_remote_seeded(&mut self, account_pk: Pubkey) { 90 self.state_for_account_mut(account_pk).remote_seeded = false; 91 } 92 93 pub fn get_local(&self, account_pk: &Pubkey) -> Option<Subscription> { 94 self.state_for_account(account_pk).local 95 } 96 97 pub fn unsubscribe_or_decrement(&mut self, account_pk: Pubkey, ndb: &mut Ndb) { 98 let mut remove_account_state = false; 99 if let Some(state) = self.by_account.get_mut(&account_pk) { 100 if state.dependers > 1 { 101 state.dependers = state.dependers.saturating_sub(1); 102 return; 103 } 104 105 state.dependers = state.dependers.saturating_sub(1); 106 state.remote_seeded = false; 107 unsubscribe_local_with_rollback( 108 ndb, 109 &mut state.local, 110 "TimelineSub::unsubscribe_or_decrement", 111 ); 112 remove_account_state = should_remove_account_state(state); 113 } 114 115 if remove_account_state { 116 self.by_account.remove(&account_pk); 117 } 118 } 119 120 pub fn no_sub(&self, account_pk: &Pubkey) -> bool { 121 let state = self.state_for_account(account_pk); 122 state.dependers == 0 123 } 124 125 pub fn has_any_subs(&self) -> bool { 126 !self.by_account.is_empty() 127 } 128 129 pub fn dependers(&self, account_pk: &Pubkey) -> usize { 130 self.state_for_account(account_pk).dependers 131 } 132 }