notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

lib.rs (12096B)


      1 pub mod cache;
      2 pub mod convo_renderable;
      3 pub mod loader;
      4 pub mod nav;
      5 pub mod nip17;
      6 mod relay_ensure;
      7 mod relay_prefetch;
      8 pub mod ui;
      9 
     10 use enostr::Pubkey;
     11 use hashbrown::{HashMap, HashSet};
     12 use nav::{process_messages_ui_response, Route};
     13 use nostrdb::{Ndb, Subscription, Transaction};
     14 use notedeck::{
     15     ui::is_narrow, Accounts, App, AppContext, AppResponse, RemoteApi, Router, SubKey, SubOwnerKey,
     16 };
     17 
     18 use crate::{
     19     cache::{ConversationCache, ConversationListState, ConversationStates},
     20     loader::{LoaderMsg, MessagesLoader},
     21     nip17::conversation_filter,
     22     relay_ensure::ensure_selected_account_dm_list,
     23     ui::{login_nsec_prompt, messages::messages_ui},
     24 };
     25 
     26 /// Max loader messages to process per frame to avoid UI stalls.
     27 const MAX_LOADER_MSGS_PER_FRAME: usize = 8;
     28 
     29 /// Messages application state and background loaders.
     30 pub struct MessagesApp {
     31     messages: ConversationsCtx,
     32     states: ConversationStates,
     33     router: Router<Route>,
     34     loader: MessagesLoader,
     35     inflight_messages: HashSet<cache::ConversationId>,
     36 }
     37 
     38 impl MessagesApp {
     39     pub fn new() -> Self {
     40         Self {
     41             messages: ConversationsCtx::default(),
     42             states: ConversationStates::default(),
     43             router: Router::new(vec![Route::ConvoList]),
     44             loader: MessagesLoader::new(),
     45             inflight_messages: HashSet::new(),
     46         }
     47     }
     48 }
     49 
     50 impl Default for MessagesApp {
     51     fn default() -> Self {
     52         Self::new()
     53     }
     54 }
     55 
     56 impl App for MessagesApp {
     57     #[profiling::function]
     58     fn update(&mut self, ctx: &mut AppContext<'_>, egui_ctx: &egui::Context) {
     59         let Some(cache) = self.messages.get_current_mut(ctx.accounts) else {
     60             return;
     61         };
     62 
     63         self.loader.start(egui_ctx.clone(), ctx.ndb.clone());
     64 
     65         's: {
     66             let Some(secret) = &ctx.accounts.get_selected_account().key.secret_key else {
     67                 break 's;
     68             };
     69 
     70             ctx.ndb.add_key(&secret.secret_bytes());
     71 
     72             let giftwrap_ndb = ctx.ndb.clone();
     73             let r = std::thread::Builder::new()
     74                 .name("process_giftwraps".into())
     75                 .spawn(move || {
     76                     let txn = Transaction::new(&giftwrap_ndb).expect("txn");
     77                     // although the actual giftwrap processing happens on the ingestion pool, this
     78                     // function still looks up giftwraps to process on the main thread, which can
     79                     // cause a freeze.
     80                     //
     81                     // TODO(jb55): move the giftwrap query logic into the internal threadpool so we
     82                     // don't have to spawn a thread here
     83                     giftwrap_ndb.process_giftwraps(&txn);
     84                 });
     85 
     86             if let Err(err) = r {
     87                 tracing::error!("failed to spawn process_giftwraps thread: {err}");
     88             }
     89         }
     90 
     91         ensure_selected_account_dm_relay_list(ctx.ndb, &mut ctx.remote, ctx.accounts, cache);
     92 
     93         match cache.state {
     94             ConversationListState::Initializing => {
     95                 initialize(ctx, cache, is_narrow(egui_ctx), &self.loader);
     96             }
     97             ConversationListState::Loading { subscription } => {
     98                 if let Some(sub) = subscription {
     99                     update_initialized(ctx, cache, sub);
    100                 }
    101             }
    102             ConversationListState::Initialized(subscription) => 's: {
    103                 let Some(sub) = subscription else {
    104                     break 's;
    105                 };
    106                 update_initialized(ctx, cache, sub);
    107             }
    108         }
    109 
    110         handle_loader_messages(
    111             ctx,
    112             cache,
    113             &self.loader,
    114             &mut self.inflight_messages,
    115             is_narrow(egui_ctx),
    116         );
    117     }
    118 
    119     #[profiling::function]
    120     fn render(&mut self, ctx: &mut AppContext<'_>, ui: &mut egui::Ui) -> AppResponse {
    121         let Some(cache) = self.messages.get_current_mut(ctx.accounts) else {
    122             login_nsec_prompt(ui, ctx.i18n);
    123             return AppResponse::none();
    124         };
    125 
    126         let selected_pubkey = ctx.accounts.selected_account_pubkey();
    127 
    128         let contacts_state = ctx
    129             .accounts
    130             .get_selected_account()
    131             .data
    132             .contacts
    133             .get_state();
    134         let resp = messages_ui(
    135             cache,
    136             &mut self.states,
    137             ctx.media_jobs.sender(),
    138             ctx.ndb,
    139             selected_pubkey,
    140             ui,
    141             ctx.img_cache,
    142             &self.router,
    143             ctx.settings.get_settings_mut(),
    144             contacts_state,
    145             ctx.i18n,
    146         );
    147         let action = process_messages_ui_response(
    148             resp,
    149             ctx,
    150             cache,
    151             &mut self.router,
    152             is_narrow(ui.ctx()),
    153             &self.loader,
    154             &mut self.inflight_messages,
    155         );
    156 
    157         AppResponse::action(action)
    158     }
    159 }
    160 
    161 /// Start the conversation list loader and subscription for the active account.
    162 #[profiling::function]
    163 fn initialize(
    164     ctx: &mut AppContext,
    165     cache: &mut ConversationCache,
    166     is_narrow: bool,
    167     loader: &MessagesLoader,
    168 ) {
    169     let sub = match ctx
    170         .ndb
    171         .subscribe(&conversation_filter(ctx.accounts.selected_account_pubkey()))
    172     {
    173         Ok(sub) => Some(sub),
    174         Err(e) => {
    175             tracing::error!("couldn't sub ndb: {e}");
    176             None
    177         }
    178     };
    179 
    180     loader.load_conversation_list(*ctx.accounts.selected_account_pubkey());
    181     cache.state = ConversationListState::Loading { subscription: sub };
    182 
    183     if !is_narrow {
    184         cache.active = None;
    185     }
    186 }
    187 
    188 /// Poll the live subscription for new conversation notes.
    189 #[profiling::function]
    190 fn update_initialized(ctx: &mut AppContext, cache: &mut ConversationCache, sub: Subscription) {
    191     let notes = ctx.ndb.poll_for_notes(sub, 10);
    192     let txn = Transaction::new(ctx.ndb).expect("txn");
    193     for key in notes {
    194         let note = match ctx.ndb.get_note_by_key(&txn, key) {
    195             Ok(n) => n,
    196             Err(e) => {
    197                 tracing::error!("could not find note key: {e}");
    198                 continue;
    199             }
    200         };
    201         cache.ingest_chatroom_msg(note, key, ctx.ndb, &txn, ctx.note_cache, ctx.unknown_ids);
    202     }
    203 }
    204 
    205 /// Drain loader messages and apply updates to the conversation cache.
    206 #[profiling::function]
    207 fn handle_loader_messages(
    208     ctx: &mut AppContext<'_>,
    209     cache: &mut ConversationCache,
    210     loader: &MessagesLoader,
    211     inflight_messages: &mut HashSet<cache::ConversationId>,
    212     is_narrow: bool,
    213 ) {
    214     let mut handled = 0;
    215     while handled < MAX_LOADER_MSGS_PER_FRAME {
    216         let Some(msg) = loader.try_recv() else {
    217             break;
    218         };
    219         handled += 1;
    220 
    221         match msg {
    222             LoaderMsg::ConversationBatch(keys) => {
    223                 ingest_note_keys(ctx, cache, &keys);
    224             }
    225             LoaderMsg::ConversationFinished => {
    226                 let current =
    227                     std::mem::replace(&mut cache.state, ConversationListState::Initializing);
    228                 cache.state = match current {
    229                     ConversationListState::Loading { subscription } => {
    230                         ConversationListState::Initialized(subscription)
    231                     }
    232                     other => other,
    233                 };
    234 
    235                 if cache.active.is_none() && !is_narrow {
    236                     if let Some(first) = cache.first_convo_id() {
    237                         open_conversation_with_prefetch(
    238                             &mut ctx.remote,
    239                             ctx.accounts,
    240                             cache,
    241                             first,
    242                         );
    243                         request_conversation_messages(
    244                             cache,
    245                             ctx.accounts.selected_account_pubkey(),
    246                             first,
    247                             loader,
    248                             inflight_messages,
    249                         );
    250                     }
    251                 }
    252             }
    253             LoaderMsg::ConversationMessagesBatch { keys, .. } => {
    254                 ingest_note_keys(ctx, cache, &keys);
    255             }
    256             LoaderMsg::ConversationMessagesFinished { conversation_id } => {
    257                 inflight_messages.remove(&conversation_id);
    258             }
    259             LoaderMsg::Failed(err) => {
    260                 tracing::error!("messages loader error: {err}");
    261             }
    262         }
    263     }
    264 }
    265 
    266 /// Lookup note keys in NostrDB and ingest them into the conversation cache.
    267 fn ingest_note_keys(
    268     ctx: &mut AppContext<'_>,
    269     cache: &mut ConversationCache,
    270     keys: &[nostrdb::NoteKey],
    271 ) {
    272     let txn = Transaction::new(ctx.ndb).expect("txn");
    273     for key in keys {
    274         let note = match ctx.ndb.get_note_by_key(&txn, *key) {
    275             Ok(n) => n,
    276             Err(e) => {
    277                 tracing::error!("could not find note key: {e}");
    278                 continue;
    279             }
    280         };
    281         cache.ingest_chatroom_msg(note, *key, ctx.ndb, &txn, ctx.note_cache, ctx.unknown_ids);
    282     }
    283 }
    284 
    285 /// Schedule a background load for a conversation's message history.
    286 fn request_conversation_messages(
    287     cache: &ConversationCache,
    288     me: &Pubkey,
    289     conversation_id: cache::ConversationId,
    290     loader: &MessagesLoader,
    291     inflight_messages: &mut HashSet<cache::ConversationId>,
    292 ) {
    293     if inflight_messages.contains(&conversation_id) {
    294         return;
    295     }
    296 
    297     let Some(conversation) = cache.get(conversation_id) else {
    298         return;
    299     };
    300 
    301     inflight_messages.insert(conversation_id);
    302     loader.load_conversation_messages(
    303         conversation_id,
    304         conversation.metadata.participants.clone(),
    305         *me,
    306     );
    307 }
    308 
    309 /// Scoped-sub owner namespace for messages DM relay-list lifecycles.
    310 #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
    311 enum RelayListOwner {
    312     Prefetch,
    313     Ensure,
    314 }
    315 
    316 const RELAY_LIST_KEY: &str = "dm_relay_list";
    317 
    318 /// Stable owner for DM relay-list prefetch subscriptions per selected account.
    319 fn list_prefetch_owner_key(account_pk: Pubkey) -> SubOwnerKey {
    320     SubOwnerKey::builder(RelayListOwner::Prefetch)
    321         .with(account_pk)
    322         .finish()
    323 }
    324 
    325 /// Stable owner for selected-account DM relay-list ensure subscriptions per selected account.
    326 fn list_ensure_owner_key(account_pk: Pubkey) -> SubOwnerKey {
    327     SubOwnerKey::builder(RelayListOwner::Ensure)
    328         .with(account_pk)
    329         .finish()
    330 }
    331 
    332 /// Stable key for one participant's DM relay-list remote stream.
    333 pub fn list_fetch_sub_key(participant: &Pubkey) -> SubKey {
    334     SubKey::builder(RELAY_LIST_KEY)
    335         .with(*participant.bytes())
    336         .finish()
    337 }
    338 
    339 #[profiling::function]
    340 pub(crate) fn ensure_selected_account_dm_relay_list(
    341     ndb: &mut Ndb,
    342     remote: &mut RemoteApi<'_>,
    343     accounts: &Accounts,
    344     cache: &mut ConversationCache,
    345 ) {
    346     ensure_selected_account_dm_list(ndb, remote, accounts, cache.dm_relay_list_ensure_mut())
    347 }
    348 
    349 /// Marks a conversation active and ensures participant relay-list prefetch.
    350 #[profiling::function]
    351 pub(crate) fn open_conversation_with_prefetch(
    352     remote: &mut RemoteApi<'_>,
    353     accounts: &Accounts,
    354     cache: &mut ConversationCache,
    355     conversation_id: cache::ConversationId,
    356 ) {
    357     cache.active = Some(conversation_id);
    358     relay_prefetch::ensure_conversation_prefetch(remote, accounts, cache, conversation_id);
    359 }
    360 
    361 /// Storage for conversations per account. Account management is performed by `Accounts`
    362 #[derive(Default)]
    363 struct ConversationsCtx {
    364     convos_per_acc: HashMap<Pubkey, ConversationCache>,
    365 }
    366 
    367 impl ConversationsCtx {
    368     /// Get the conversation cache for the selected account. Return None if we don't have a full kp
    369     pub fn get_current_mut(&mut self, accounts: &Accounts) -> Option<&mut ConversationCache> {
    370         accounts.get_selected_account().keypair().secret_key?;
    371 
    372         let current = accounts.selected_account_pubkey();
    373         Some(
    374             self.convos_per_acc
    375                 .raw_entry_mut()
    376                 .from_key(current)
    377                 .or_insert_with(|| (*current, ConversationCache::new()))
    378                 .1,
    379         )
    380     }
    381 }