notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

timeline_loader.rs (6071B)


      1 //! Background loader for initial timeline NostrDB queries.
      2 
      3 use crossbeam_channel as chan;
      4 use nostrdb::{Ndb, Transaction};
      5 use notedeck::{worker_count, AsyncLoader, FilterState, NoteRef};
      6 
      7 use crate::timeline::kind::AlgoTimeline;
      8 use crate::timeline::TimelineKind;
      9 
     10 use tracing::{info, warn};
     11 
/// Number of note refs accumulated before a batch is sent over the loader
/// message channel.
const FOLD_BATCH_SIZE: usize = 200;
/// Value handed to `worker_count` when sizing the loader worker pool.
const MAX_TIMELINE_LOADER_WORKERS: usize = 4;
     14 
/// Commands sent to the timeline loader worker thread.
pub enum TimelineLoaderCmd {
    /// Load initial note refs for a timeline.
    LoadTimeline {
        /// Timeline to load; the same kind is attached to every message
        /// produced while handling this command.
        kind: TimelineKind,
    },
}
     23 
/// Messages emitted by the timeline loader worker thread.
pub enum TimelineLoaderMsg {
    /// Batch of note refs for a timeline.
    TimelineBatch {
        /// Timeline identifier to apply batches to.
        kind: TimelineKind,
        /// Note refs produced by the NostrDB fold or query for this timeline.
        notes: Vec<NoteRef>,
    },
    /// Timeline initial load finished; sent after the last batch was flushed.
    TimelineFinished { kind: TimelineKind },
    /// Loader error for a timeline, with a human-readable description.
    Failed { kind: TimelineKind, error: String },
}
     38 
/// Handle for driving the timeline loader worker thread.
pub struct TimelineLoader {
    /// Underlying loader used to send `TimelineLoaderCmd`s and to receive
    /// `TimelineLoaderMsg`s.
    loader: AsyncLoader<TimelineLoaderCmd, TimelineLoaderMsg>,
}
     43 
     44 impl TimelineLoader {
     45     /// Create an uninitialized loader handle.
     46     pub fn new() -> Self {
     47         Self {
     48             loader: AsyncLoader::new(),
     49         }
     50     }
     51 
     52     /// Start the loader workers if they have not been started yet.
     53     pub fn start(&mut self, egui_ctx: egui::Context, ndb: Ndb) {
     54         let workers = worker_count(MAX_TIMELINE_LOADER_WORKERS);
     55         let started = self.loader.start(
     56             egui_ctx,
     57             ndb,
     58             workers,
     59             "columns-timeline-loader",
     60             handle_cmd,
     61         );
     62         if started {
     63             info!(workers, "starting timeline loader workers");
     64         }
     65     }
     66 
     67     /// Request an initial load for a timeline.
     68     pub fn load_timeline(&self, kind: TimelineKind) {
     69         self.loader.send(TimelineLoaderCmd::LoadTimeline { kind });
     70     }
     71 
     72     /// Try to receive the next loader message without blocking.
     73     pub fn try_recv(&self) -> Option<TimelineLoaderMsg> {
     74         self.loader.try_recv()
     75     }
     76 }
     77 
     78 impl Default for TimelineLoader {
     79     fn default() -> Self {
     80         Self::new()
     81     }
     82 }
     83 
     84 /// Handle loader commands on a worker thread.
     85 fn handle_cmd(
     86     cmd: TimelineLoaderCmd,
     87     egui_ctx: &egui::Context,
     88     ndb: &Ndb,
     89     msg_tx: &chan::Sender<TimelineLoaderMsg>,
     90 ) {
     91     let result = match cmd {
     92         TimelineLoaderCmd::LoadTimeline { kind } => load_timeline(egui_ctx, ndb, msg_tx, kind),
     93     };
     94 
     95     if let Err((kind, err)) = result {
     96         let _ = msg_tx.send(TimelineLoaderMsg::Failed { kind, error: err });
     97         egui_ctx.request_repaint();
     98     }
     99 }
    100 
