loader.rs (6741B)
1 //! Background loader for Messages NostrDB queries. 2 3 use crossbeam_channel as chan; 4 use enostr::Pubkey; 5 use nostrdb::{Filter, Ndb, NoteKey, Transaction}; 6 7 use notedeck::AsyncLoader; 8 9 use crate::{ 10 cache::ConversationId, 11 nip17::{chatroom_filter, conversation_filter}, 12 }; 13 14 const FOLD_BATCH_SIZE: usize = 100; 15 16 /// Commands sent to the messages loader worker thread. 17 pub enum LoaderCmd { 18 /// Load conversation list note keys for the current account. 19 LoadConversationList { account_pubkey: Pubkey }, 20 /// Load note keys for a specific conversation. 21 LoadConversationMessages { 22 conversation_id: ConversationId, 23 participants: Vec<Pubkey>, 24 me: Pubkey, 25 }, 26 } 27 28 /// Messages emitted by the loader worker thread. 29 pub enum LoaderMsg { 30 /// Batch of conversation list note keys. 31 ConversationBatch(Vec<NoteKey>), 32 /// Conversation list load finished. 33 ConversationFinished, 34 /// Batch of note keys for a conversation. 35 ConversationMessagesBatch { 36 conversation_id: ConversationId, 37 keys: Vec<NoteKey>, 38 }, 39 /// Conversation messages load finished. 40 ConversationMessagesFinished { conversation_id: ConversationId }, 41 /// Loader error. 42 Failed(String), 43 } 44 45 /// Handle for driving the messages loader worker thread. 46 pub struct MessagesLoader { 47 loader: AsyncLoader<LoaderCmd, LoaderMsg>, 48 } 49 50 impl MessagesLoader { 51 /// Create an uninitialized loader handle. 52 pub fn new() -> Self { 53 Self { 54 loader: AsyncLoader::new(), 55 } 56 } 57 58 /// Start the loader workers if they have not been started yet. 59 pub fn start(&mut self, egui_ctx: egui::Context, ndb: Ndb) { 60 let _ = self 61 .loader 62 .start(egui_ctx, ndb, 1, "messages-loader", handle_cmd); 63 } 64 65 /// Request a conversation list load for the given account. 66 pub fn load_conversation_list(&self, account_pubkey: Pubkey) { 67 self.loader 68 .send(LoaderCmd::LoadConversationList { account_pubkey }); 69 } 70 71 /// Request a conversation message load for the given conversation. 72 pub fn load_conversation_messages( 73 &self, 74 conversation_id: ConversationId, 75 participants: Vec<Pubkey>, 76 me: Pubkey, 77 ) { 78 self.loader.send(LoaderCmd::LoadConversationMessages { 79 conversation_id, 80 participants, 81 me, 82 }); 83 } 84 85 /// Try to receive the next loader message without blocking. 86 pub fn try_recv(&self) -> Option<LoaderMsg> { 87 self.loader.try_recv() 88 } 89 } 90 91 impl Default for MessagesLoader { 92 fn default() -> Self { 93 Self::new() 94 } 95 } 96 97 #[derive(Clone, Copy)] 98 /// Internal fold target kind for note key batches. 99 enum FoldKind { 100 ConversationList, 101 ConversationMessages { conversation_id: ConversationId }, 102 } 103 104 /// Fold accumulator for batching note keys. 105 struct FoldAcc { 106 batch: Vec<NoteKey>, 107 msg_tx: chan::Sender<LoaderMsg>, 108 egui_ctx: egui::Context, 109 kind: FoldKind, 110 } 111 112 impl FoldAcc { 113 fn push_key(&mut self, key: NoteKey) -> Result<(), String> { 114 self.batch.push(key); 115 if self.batch.len() >= FOLD_BATCH_SIZE { 116 self.flush()?; 117 } 118 Ok(()) 119 } 120 121 fn flush(&mut self) -> Result<(), String> { 122 if self.batch.is_empty() { 123 return Ok(()); 124 } 125 126 let keys = std::mem::take(&mut self.batch); 127 let msg = match self.kind { 128 FoldKind::ConversationList => LoaderMsg::ConversationBatch(keys), 129 FoldKind::ConversationMessages { conversation_id } => { 130 LoaderMsg::ConversationMessagesBatch { 131 conversation_id, 132 keys, 133 } 134 } 135 }; 136 137 self.msg_tx 138 .send(msg) 139 .map_err(|_| "messages loader channel closed".to_string())?; 140 self.egui_ctx.request_repaint(); 141 Ok(()) 142 } 143 } 144 145 /// Run a conversation list load and stream note keys. 146 fn load_conversation_list( 147 egui_ctx: &egui::Context, 148 ndb: &Ndb, 149 msg_tx: &chan::Sender<LoaderMsg>, 150 account_pubkey: Pubkey, 151 ) -> Result<(), String> { 152 let filters = conversation_filter(&account_pubkey); 153 fold_note_keys(egui_ctx, ndb, msg_tx, &filters, FoldKind::ConversationList)?; 154 let _ = msg_tx.send(LoaderMsg::ConversationFinished); 155 egui_ctx.request_repaint(); 156 Ok(()) 157 } 158 159 /// Handle loader commands on a worker thread. 160 fn handle_cmd( 161 cmd: LoaderCmd, 162 egui_ctx: &egui::Context, 163 ndb: &Ndb, 164 msg_tx: &chan::Sender<LoaderMsg>, 165 ) { 166 let result = match cmd { 167 LoaderCmd::LoadConversationList { account_pubkey } => { 168 load_conversation_list(egui_ctx, ndb, msg_tx, account_pubkey) 169 } 170 LoaderCmd::LoadConversationMessages { 171 conversation_id, 172 participants, 173 me, 174 } => load_conversation_messages(egui_ctx, ndb, msg_tx, conversation_id, participants, me), 175 }; 176 177 if let Err(err) = result { 178 let _ = msg_tx.send(LoaderMsg::Failed(err)); 179 egui_ctx.request_repaint(); 180 } 181 } 182 183 /// Run a conversation messages load and stream note keys. 184 fn load_conversation_messages( 185 egui_ctx: &egui::Context, 186 ndb: &Ndb, 187 msg_tx: &chan::Sender<LoaderMsg>, 188 conversation_id: ConversationId, 189 participants: Vec<Pubkey>, 190 me: Pubkey, 191 ) -> Result<(), String> { 192 let participant_bytes: Vec<[u8; 32]> = participants.iter().map(|p| *p.bytes()).collect(); 193 let participant_refs: Vec<&[u8; 32]> = participant_bytes.iter().collect(); 194 let filters = chatroom_filter(participant_refs, me.bytes()); 195 196 fold_note_keys( 197 egui_ctx, 198 ndb, 199 msg_tx, 200 &filters, 201 FoldKind::ConversationMessages { conversation_id }, 202 )?; 203 204 let _ = msg_tx.send(LoaderMsg::ConversationMessagesFinished { conversation_id }); 205 egui_ctx.request_repaint(); 206 Ok(()) 207 } 208 209 /// Fold over NostrDB results and emit note key batches. 210 fn fold_note_keys( 211 egui_ctx: &egui::Context, 212 ndb: &Ndb, 213 msg_tx: &chan::Sender<LoaderMsg>, 214 filters: &[Filter], 215 kind: FoldKind, 216 ) -> Result<(), String> { 217 let txn = Transaction::new(ndb).map_err(|e| e.to_string())?; 218 219 let acc = FoldAcc { 220 batch: Vec::with_capacity(FOLD_BATCH_SIZE), 221 msg_tx: msg_tx.clone(), 222 egui_ctx: egui_ctx.clone(), 223 kind, 224 }; 225 226 let acc = ndb 227 .fold(&txn, filters, acc, |mut acc, note| { 228 if let Some(key) = note.key() { 229 if let Err(err) = acc.push_key(key) { 230 tracing::error!("messages loader flush error: {err}"); 231 } 232 } 233 acc 234 }) 235 .map_err(|e| e.to_string())?; 236 237 let mut acc = acc; 238 acc.flush()?; 239 Ok(()) 240 }