notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

thread_sub.rs (9818B)


      1 use egui_nav::ReturnType;
      2 use enostr::{Filter, NoteId, Pubkey};
      3 use hashbrown::HashMap;
      4 use nostrdb::{Ndb, Subscription};
      5 use notedeck::{
      6     Accounts, RelaySelection, ScopedSubApi, ScopedSubIdentity, SubConfig, SubKey, SubOwnerKey,
      7 };
      8 
      9 use crate::scoped_sub_owner_keys::thread_scope_owner_key;
     10 use crate::timeline::{
     11     sub::{ndb_sub, ndb_unsub},
     12     ThreadSelection,
     13 };
     14 
/// Id of the note at the root of a thread.
type RootNoteId = NoteId;

/// Column id; thread scopes are tracked per column.
type MetaId = usize;
     19 
     20 /// Outcome of removing local thread subscriptions for a close action.
     21 #[derive(Clone, Copy, Debug, Eq, PartialEq)]
     22 enum UnsubscribeOutcome {
     23     /// Local NDB sub(s) were removed, but the scope still has stack entries so the
     24     /// remote scoped-sub owner should remain.
     25     KeepOwner,
     26     /// The thread scope was fully removed and the remote scoped-sub owner should
     27     /// be released using the returned root note id.
     28     DropOwner(RootNoteId),
     29 }
     30 
/// Thread subscription manager keyed by account and column scope.
///
/// Each opened thread scope installs one local NostrDB sub plus one scoped
/// remote sub owner. Closing a scope releases the owner and tears down the
/// local state.
#[derive(Default)]
pub struct ThreadSubs {
    /// Per-account thread subscription bookkeeping, keyed by the account's
    /// public key.
    by_account: HashMap<Pubkey, AccountThreadSubs>,
}
     40 
/// Thread subscription state belonging to a single account.
#[derive(Default)]
struct AccountThreadSubs {
    /// Stack of open thread scopes per column id. The last entry of each
    /// stack is treated as that column's current scope.
    scopes: HashMap<MetaId, Vec<Scope>>,
}
     45 
/// One opened thread scope: a thread root plus the local subs stacked in it.
struct Scope {
    /// Root note id of the thread this scope was opened for.
    root_id: NoteId,
    /// Local subs opened within this scope; the last entry is the most
    /// recently added one.
    stack: Vec<Sub>,
}
     50 