/// Fold accumulator for batching note refs.
struct FoldAcc {
    /// Note refs collected since the last flush.
    batch: Vec<NoteRef>,
    /// Channel on which completed batches are sent.
    msg_tx: chan::Sender<TimelineLoaderMsg>,
    /// Context used to request a repaint after each batch is sent.
    egui_ctx: egui::Context,
    /// Timeline kind attached to every batch message.
    kind: TimelineKind,
}
    108 
    109 impl FoldAcc {
    110     fn push_note(&mut self, note: NoteRef) -> Result<(), String> {
    111         self.batch.push(note);
    112         if self.batch.len() >= FOLD_BATCH_SIZE {
    113             self.flush()?;
    114         }
    115         Ok(())
    116     }
    117 
    118     fn flush(&mut self) -> Result<(), String> {
    119         if self.batch.is_empty() {
    120             return Ok(());
    121         }
    122 
    123         let notes = std::mem::take(&mut self.batch);
    124         self.msg_tx
    125             .send(TimelineLoaderMsg::TimelineBatch {
    126                 kind: self.kind.clone(),
    127                 notes,
    128             })
    129             .map_err(|_| "timeline loader channel closed".to_string())?;
    130         self.egui_ctx.request_repaint();
    131         Ok(())
    132     }
    133 }
    134 
    135 /// Run an initial timeline load and stream note ref batches.
    136 fn load_timeline(
    137     egui_ctx: &egui::Context,
    138     ndb: &Ndb,
    139     msg_tx: &chan::Sender<TimelineLoaderMsg>,
    140     kind: TimelineKind,
    141 ) -> Result<(), (TimelineKind, String)> {
    142     let txn = Transaction::new(ndb).map_err(|e| (kind.clone(), e.to_string()))?;
    143     let filter_state = kind.filters(&txn, ndb);
    144     let FilterState::Ready(filters) = filter_state else {
    145         warn!(?kind, "timeline loader filter not ready");
    146         return Err((kind, "timeline filter not ready".to_string()));
    147     };
    148 
    149     let mut acc = FoldAcc {
    150         batch: Vec::with_capacity(FOLD_BATCH_SIZE),
    151         msg_tx: msg_tx.clone(),
    152         egui_ctx: egui_ctx.clone(),
    153         kind: kind.clone(),
    154     };
    155 
    156     let use_query = matches!(kind, TimelineKind::Algo(AlgoTimeline::LastPerPubkey(_)));
    157 
    158     for package in filters.local().packages {
    159         if package.filters.is_empty() {
    160             warn!(?kind, "timeline loader received empty filter package");
    161         }
    162 
    163         if use_query {
    164             let mut lim = 0i32;
    165             for filter in package.filters {
    166                 lim += filter.limit().unwrap_or(1) as i32;
    167             }
    168 
    169             let cur_notes: Vec<NoteRef> = ndb
    170                 .query(&txn, package.filters, lim)
    171                 .map_err(|e| (kind.clone(), e.to_string()))?
    172                 .into_iter()
    173                 .map(NoteRef::from_query_result)
    174                 .collect();
    175             for note_ref in cur_notes {
    176                 if let Err(err) = acc.push_note(note_ref) {
    177                     tracing::error!("timeline loader push error: {err}");
    178                 }
    179             }
    180             continue;
    181         }
    182 
    183         let fold_result = ndb.fold(&txn, package.filters, acc, |mut acc, note| {
    184             if let Some(key) = note.key() {
    185                 let note_ref = NoteRef {
    186                     key,
    187                     created_at: note.created_at(),
    188                 };
    189                 if let Err(err) = acc.push_note(note_ref) {
    190                     tracing::error!("timeline loader flush error: {err}");
    191                 }
    192             }
    193             acc
    194         });
    195 
    196         acc = fold_result.map_err(|e| (kind.clone(), e.to_string()))?;
    197     }
    198 
    199     acc.flush().map_err(|e| (kind.clone(), e))?;
    200     let _ = msg_tx.send(TimelineLoaderMsg::TimelineFinished { kind });
    201     egui_ctx.request_repaint();
    202     Ok(())
    203 }