thread_sub.rs (9818B)
1 use egui_nav::ReturnType; 2 use enostr::{Filter, NoteId, Pubkey}; 3 use hashbrown::HashMap; 4 use nostrdb::{Ndb, Subscription}; 5 use notedeck::{ 6 Accounts, RelaySelection, ScopedSubApi, ScopedSubIdentity, SubConfig, SubKey, SubOwnerKey, 7 }; 8 9 use crate::scoped_sub_owner_keys::thread_scope_owner_key; 10 use crate::timeline::{ 11 sub::{ndb_sub, ndb_unsub}, 12 ThreadSelection, 13 }; 14 15 type RootNoteId = NoteId; 16 17 // column id 18 type MetaId = usize; 19 20 /// Outcome of removing local thread subscriptions for a close action. 21 #[derive(Clone, Copy, Debug, Eq, PartialEq)] 22 enum UnsubscribeOutcome { 23 /// Local NDB sub(s) were removed, but the scope still has stack entries so the 24 /// remote scoped-sub owner should remain. 25 KeepOwner, 26 /// The thread scope was fully removed and the remote scoped-sub owner should 27 /// be released using the returned root note id. 28 DropOwner(RootNoteId), 29 } 30 31 /// Thread subscription manager keyed by account and column scope. 32 /// 33 /// Each opened thread scope installs one local NostrDB sub plus one scoped 34 /// remote sub owner. Closing scope releases owner and tears down local state. 35 #[derive(Default)] 36 pub struct ThreadSubs { 37 /// Per-account thread subscription bookkeeping. 38 by_account: HashMap<Pubkey, AccountThreadSubs>, 39 } 40 41 #[derive(Default)] 42 struct AccountThreadSubs { 43 scopes: HashMap<MetaId, Vec<Scope>>, 44 } 45 46 struct Scope { 47 root_id: NoteId, 48 stack: Vec<Sub>, 49 } 50 51 struct Sub { 52 _selected_id: NoteId, 53 sub: Subscription, 54 // Keep local filters alive for the full subscription lifetime. Thread 55 // filters use custom callbacks and can crash if dropped early. 56 _filters: Vec<Filter>, 57 } 58 59 impl ThreadSubs { 60 #[allow(clippy::too_many_arguments)] 61 pub fn subscribe( 62 &mut self, 63 ndb: &mut Ndb, 64 scoped_subs: &mut ScopedSubApi<'_, '_>, 65 meta_id: usize, 66 id: &ThreadSelection, 67 local_sub_filter: Vec<Filter>, 68 new_scope: bool, 69 remote_sub_filter: Vec<Filter>, 70 ) { 71 let account_pk = scoped_subs.selected_account_pubkey(); 72 let account_subs = self.by_account.entry(account_pk).or_default(); 73 let cur_scopes = account_subs.scopes.entry(meta_id).or_default(); 74 let added_local = if new_scope || cur_scopes.is_empty() { 75 local_sub_new_scope( 76 ndb, 77 scoped_subs, 78 account_pk, 79 meta_id, 80 id, 81 local_sub_filter, 82 remote_sub_filter, 83 cur_scopes, 84 ) 85 } else { 86 let cur_scope = cur_scopes.last_mut().expect("can't be empty"); 87 sub_current_scope(ndb, id, local_sub_filter, cur_scope) 88 }; 89 90 if added_local { 91 tracing::debug!( 92 "Sub stats: account={:?}, num locals: {}", 93 account_pk, 94 account_subs.scopes.len(), 95 ); 96 } 97 } 98 99 pub fn unsubscribe( 100 &mut self, 101 ndb: &mut Ndb, 102 scoped_subs: &mut ScopedSubApi<'_, '_>, 103 meta_id: usize, 104 id: &ThreadSelection, 105 return_type: ReturnType, 106 ) { 107 let account_pk = scoped_subs.selected_account_pubkey(); 108 let (owner_to_drop, remove_account_entry) = { 109 let Some(account_subs) = self.by_account.get_mut(&account_pk) else { 110 return; 111 }; 112 113 let Some(scopes) = account_subs.scopes.get_mut(&meta_id) else { 114 return; 115 }; 116 117 let scope_depth = scopes.len().saturating_sub(1); 118 let Some(unsub_outcome) = (match return_type { 119 ReturnType::Drag => unsubscribe_drag(scopes, ndb, id), 120 ReturnType::Click => unsubscribe_click(scopes, ndb, id), 121 }) else { 122 return; 123 }; 124 125 if scopes.is_empty() { 126 account_subs.scopes.remove(&meta_id); 127 } 128 129 tracing::debug!( 130 "unsub stats: account={:?}, num locals: {}, released owner: {}", 131 account_pk, 132 account_subs.scopes.len(), 133 matches!(unsub_outcome, UnsubscribeOutcome::DropOwner(_)), 134 ); 135 136 ( 137 match unsub_outcome { 138 UnsubscribeOutcome::KeepOwner => None, 139 UnsubscribeOutcome::DropOwner(root_id) => Some(thread_scope_owner_key( 140 account_pk, 141 meta_id, 142 &root_id, 143 scope_depth, 144 )), 145 }, 146 account_subs.scopes.is_empty(), 147 ) 148 }; 149 150 if remove_account_entry { 151 self.by_account.remove(&account_pk); 152 } 153 154 if let Some(owner) = owner_to_drop { 155 let _ = scoped_subs.drop_owner(owner); 156 } 157 } 158 159 pub fn get_local(&self, account_pk: &Pubkey, meta_id: usize) -> Option<&Subscription> { 160 self.by_account 161 .get(account_pk)? 162 .scopes 163 .get(&meta_id) 164 .and_then(|s| s.last()) 165 .and_then(|s| s.stack.last()) 166 .map(|s| &s.sub) 167 } 168 169 pub fn get_local_for_selected<'a>( 170 &'a self, 171 accounts: &Accounts, 172 meta_id: usize, 173 ) -> Option<&'a Subscription> { 174 self.get_local(accounts.selected_account_pubkey(), meta_id) 175 } 176 } 177 178 fn unsubscribe_drag( 179 scopes: &mut Vec<Scope>, 180 ndb: &mut Ndb, 181 id: &ThreadSelection, 182 ) -> Option<UnsubscribeOutcome> { 183 let Some(scope) = scopes.last_mut() else { 184 tracing::error!("called drag unsubscribe but there aren't any scopes left"); 185 return None; 186 }; 187 188 let Some(cur_sub) = scope.stack.pop() else { 189 tracing::error!("expected a scope to be left"); 190 return None; 191 }; 192 193 log_scope_root_mismatch(scope, id); 194 195 if !ndb_unsub(ndb, cur_sub.sub, id) { 196 // Keep local bookkeeping aligned with NDB when unsubscribe fails. 197 scope.stack.push(cur_sub); 198 return None; 199 } 200 201 if scope.stack.is_empty() { 202 let removed_scope = scopes.pop().expect("checked empty above"); 203 return Some(UnsubscribeOutcome::DropOwner(removed_scope.root_id)); 204 } 205 206 Some(UnsubscribeOutcome::KeepOwner) 207 } 208 209 fn unsubscribe_click( 210 scopes: &mut Vec<Scope>, 211 ndb: &mut Ndb, 212 id: &ThreadSelection, 213 ) -> Option<UnsubscribeOutcome> { 214 let Some(mut scope) = scopes.pop() else { 215 tracing::error!("called unsubscribe but there aren't any scopes left"); 216 return None; 217 }; 218 219 log_scope_root_mismatch(&scope, id); 220 while let Some(sub) = scope.stack.pop() { 221 if ndb_unsub(ndb, sub.sub, id) { 222 continue; 223 } 224 225 // Partial rollback: restore the failed local sub (and any remaining ones) 226 // to thread bookkeeping and keep the remote owner alive. 227 scope.stack.push(sub); 228 scopes.push(scope); 229 return None; 230 } 231 Some(UnsubscribeOutcome::DropOwner(scope.root_id)) 232 } 233 234 fn log_scope_root_mismatch(scope: &Scope, id: &ThreadSelection) { 235 if scope.root_id.bytes() != id.root_id.bytes() { 236 tracing::error!( 237 "Somehow the current scope's root is not equal to the selected note's root. scope's root: {:?}, thread's root: {:?}", 238 scope.root_id.hex(), 239 id.root_id.bytes() 240 ); 241 } 242 } 243 244 #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] 245 enum ThreadScopedSub { 246 RepliesByRoot, 247 } 248 249 fn thread_remote_sub_key(root_id: &RootNoteId) -> SubKey { 250 SubKey::builder(ThreadScopedSub::RepliesByRoot) 251 .with(*root_id.bytes()) 252 .finish() 253 } 254 255 fn sub_current_scope( 256 ndb: &mut Ndb, 257 selection: &ThreadSelection, 258 local_sub_filter: Vec<Filter>, 259 cur_scope: &mut Scope, 260 ) -> bool { 261 if selection.root_id.bytes() != cur_scope.root_id.bytes() { 262 tracing::error!( 263 "Somehow the current scope's root is not equal to the selected note's root" 264 ); 265 } 266 267 if let Some(sub) = ndb_sub(ndb, &local_sub_filter, selection) { 268 cur_scope.stack.push(Sub { 269 _selected_id: NoteId::new(*selection.selected_or_root()), 270 sub, 271 _filters: local_sub_filter, 272 }); 273 return true; 274 } 275 276 false 277 } 278 279 fn sub_remote( 280 scoped_subs: &mut ScopedSubApi<'_, '_>, 281 owner: SubOwnerKey, 282 key: SubKey, 283 filter: Vec<Filter>, 284 id: impl std::fmt::Debug, 285 ) { 286 tracing::debug!("Remote subscribe for {:?}", id); 287 288 let identity = ScopedSubIdentity::account(owner, key); 289 let config = SubConfig { 290 relays: RelaySelection::AccountsRead, 291 filters: filter, 292 use_transparent: false, 293 }; 294 let _ = scoped_subs.ensure_sub(identity, config); 295 } 296 297 #[allow(clippy::too_many_arguments)] 298 fn local_sub_new_scope( 299 ndb: &mut Ndb, 300 scoped_subs: &mut ScopedSubApi<'_, '_>, 301 account_pk: Pubkey, 302 meta_id: usize, 303 id: &ThreadSelection, 304 local_sub_filter: Vec<Filter>, 305 remote_sub_filter: Vec<Filter>, 306 scopes: &mut Vec<Scope>, 307 ) -> bool { 308 let root_id = id.root_id.to_note_id(); 309 let scope_depth = scopes.len(); 310 let owner = thread_scope_owner_key(account_pk, meta_id, &root_id, scope_depth); 311 tracing::info!( 312 "thread sub with owner: pk: {account_pk:?}, col: {meta_id}, rootid: {root_id:?}, depth: {scope_depth}" 313 ); 314 sub_remote( 315 scoped_subs, 316 owner, 317 thread_remote_sub_key(&root_id), 318 remote_sub_filter, 319 id, 320 ); 321 322 let Some(sub) = ndb_sub(ndb, &local_sub_filter, id) else { 323 let _ = scoped_subs.drop_owner(owner); 324 return false; 325 }; 326 327 scopes.push(Scope { 328 root_id, 329 stack: vec![Sub { 330 _selected_id: NoteId::new(*id.selected_or_root()), 331 sub, 332 _filters: local_sub_filter, 333 }], 334 }); 335 336 true 337 }