lib.rs (12096B)
1 pub mod cache; 2 pub mod convo_renderable; 3 pub mod loader; 4 pub mod nav; 5 pub mod nip17; 6 mod relay_ensure; 7 mod relay_prefetch; 8 pub mod ui; 9 10 use enostr::Pubkey; 11 use hashbrown::{HashMap, HashSet}; 12 use nav::{process_messages_ui_response, Route}; 13 use nostrdb::{Ndb, Subscription, Transaction}; 14 use notedeck::{ 15 ui::is_narrow, Accounts, App, AppContext, AppResponse, RemoteApi, Router, SubKey, SubOwnerKey, 16 }; 17 18 use crate::{ 19 cache::{ConversationCache, ConversationListState, ConversationStates}, 20 loader::{LoaderMsg, MessagesLoader}, 21 nip17::conversation_filter, 22 relay_ensure::ensure_selected_account_dm_list, 23 ui::{login_nsec_prompt, messages::messages_ui}, 24 }; 25 26 /// Max loader messages to process per frame to avoid UI stalls. 27 const MAX_LOADER_MSGS_PER_FRAME: usize = 8; 28 29 /// Messages application state and background loaders. 30 pub struct MessagesApp { 31 messages: ConversationsCtx, 32 states: ConversationStates, 33 router: Router<Route>, 34 loader: MessagesLoader, 35 inflight_messages: HashSet<cache::ConversationId>, 36 } 37 38 impl MessagesApp { 39 pub fn new() -> Self { 40 Self { 41 messages: ConversationsCtx::default(), 42 states: ConversationStates::default(), 43 router: Router::new(vec![Route::ConvoList]), 44 loader: MessagesLoader::new(), 45 inflight_messages: HashSet::new(), 46 } 47 } 48 } 49 50 impl Default for MessagesApp { 51 fn default() -> Self { 52 Self::new() 53 } 54 } 55 56 impl App for MessagesApp { 57 #[profiling::function] 58 fn update(&mut self, ctx: &mut AppContext<'_>, egui_ctx: &egui::Context) { 59 let Some(cache) = self.messages.get_current_mut(ctx.accounts) else { 60 return; 61 }; 62 63 self.loader.start(egui_ctx.clone(), ctx.ndb.clone()); 64 65 's: { 66 let Some(secret) = &ctx.accounts.get_selected_account().key.secret_key else { 67 break 's; 68 }; 69 70 ctx.ndb.add_key(&secret.secret_bytes()); 71 72 let giftwrap_ndb = ctx.ndb.clone(); 73 let r = std::thread::Builder::new() 74 .name("process_giftwraps".into()) 75 .spawn(move || { 76 let txn = Transaction::new(&giftwrap_ndb).expect("txn"); 77 // although the actual giftwrap processing happens on the ingestion pool, this 78 // function still looks up giftwraps to process on the main thread, which can 79 // cause a freeze. 80 // 81 // TODO(jb55): move the giftwrap query logic into the internal threadpool so we 82 // don't have to spawn a thread here 83 giftwrap_ndb.process_giftwraps(&txn); 84 }); 85 86 if let Err(err) = r { 87 tracing::error!("failed to spawn process_giftwraps thread: {err}"); 88 } 89 } 90 91 ensure_selected_account_dm_relay_list(ctx.ndb, &mut ctx.remote, ctx.accounts, cache); 92 93 match cache.state { 94 ConversationListState::Initializing => { 95 initialize(ctx, cache, is_narrow(egui_ctx), &self.loader); 96 } 97 ConversationListState::Loading { subscription } => { 98 if let Some(sub) = subscription { 99 update_initialized(ctx, cache, sub); 100 } 101 } 102 ConversationListState::Initialized(subscription) => 's: { 103 let Some(sub) = subscription else { 104 break 's; 105 }; 106 update_initialized(ctx, cache, sub); 107 } 108 } 109 110 handle_loader_messages( 111 ctx, 112 cache, 113 &self.loader, 114 &mut self.inflight_messages, 115 is_narrow(egui_ctx), 116 ); 117 } 118 119 #[profiling::function] 120 fn render(&mut self, ctx: &mut AppContext<'_>, ui: &mut egui::Ui) -> AppResponse { 121 let Some(cache) = self.messages.get_current_mut(ctx.accounts) else { 122 login_nsec_prompt(ui, ctx.i18n); 123 return AppResponse::none(); 124 }; 125 126 let selected_pubkey = ctx.accounts.selected_account_pubkey(); 127 128 let contacts_state = ctx 129 .accounts 130 .get_selected_account() 131 .data 132 .contacts 133 .get_state(); 134 let resp = messages_ui( 135 cache, 136 &mut self.states, 137 ctx.media_jobs.sender(), 138 ctx.ndb, 139 selected_pubkey, 140 ui, 141 ctx.img_cache, 142 &self.router, 143 ctx.settings.get_settings_mut(), 144 contacts_state, 145 ctx.i18n, 146 ); 147 let action = process_messages_ui_response( 148 resp, 149 ctx, 150 cache, 151 &mut self.router, 152 is_narrow(ui.ctx()), 153 &self.loader, 154 &mut self.inflight_messages, 155 ); 156 157 AppResponse::action(action) 158 } 159 } 160 161 /// Start the conversation list loader and subscription for the active account. 162 #[profiling::function] 163 fn initialize( 164 ctx: &mut AppContext, 165 cache: &mut ConversationCache, 166 is_narrow: bool, 167 loader: &MessagesLoader, 168 ) { 169 let sub = match ctx 170 .ndb 171 .subscribe(&conversation_filter(ctx.accounts.selected_account_pubkey())) 172 { 173 Ok(sub) => Some(sub), 174 Err(e) => { 175 tracing::error!("couldn't sub ndb: {e}"); 176 None 177 } 178 }; 179 180 loader.load_conversation_list(*ctx.accounts.selected_account_pubkey()); 181 cache.state = ConversationListState::Loading { subscription: sub }; 182 183 if !is_narrow { 184 cache.active = None; 185 } 186 } 187 188 /// Poll the live subscription for new conversation notes. 189 #[profiling::function] 190 fn update_initialized(ctx: &mut AppContext, cache: &mut ConversationCache, sub: Subscription) { 191 let notes = ctx.ndb.poll_for_notes(sub, 10); 192 let txn = Transaction::new(ctx.ndb).expect("txn"); 193 for key in notes { 194 let note = match ctx.ndb.get_note_by_key(&txn, key) { 195 Ok(n) => n, 196 Err(e) => { 197 tracing::error!("could not find note key: {e}"); 198 continue; 199 } 200 }; 201 cache.ingest_chatroom_msg(note, key, ctx.ndb, &txn, ctx.note_cache, ctx.unknown_ids); 202 } 203 } 204 205 /// Drain loader messages and apply updates to the conversation cache. 206 #[profiling::function] 207 fn handle_loader_messages( 208 ctx: &mut AppContext<'_>, 209 cache: &mut ConversationCache, 210 loader: &MessagesLoader, 211 inflight_messages: &mut HashSet<cache::ConversationId>, 212 is_narrow: bool, 213 ) { 214 let mut handled = 0; 215 while handled < MAX_LOADER_MSGS_PER_FRAME { 216 let Some(msg) = loader.try_recv() else { 217 break; 218 }; 219 handled += 1; 220 221 match msg { 222 LoaderMsg::ConversationBatch(keys) => { 223 ingest_note_keys(ctx, cache, &keys); 224 } 225 LoaderMsg::ConversationFinished => { 226 let current = 227 std::mem::replace(&mut cache.state, ConversationListState::Initializing); 228 cache.state = match current { 229 ConversationListState::Loading { subscription } => { 230 ConversationListState::Initialized(subscription) 231 } 232 other => other, 233 }; 234 235 if cache.active.is_none() && !is_narrow { 236 if let Some(first) = cache.first_convo_id() { 237 open_conversation_with_prefetch( 238 &mut ctx.remote, 239 ctx.accounts, 240 cache, 241 first, 242 ); 243 request_conversation_messages( 244 cache, 245 ctx.accounts.selected_account_pubkey(), 246 first, 247 loader, 248 inflight_messages, 249 ); 250 } 251 } 252 } 253 LoaderMsg::ConversationMessagesBatch { keys, .. } => { 254 ingest_note_keys(ctx, cache, &keys); 255 } 256 LoaderMsg::ConversationMessagesFinished { conversation_id } => { 257 inflight_messages.remove(&conversation_id); 258 } 259 LoaderMsg::Failed(err) => { 260 tracing::error!("messages loader error: {err}"); 261 } 262 } 263 } 264 } 265 266 /// Lookup note keys in NostrDB and ingest them into the conversation cache. 267 fn ingest_note_keys( 268 ctx: &mut AppContext<'_>, 269 cache: &mut ConversationCache, 270 keys: &[nostrdb::NoteKey], 271 ) { 272 let txn = Transaction::new(ctx.ndb).expect("txn"); 273 for key in keys { 274 let note = match ctx.ndb.get_note_by_key(&txn, *key) { 275 Ok(n) => n, 276 Err(e) => { 277 tracing::error!("could not find note key: {e}"); 278 continue; 279 } 280 }; 281 cache.ingest_chatroom_msg(note, *key, ctx.ndb, &txn, ctx.note_cache, ctx.unknown_ids); 282 } 283 } 284 285 /// Schedule a background load for a conversation's message history. 286 fn request_conversation_messages( 287 cache: &ConversationCache, 288 me: &Pubkey, 289 conversation_id: cache::ConversationId, 290 loader: &MessagesLoader, 291 inflight_messages: &mut HashSet<cache::ConversationId>, 292 ) { 293 if inflight_messages.contains(&conversation_id) { 294 return; 295 } 296 297 let Some(conversation) = cache.get(conversation_id) else { 298 return; 299 }; 300 301 inflight_messages.insert(conversation_id); 302 loader.load_conversation_messages( 303 conversation_id, 304 conversation.metadata.participants.clone(), 305 *me, 306 ); 307 } 308 309 /// Scoped-sub owner namespace for messages DM relay-list lifecycles. 310 #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] 311 enum RelayListOwner { 312 Prefetch, 313 Ensure, 314 } 315 316 const RELAY_LIST_KEY: &str = "dm_relay_list"; 317 318 /// Stable owner for DM relay-list prefetch subscriptions per selected account. 319 fn list_prefetch_owner_key(account_pk: Pubkey) -> SubOwnerKey { 320 SubOwnerKey::builder(RelayListOwner::Prefetch) 321 .with(account_pk) 322 .finish() 323 } 324 325 /// Stable owner for selected-account DM relay-list ensure subscriptions per selected account. 326 fn list_ensure_owner_key(account_pk: Pubkey) -> SubOwnerKey { 327 SubOwnerKey::builder(RelayListOwner::Ensure) 328 .with(account_pk) 329 .finish() 330 } 331 332 /// Stable key for one participant's DM relay-list remote stream. 333 pub fn list_fetch_sub_key(participant: &Pubkey) -> SubKey { 334 SubKey::builder(RELAY_LIST_KEY) 335 .with(*participant.bytes()) 336 .finish() 337 } 338 339 #[profiling::function] 340 pub(crate) fn ensure_selected_account_dm_relay_list( 341 ndb: &mut Ndb, 342 remote: &mut RemoteApi<'_>, 343 accounts: &Accounts, 344 cache: &mut ConversationCache, 345 ) { 346 ensure_selected_account_dm_list(ndb, remote, accounts, cache.dm_relay_list_ensure_mut()) 347 } 348 349 /// Marks a conversation active and ensures participant relay-list prefetch. 350 #[profiling::function] 351 pub(crate) fn open_conversation_with_prefetch( 352 remote: &mut RemoteApi<'_>, 353 accounts: &Accounts, 354 cache: &mut ConversationCache, 355 conversation_id: cache::ConversationId, 356 ) { 357 cache.active = Some(conversation_id); 358 relay_prefetch::ensure_conversation_prefetch(remote, accounts, cache, conversation_id); 359 } 360 361 /// Storage for conversations per account. Account management is performed by `Accounts` 362 #[derive(Default)] 363 struct ConversationsCtx { 364 convos_per_acc: HashMap<Pubkey, ConversationCache>, 365 } 366 367 impl ConversationsCtx { 368 /// Get the conversation cache for the selected account. Return None if we don't have a full kp 369 pub fn get_current_mut(&mut self, accounts: &Accounts) -> Option<&mut ConversationCache> { 370 accounts.get_selected_account().keypair().secret_key?; 371 372 let current = accounts.selected_account_pubkey(); 373 Some( 374 self.convos_per_acc 375 .raw_entry_mut() 376 .from_key(current) 377 .or_insert_with(|| (*current, ConversationCache::new())) 378 .1, 379 ) 380 } 381 }