notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

loader.rs (6741B)


      1 //! Background loader for Messages NostrDB queries.
      2 
      3 use crossbeam_channel as chan;
      4 use enostr::Pubkey;
      5 use nostrdb::{Filter, Ndb, NoteKey, Transaction};
      6 
      7 use notedeck::AsyncLoader;
      8 
      9 use crate::{
     10     cache::ConversationId,
     11     nip17::{chatroom_filter, conversation_filter},
     12 };
     13 
     14 const FOLD_BATCH_SIZE: usize = 100;
     15 
     16 /// Commands sent to the messages loader worker thread.
     17 pub enum LoaderCmd {
     18     /// Load conversation list note keys for the current account.
     19     LoadConversationList { account_pubkey: Pubkey },
     20     /// Load note keys for a specific conversation.
     21     LoadConversationMessages {
     22         conversation_id: ConversationId,
     23         participants: Vec<Pubkey>,
     24         me: Pubkey,
     25     },
     26 }
     27 
     28 /// Messages emitted by the loader worker thread.
     29 pub enum LoaderMsg {
     30     /// Batch of conversation list note keys.
     31     ConversationBatch(Vec<NoteKey>),
     32     /// Conversation list load finished.
     33     ConversationFinished,
     34     /// Batch of note keys for a conversation.
     35     ConversationMessagesBatch {
     36         conversation_id: ConversationId,
     37         keys: Vec<NoteKey>,
     38     },
     39     /// Conversation messages load finished.
     40     ConversationMessagesFinished { conversation_id: ConversationId },
     41     /// Loader error.
     42     Failed(String),
     43 }
     44 
     45 /// Handle for driving the messages loader worker thread.
     46 pub struct MessagesLoader {
     47     loader: AsyncLoader<LoaderCmd, LoaderMsg>,
     48 }
     49 
     50 impl MessagesLoader {
     51     /// Create an uninitialized loader handle.
     52     pub fn new() -> Self {
     53         Self {
     54             loader: AsyncLoader::new(),
     55         }
     56     }
     57 
     58     /// Start the loader workers if they have not been started yet.
     59     pub fn start(&mut self, egui_ctx: egui::Context, ndb: Ndb) {
     60         let _ = self
     61             .loader
     62             .start(egui_ctx, ndb, 1, "messages-loader", handle_cmd);
     63     }
     64 
     65     /// Request a conversation list load for the given account.
     66     pub fn load_conversation_list(&self, account_pubkey: Pubkey) {
     67         self.loader
     68             .send(LoaderCmd::LoadConversationList { account_pubkey });
     69     }
     70 
     71     /// Request a conversation message load for the given conversation.
     72     pub fn load_conversation_messages(
     73         &self,
     74         conversation_id: ConversationId,
     75         participants: Vec<Pubkey>,
     76         me: Pubkey,
     77     ) {
     78         self.loader.send(LoaderCmd::LoadConversationMessages {
     79             conversation_id,
     80             participants,
     81             me,
     82         });
     83     }
     84 
     85     /// Try to receive the next loader message without blocking.
     86     pub fn try_recv(&self) -> Option<LoaderMsg> {
     87         self.loader.try_recv()
     88     }
     89 }
     90 
     91 impl Default for MessagesLoader {
     92     fn default() -> Self {
     93         Self::new()
     94     }
     95 }
     96 
     97 #[derive(Clone, Copy)]
     98 /// Internal fold target kind for note key batches.
     99 enum FoldKind {
    100     ConversationList,
    101     ConversationMessages { conversation_id: ConversationId },
    102 }
    103 
    104 /// Fold accumulator for batching note keys.
    105 struct FoldAcc {
    106     batch: Vec<NoteKey>,
    107     msg_tx: chan::Sender<LoaderMsg>,
    108     egui_ctx: egui::Context,
    109     kind: FoldKind,
    110 }
    111 
    112 impl FoldAcc {
    113     fn push_key(&mut self, key: NoteKey) -> Result<(), String> {
    114         self.batch.push(key);
    115         if self.batch.len() >= FOLD_BATCH_SIZE {
    116             self.flush()?;
    117         }
    118         Ok(())
    119     }
    120 
    121     fn flush(&mut self) -> Result<(), String> {
    122         if self.batch.is_empty() {
    123             return Ok(());
    124         }
    125 
    126         let keys = std::mem::take(&mut self.batch);
    127         let msg = match self.kind {
    128             FoldKind::ConversationList => LoaderMsg::ConversationBatch(keys),
    129             FoldKind::ConversationMessages { conversation_id } => {
    130                 LoaderMsg::ConversationMessagesBatch {
    131                     conversation_id,
    132                     keys,
    133                 }
    134             }
    135         };
    136 
    137         self.msg_tx
    138             .send(msg)
    139             .map_err(|_| "messages loader channel closed".to_string())?;
    140         self.egui_ctx.request_repaint();
    141         Ok(())
    142     }
    143 }
    144 
    145 /// Run a conversation list load and stream note keys.
    146 fn load_conversation_list(
    147     egui_ctx: &egui::Context,
    148     ndb: &Ndb,
    149     msg_tx: &chan::Sender<LoaderMsg>,
    150     account_pubkey: Pubkey,
    151 ) -> Result<(), String> {
    152     let filters = conversation_filter(&account_pubkey);
    153     fold_note_keys(egui_ctx, ndb, msg_tx, &filters, FoldKind::ConversationList)?;
    154     let _ = msg_tx.send(LoaderMsg::ConversationFinished);
    155     egui_ctx.request_repaint();
    156     Ok(())
    157 }
    158 
    159 /// Handle loader commands on a worker thread.
    160 fn handle_cmd(
    161     cmd: LoaderCmd,
    162     egui_ctx: &egui::Context,
    163     ndb: &Ndb,
    164     msg_tx: &chan::Sender<LoaderMsg>,
    165 ) {
    166     let result = match cmd {
    167         LoaderCmd::LoadConversationList { account_pubkey } => {
    168             load_conversation_list(egui_ctx, ndb, msg_tx, account_pubkey)
    169         }
    170         LoaderCmd::LoadConversationMessages {
    171             conversation_id,
    172             participants,
    173             me,
    174         } => load_conversation_messages(egui_ctx, ndb, msg_tx, conversation_id, participants, me),
    175     };
    176 
    177     if let Err(err) = result {
    178         let _ = msg_tx.send(LoaderMsg::Failed(err));
    179         egui_ctx.request_repaint();
    180     }
    181 }
    182 
    183 /// Run a conversation messages load and stream note keys.
    184 fn load_conversation_messages(
    185     egui_ctx: &egui::Context,
    186     ndb: &Ndb,
    187     msg_tx: &chan::Sender<LoaderMsg>,
    188     conversation_id: ConversationId,
    189     participants: Vec<Pubkey>,
    190     me: Pubkey,
    191 ) -> Result<(), String> {
    192     let participant_bytes: Vec<[u8; 32]> = participants.iter().map(|p| *p.bytes()).collect();
    193     let participant_refs: Vec<&[u8; 32]> = participant_bytes.iter().collect();
    194     let filters = chatroom_filter(participant_refs, me.bytes());
    195 
    196     fold_note_keys(
    197         egui_ctx,
    198         ndb,
    199         msg_tx,
    200         &filters,
    201         FoldKind::ConversationMessages { conversation_id },
    202     )?;
    203 
    204     let _ = msg_tx.send(LoaderMsg::ConversationMessagesFinished { conversation_id });
    205     egui_ctx.request_repaint();
    206     Ok(())
    207 }
    208 
    209 /// Fold over NostrDB results and emit note key batches.
    210 fn fold_note_keys(
    211     egui_ctx: &egui::Context,
    212     ndb: &Ndb,
    213     msg_tx: &chan::Sender<LoaderMsg>,
    214     filters: &[Filter],
    215     kind: FoldKind,
    216 ) -> Result<(), String> {
    217     let txn = Transaction::new(ndb).map_err(|e| e.to_string())?;
    218 
    219     let acc = FoldAcc {
    220         batch: Vec::with_capacity(FOLD_BATCH_SIZE),
    221         msg_tx: msg_tx.clone(),
    222         egui_ctx: egui_ctx.clone(),
    223         kind,
    224     };
    225 
    226     let acc = ndb
    227         .fold(&txn, filters, acc, |mut acc, note| {
    228             if let Some(key) = note.key() {
    229                 if let Err(err) = acc.push_key(key) {
    230                     tracing::error!("messages loader flush error: {err}");
    231                 }
    232             }
    233             acc
    234         })
    235         .map_err(|e| e.to_string())?;
    236 
    237     let mut acc = acc;
    238     acc.flush()?;
    239     Ok(())
    240 }