timeline_loader.rs (6071B)
1 //! Background loader for initial timeline NostrDB queries. 2 3 use crossbeam_channel as chan; 4 use nostrdb::{Ndb, Transaction}; 5 use notedeck::{worker_count, AsyncLoader, FilterState, NoteRef}; 6 7 use crate::timeline::kind::AlgoTimeline; 8 use crate::timeline::TimelineKind; 9 10 use tracing::{info, warn}; 11 12 const FOLD_BATCH_SIZE: usize = 200; 13 const MAX_TIMELINE_LOADER_WORKERS: usize = 4; 14 15 /// Commands sent to the timeline loader worker thread. 16 pub enum TimelineLoaderCmd { 17 /// Load initial note refs for a timeline. 18 LoadTimeline { 19 /// Timeline identifier to apply batches to. 20 kind: TimelineKind, 21 }, 22 } 23 24 /// Messages emitted by the timeline loader worker thread. 25 pub enum TimelineLoaderMsg { 26 /// Batch of note refs for a timeline. 27 TimelineBatch { 28 /// Timeline identifier to apply batches to. 29 kind: TimelineKind, 30 /// Note refs discovered by NostrDB fold. 31 notes: Vec<NoteRef>, 32 }, 33 /// Timeline initial load finished. 34 TimelineFinished { kind: TimelineKind }, 35 /// Loader error for a timeline. 36 Failed { kind: TimelineKind, error: String }, 37 } 38 39 /// Handle for driving the timeline loader worker thread. 40 pub struct TimelineLoader { 41 loader: AsyncLoader<TimelineLoaderCmd, TimelineLoaderMsg>, 42 } 43 44 impl TimelineLoader { 45 /// Create an uninitialized loader handle. 46 pub fn new() -> Self { 47 Self { 48 loader: AsyncLoader::new(), 49 } 50 } 51 52 /// Start the loader workers if they have not been started yet. 53 pub fn start(&mut self, egui_ctx: egui::Context, ndb: Ndb) { 54 let workers = worker_count(MAX_TIMELINE_LOADER_WORKERS); 55 let started = self.loader.start( 56 egui_ctx, 57 ndb, 58 workers, 59 "columns-timeline-loader", 60 handle_cmd, 61 ); 62 if started { 63 info!(workers, "starting timeline loader workers"); 64 } 65 } 66 67 /// Request an initial load for a timeline. 68 pub fn load_timeline(&self, kind: TimelineKind) { 69 self.loader.send(TimelineLoaderCmd::LoadTimeline { kind }); 70 } 71 72 /// Try to receive the next loader message without blocking. 73 pub fn try_recv(&self) -> Option<TimelineLoaderMsg> { 74 self.loader.try_recv() 75 } 76 } 77 78 impl Default for TimelineLoader { 79 fn default() -> Self { 80 Self::new() 81 } 82 } 83 84 /// Handle loader commands on a worker thread. 85 fn handle_cmd( 86 cmd: TimelineLoaderCmd, 87 egui_ctx: &egui::Context, 88 ndb: &Ndb, 89 msg_tx: &chan::Sender<TimelineLoaderMsg>, 90 ) { 91 let result = match cmd { 92 TimelineLoaderCmd::LoadTimeline { kind } => load_timeline(egui_ctx, ndb, msg_tx, kind), 93 }; 94 95 if let Err((kind, err)) = result { 96 let _ = msg_tx.send(TimelineLoaderMsg::Failed { kind, error: err }); 97 egui_ctx.request_repaint(); 98 } 99 } 100 101 /// Fold accumulator for batching note refs. 102 struct FoldAcc { 103 batch: Vec<NoteRef>, 104 msg_tx: chan::Sender<TimelineLoaderMsg>, 105 egui_ctx: egui::Context, 106 kind: TimelineKind, 107 } 108 109 impl FoldAcc { 110 fn push_note(&mut self, note: NoteRef) -> Result<(), String> { 111 self.batch.push(note); 112 if self.batch.len() >= FOLD_BATCH_SIZE { 113 self.flush()?; 114 } 115 Ok(()) 116 } 117 118 fn flush(&mut self) -> Result<(), String> { 119 if self.batch.is_empty() { 120 return Ok(()); 121 } 122 123 let notes = std::mem::take(&mut self.batch); 124 self.msg_tx 125 .send(TimelineLoaderMsg::TimelineBatch { 126 kind: self.kind.clone(), 127 notes, 128 }) 129 .map_err(|_| "timeline loader channel closed".to_string())?; 130 self.egui_ctx.request_repaint(); 131 Ok(()) 132 } 133 } 134 135 /// Run an initial timeline load and stream note ref batches. 136 fn load_timeline( 137 egui_ctx: &egui::Context, 138 ndb: &Ndb, 139 msg_tx: &chan::Sender<TimelineLoaderMsg>, 140 kind: TimelineKind, 141 ) -> Result<(), (TimelineKind, String)> { 142 let txn = Transaction::new(ndb).map_err(|e| (kind.clone(), e.to_string()))?; 143 let filter_state = kind.filters(&txn, ndb); 144 let FilterState::Ready(filters) = filter_state else { 145 warn!(?kind, "timeline loader filter not ready"); 146 return Err((kind, "timeline filter not ready".to_string())); 147 }; 148 149 let mut acc = FoldAcc { 150 batch: Vec::with_capacity(FOLD_BATCH_SIZE), 151 msg_tx: msg_tx.clone(), 152 egui_ctx: egui_ctx.clone(), 153 kind: kind.clone(), 154 }; 155 156 let use_query = matches!(kind, TimelineKind::Algo(AlgoTimeline::LastPerPubkey(_))); 157 158 for package in filters.local().packages { 159 if package.filters.is_empty() { 160 warn!(?kind, "timeline loader received empty filter package"); 161 } 162 163 if use_query { 164 let mut lim = 0i32; 165 for filter in package.filters { 166 lim += filter.limit().unwrap_or(1) as i32; 167 } 168 169 let cur_notes: Vec<NoteRef> = ndb 170 .query(&txn, package.filters, lim) 171 .map_err(|e| (kind.clone(), e.to_string()))? 172 .into_iter() 173 .map(NoteRef::from_query_result) 174 .collect(); 175 for note_ref in cur_notes { 176 if let Err(err) = acc.push_note(note_ref) { 177 tracing::error!("timeline loader push error: {err}"); 178 } 179 } 180 continue; 181 } 182 183 let fold_result = ndb.fold(&txn, package.filters, acc, |mut acc, note| { 184 if let Some(key) = note.key() { 185 let note_ref = NoteRef { 186 key, 187 created_at: note.created_at(), 188 }; 189 if let Err(err) = acc.push_note(note_ref) { 190 tracing::error!("timeline loader flush error: {err}"); 191 } 192 } 193 acc 194 }); 195 196 acc = fold_result.map_err(|e| (kind.clone(), e.to_string()))?; 197 } 198 199 acc.flush().map_err(|e| (kind.clone(), e))?; 200 let _ = msg_tx.send(TimelineLoaderMsg::TimelineFinished { kind }); 201 egui_ctx.request_repaint(); 202 Ok(()) 203 }