/// A single local NostrDB subscription opened for a thread selection.
struct Sub {
    /// Note id taken from `ThreadSelection::selected_or_root()` when the sub
    /// was created. Stored for bookkeeping; not read in this file.
    _selected_id: NoteId,
    /// Handle of the local NostrDB subscription.
    sub: Subscription,
    // Keep local filters alive for the full subscription lifetime. Thread
    // filters use custom callbacks and can crash if dropped early.
    _filters: Vec<Filter>,
}
     58 
     59 impl ThreadSubs {
     60     #[allow(clippy::too_many_arguments)]
     61     pub fn subscribe(
     62         &mut self,
     63         ndb: &mut Ndb,
     64         scoped_subs: &mut ScopedSubApi<'_, '_>,
     65         meta_id: usize,
     66         id: &ThreadSelection,
     67         local_sub_filter: Vec<Filter>,
     68         new_scope: bool,
     69         remote_sub_filter: Vec<Filter>,
     70     ) {
     71         let account_pk = scoped_subs.selected_account_pubkey();
     72         let account_subs = self.by_account.entry(account_pk).or_default();
     73         let cur_scopes = account_subs.scopes.entry(meta_id).or_default();
     74         let added_local = if new_scope || cur_scopes.is_empty() {
     75             local_sub_new_scope(
     76                 ndb,
     77                 scoped_subs,
     78                 account_pk,
     79                 meta_id,
     80                 id,
     81                 local_sub_filter,
     82                 remote_sub_filter,
     83                 cur_scopes,
     84             )
     85         } else {
     86             let cur_scope = cur_scopes.last_mut().expect("can't be empty");
     87             sub_current_scope(ndb, id, local_sub_filter, cur_scope)
     88         };
     89 
     90         if added_local {
     91             tracing::debug!(
     92                 "Sub stats: account={:?}, num locals: {}",
     93                 account_pk,
     94                 account_subs.scopes.len(),
     95             );
     96         }
     97     }
     98 
     99     pub fn unsubscribe(
    100         &mut self,
    101         ndb: &mut Ndb,
    102         scoped_subs: &mut ScopedSubApi<'_, '_>,
    103         meta_id: usize,
    104         id: &ThreadSelection,
    105         return_type: ReturnType,
    106     ) {
    107         let account_pk = scoped_subs.selected_account_pubkey();
    108         let (owner_to_drop, remove_account_entry) = {
    109             let Some(account_subs) = self.by_account.get_mut(&account_pk) else {
    110                 return;
    111             };
    112 
    113             let Some(scopes) = account_subs.scopes.get_mut(&meta_id) else {
    114                 return;
    115             };
    116 
    117             let scope_depth = scopes.len().saturating_sub(1);
    118             let Some(unsub_outcome) = (match return_type {
    119                 ReturnType::Drag => unsubscribe_drag(scopes, ndb, id),
    120                 ReturnType::Click => unsubscribe_click(scopes, ndb, id),
    121             }) else {
    122                 return;
    123             };
    124 
    125             if scopes.is_empty() {
    126                 account_subs.scopes.remove(&meta_id);
    127             }
    128 
    129             tracing::debug!(
    130                 "unsub stats: account={:?}, num locals: {}, released owner: {}",
    131                 account_pk,
    132                 account_subs.scopes.len(),
    133                 matches!(unsub_outcome, UnsubscribeOutcome::DropOwner(_)),
    134             );
    135 
    136             (
    137                 match unsub_outcome {
    138                     UnsubscribeOutcome::KeepOwner => None,
    139                     UnsubscribeOutcome::DropOwner(root_id) => Some(thread_scope_owner_key(
    140                         account_pk,
    141                         meta_id,
    142                         &root_id,
    143                         scope_depth,
    144                     )),
    145                 },
    146                 account_subs.scopes.is_empty(),
    147             )
    148         };
    149 
    150         if remove_account_entry {
    151             self.by_account.remove(&account_pk);
    152         }
    153 
    154         if let Some(owner) = owner_to_drop {
    155             let _ = scoped_subs.drop_owner(owner);
    156         }
    157     }
    158 
    159     pub fn get_local(&self, account_pk: &Pubkey, meta_id: usize) -> Option<&Subscription> {
    160         self.by_account
    161             .get(account_pk)?
    162             .scopes
    163             .get(&meta_id)
    164             .and_then(|s| s.last())
    165             .and_then(|s| s.stack.last())
    166             .map(|s| &s.sub)
    167     }
    168 
    169     pub fn get_local_for_selected<'a>(
    170         &'a self,
    171         accounts: &Accounts,
    172         meta_id: usize,
    173     ) -> Option<&'a Subscription> {
    174         self.get_local(accounts.selected_account_pubkey(), meta_id)
    175     }
    176 }
    177 
    178 fn unsubscribe_drag(
    179     scopes: &mut Vec<Scope>,
    180     ndb: &mut Ndb,
    181     id: &ThreadSelection,
    182 ) -> Option<UnsubscribeOutcome> {
    183     let Some(scope) = scopes.last_mut() else {
    184         tracing::error!("called drag unsubscribe but there aren't any scopes left");
    185         return None;
    186     };
    187 
    188     let Some(cur_sub) = scope.stack.pop() else {
    189         tracing::error!("expected a scope to be left");
    190         return None;
    191     };
    192 
    193     log_scope_root_mismatch(scope, id);
    194 
    195     if !ndb_unsub(ndb, cur_sub.sub, id) {
    196         // Keep local bookkeeping aligned with NDB when unsubscribe fails.
    197         scope.stack.push(cur_sub);
    198         return None;
    199     }
    200 
    201     if scope.stack.is_empty() {
    202         let removed_scope = scopes.pop().expect("checked empty above");
    203         return Some(UnsubscribeOutcome::DropOwner(removed_scope.root_id));
    204     }
    205 
    206     Some(UnsubscribeOutcome::KeepOwner)
    207 }
    208 
    209 fn unsubscribe_click(
    210     scopes: &mut Vec<Scope>,
    211     ndb: &mut Ndb,
    212     id: &ThreadSelection,
    213 ) -> Option<UnsubscribeOutcome> {
    214     let Some(mut scope) = scopes.pop() else {
    215         tracing::error!("called unsubscribe but there aren't any scopes left");
    216         return None;
    217     };
    218 
    219     log_scope_root_mismatch(&scope, id);
    220     while let Some(sub) = scope.stack.pop() {
    221         if ndb_unsub(ndb, sub.sub, id) {
    222             continue;
    223         }
    224 
    225         // Partial rollback: restore the failed local sub (and any remaining ones)
    226         // to thread bookkeeping and keep the remote owner alive.
    227         scope.stack.push(sub);
    228         scopes.push(scope);
    229         return None;
    230     }
    231     Some(UnsubscribeOutcome::DropOwner(scope.root_id))
    232 }
    233 
    234 fn log_scope_root_mismatch(scope: &Scope, id: &ThreadSelection) {
    235     if scope.root_id.bytes() != id.root_id.bytes() {
    236         tracing::error!(
    237             "Somehow the current scope's root is not equal to the selected note's root. scope's root: {:?}, thread's root: {:?}",
    238             scope.root_id.hex(),
    239             id.root_id.bytes()
    240         );
    241     }
    242 }
    243 
    244 #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
    245 enum ThreadScopedSub {
    246     RepliesByRoot,
    247 }
    248 
    249 fn thread_remote_sub_key(root_id: &RootNoteId) -> SubKey {
    250     SubKey::builder(ThreadScopedSub::RepliesByRoot)
    251         .with(*root_id.bytes())
    252         .finish()
    253 }
    254 
    255 fn sub_current_scope(
    256     ndb: &mut Ndb,
    257     selection: &ThreadSelection,
    258     local_sub_filter: Vec<Filter>,
    259     cur_scope: &mut Scope,
    260 ) -> bool {
    261     if selection.root_id.bytes() != cur_scope.root_id.bytes() {
    262         tracing::error!(
    263             "Somehow the current scope's root is not equal to the selected note's root"
    264         );
    265     }
    266 
    267     if let Some(sub) = ndb_sub(ndb, &local_sub_filter, selection) {
    268         cur_scope.stack.push(Sub {
    269             _selected_id: NoteId::new(*selection.selected_or_root()),
    270             sub,
    271             _filters: local_sub_filter,
    272         });
    273         return true;
    274     }
    275 
    276     false
    277 }
    278 
    279 fn sub_remote(
    280     scoped_subs: &mut ScopedSubApi<'_, '_>,
    281     owner: SubOwnerKey,
    282     key: SubKey,
    283     filter: Vec<Filter>,
    284     id: impl std::fmt::Debug,
    285 ) {
    286     tracing::debug!("Remote subscribe for {:?}", id);
    287 
    288     let identity = ScopedSubIdentity::account(owner, key);
    289     let config = SubConfig {
    290         relays: RelaySelection::AccountsRead,
    291         filters: filter,
    292         use_transparent: false,
    293     };
    294     let _ = scoped_subs.ensure_sub(identity, config);
    295 }
    296 
    297 #[allow(clippy::too_many_arguments)]
    298 fn local_sub_new_scope(
    299     ndb: &mut Ndb,
    300     scoped_subs: &mut ScopedSubApi<'_, '_>,
    301     account_pk: Pubkey,
    302     meta_id: usize,
    303     id: &ThreadSelection,
    304     local_sub_filter: Vec<Filter>,
    305     remote_sub_filter: Vec<Filter>,
    306     scopes: &mut Vec<Scope>,
    307 ) -> bool {
    308     let root_id = id.root_id.to_note_id();
    309     let scope_depth = scopes.len();
    310     let owner = thread_scope_owner_key(account_pk, meta_id, &root_id, scope_depth);
    311     tracing::info!(
    312         "thread sub with owner: pk: {account_pk:?}, col: {meta_id}, rootid: {root_id:?}, depth: {scope_depth}"
    313     );
    314     sub_remote(
    315         scoped_subs,
    316         owner,
    317         thread_remote_sub_key(&root_id),
    318         remote_sub_filter,
    319         id,
    320     );
    321 
    322     let Some(sub) = ndb_sub(ndb, &local_sub_filter, id) else {
    323         let _ = scoped_subs.drop_owner(owner);
    324         return false;
    325     };
    326 
    327     scopes.push(Scope {
    328         root_id,
    329         stack: vec![Sub {
    330             _selected_id: NoteId::new(*id.selected_or_root()),
    331             sub,
    332             _filters: local_sub_filter,
    333         }],
    334     });
    335 
    336     true
    337 }