notedeck

One damus client to rule them all
git clone git://jb55.com/notedeck
Log | Files | Refs | README | LICENSE

lib.rs (15673B)


      1 use enostr::Pubkey;
      2 use nostrdb::Note;
      3 use rustc_hash::FxHashMap;
      4 use std::thread;
      5 use std::time::{Duration, Instant};
      6 
      7 use crossbeam_channel as chan;
      8 
      9 use nostrdb::{Filter, Ndb, Transaction};
     10 use notedeck::{AppContext, AppResponse, try_process_events_core};
     11 
     12 use chrono::{Datelike, TimeZone, Utc};
     13 
     14 mod chart;
     15 mod sparkline;
     16 mod ui;
     17 
     18 // ----------------------
     19 // Worker protocol
     20 // ----------------------
     21 
/// Commands sent from the UI side to the dashboard worker thread.
#[derive(Debug)]
enum WorkerCmd {
    /// Ask the worker to run one full refresh pass over the database.
    Refresh,
    // A dedicated quit command is currently disabled.
    //Quit,
}
     27 
     28 // Buckets are multiples of time ranges
     29 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
     30 pub enum Period {
     31     Daily,
     32     Weekly,
     33     Monthly,
     34 }
     35 
     36 impl Period {
     37     pub const ALL: [Period; 3] = [Period::Daily, Period::Weekly, Period::Monthly];
     38 
     39     pub fn label(self) -> &'static str {
     40         match self {
     41             Period::Daily => "day",
     42             Period::Weekly => "week",
     43             Period::Monthly => "month",
     44         }
     45     }
     46 }
     47 
     48 /// All the data we are interested in for a specific range
     49 #[derive(Default, Clone, Debug)]
     50 struct Bucket {
     51     pub total: u64,
     52     pub kinds: rustc_hash::FxHashMap<u64, u32>,
     53     pub clients: rustc_hash::FxHashMap<String, u32>,
     54     pub kind1_authors: rustc_hash::FxHashMap<Pubkey, u32>,
     55 }
     56 
     57 fn note_client_tag<'a>(note: &Note<'a>) -> Option<&'a str> {
     58     for tag in note.tags() {
     59         if tag.count() < 2 {
     60             continue;
     61         }
     62 
     63         let Some("client") = tag.get_str(0) else {
     64             continue;
     65         };
     66 
     67         return tag.get_str(1);
     68     }
     69 
     70     None
     71 }
     72 
     73 impl Bucket {
     74     #[inline(always)]
     75     pub fn bump(&mut self, note: &Note<'_>) {
     76         self.total += 1;
     77         let kind = note.kind();
     78         *self.kinds.entry(kind as u64).or_default() += 1;
     79 
     80         // Track kind1 authors
     81         if kind == 1 {
     82             let pk = Pubkey::new(*note.pubkey());
     83             *self.kind1_authors.entry(pk).or_default() += 1;
     84         }
     85 
     86         if let Some(client) = note_client_tag(note) {
     87             *self.clients.entry(client.to_string()).or_default() += 1;
     88         } else {
     89             // TODO(jb55): client fingerprinting ?
     90         }
     91     }
     92 }
     93 
/// A fixed number of equally sized time buckets counted backwards from
/// `anchor_end_ts`.
///
/// Bucket `idx` ends at `bucket_end_ts(idx)` and starts at
/// `bucket_end_ts(idx) - self.bucket_size_secs`.
#[derive(Debug, Clone, Default)]
struct RollingCache {
    /// Width of every bucket, in seconds.
    pub bucket_size_secs: i64,
    /// Exclusive end timestamp (Unix seconds) of bucket 0.
    pub anchor_end_ts: i64,
    /// Buckets ordered newest first: index 0 is the most recent range.
    pub buckets: Vec<Bucket>,
}
    101 
    102 impl RollingCache {
    103     pub fn bucket_end_ts(&self, idx: usize) -> i64 {
    104         self.anchor_end_ts - (idx as i64) * self.bucket_size_secs
    105     }
    106 
    107     pub fn bucket_start_ts(&self, idx: usize) -> i64 {
    108         self.bucket_end_ts(idx) - self.bucket_size_secs
    109     }
    110 
    111     pub fn daily(now_ts: i64, days: usize) -> Self {
    112         let day_anchor = next_midnight_utc(now_ts);
    113 
    114         Self {
    115             bucket_size_secs: 86_400,
    116             anchor_end_ts: day_anchor,
    117             buckets: vec![Bucket::default(); days],
    118         }
    119     }
    120 
    121     pub fn weekly(now_ts: i64, weeks: usize, week_starts_monday: bool) -> Self {
    122         let anchor_end_ts = next_week_boundary_utc(now_ts, week_starts_monday);
    123         Self {
    124             bucket_size_secs: 7 * 86_400,
    125             anchor_end_ts,
    126             buckets: vec![Bucket::default(); weeks],
    127         }
    128     }
    129 
    130     // “month-ish” (30d buckets) but aligned so bucket 0 ends at the next month boundary
    131     pub fn monthly_30d(now_ts: i64, months: usize) -> Self {
    132         let anchor_end_ts = next_month_boundary_utc(now_ts);
    133         Self {
    134             bucket_size_secs: 30 * 86_400,
    135             anchor_end_ts,
    136             buckets: vec![Bucket::default(); months],
    137         }
    138     }
    139 
    140     #[inline(always)]
    141     pub fn bump(&mut self, note: &Note<'_>) {
    142         let ts = note.created_at() as i64;
    143 
    144         // bucket windows are [end-(i+1)*size, end-i*size)
    145         // so treat `end` itself as "future"
    146         let delta = (self.anchor_end_ts - 1) - ts;
    147 
    148         if delta < 0 {
    149             return; // ignore future timestamps
    150         }
    151 
    152         let idx = (delta / self.bucket_size_secs) as usize;
    153         if idx >= self.buckets.len() {
    154             return; // outside window
    155         }
    156 
    157         self.buckets[idx].bump(note);
    158     }
    159 }
    160 
/// All statistics the dashboard displays: one all-time bucket plus one rolling
/// cache per [`Period`].
#[derive(Clone, Debug, Default)]
struct DashboardState {
    /// Counters over every note seen, regardless of timestamp.
    total: Bucket,
    /// Rolling cache selected by [`Period::Daily`].
    daily: RollingCache,
    /// Rolling cache selected by [`Period::Weekly`].
    weekly: RollingCache,
    /// Rolling cache selected by [`Period::Monthly`].
    monthly: RollingCache,
}
    168 
/// Intermediate result emitted by the worker while a refresh pass is running.
#[derive(Debug, Clone)]
struct Snapshot {
    /// When the refresh pass that produced this snapshot started.
    started_at: Instant,
    /// When this snapshot was taken.
    snapshot_at: Instant,
    /// Copy of the statistics accumulated so far.
    state: DashboardState,
}
    175 
/// Messages sent from the dashboard worker thread back to the UI side.
#[derive(Debug)]
enum WorkerMsg {
    /// Partial results of a refresh pass that is still running.
    Snapshot(Snapshot),
    /// A refresh pass completed; `state` holds the final statistics.
    Finished {
        started_at: Instant,
        finished_at: Instant,
        state: DashboardState,
    },
    /// A refresh pass failed; `error` is the debug-formatted error.
    Failed {
        started_at: Instant,
        finished_at: Instant,
        error: String,
    },
}
    190 
    191 // ----------------------
    192 // Dashboard (single pass, single worker)
    193 // ----------------------
    194 
/// The dashboard app: owns the channels to a single background worker and the
/// most recent statistics received from it.
pub struct Dashboard {
    /// Set on the first `update` call, once the worker has been spawned.
    initialized: bool,

    // Worker channels
    /// Sends commands to the worker; `None` until `init` has run.
    cmd_tx: Option<chan::Sender<WorkerCmd>>,
    /// Receives worker messages; `None` until `init` has run.
    msg_rx: Option<chan::Receiver<WorkerMsg>>,

    // Refresh policy
    /// Minimum time between the end of one refresh and the start of the next.
    refresh_every: Duration,
    /// Earliest instant at which the refresh schedule is checked again.
    next_tick: Instant,

    // Global UI controls
    /// Period whose rolling cache is currently selected.
    period: Period,

    // UI state (progressively filled via snapshots)
    /// True while a refresh pass is believed to be in flight.
    running: bool,

    /// Start time of the most recent refresh pass.
    last_started: Option<Instant>,
    /// Time of the most recent snapshot (or completion) received.
    last_snapshot: Option<Instant>,
    /// Time the most recent refresh pass finished or failed.
    last_finished: Option<Instant>,
    /// Duration of the most recent finished or failed pass.
    last_duration: Option<Duration>,
    /// Error text of the most recent failed pass, if any.
    last_error: Option<String>,

    /// Statistics currently shown by the UI.
    state: DashboardState,
}
    220 
    221 impl Default for Dashboard {
    222     fn default() -> Self {
    223         Self {
    224             initialized: false,
    225 
    226             period: Period::Weekly,
    227 
    228             cmd_tx: None,
    229             msg_rx: None,
    230 
    231             refresh_every: Duration::from_secs(300),
    232             next_tick: Instant::now(),
    233 
    234             running: false,
    235             last_started: None,
    236             last_snapshot: None,
    237             last_finished: None,
    238             last_duration: None,
    239             last_error: None,
    240 
    241             state: DashboardState::default(),
    242         }
    243     }
    244 }
    245 
    246 impl notedeck::App for Dashboard {
    247     fn update(&mut self, ctx: &mut AppContext<'_>, ui: &mut egui::Ui) -> AppResponse {
    248         try_process_events_core(ctx, ui.ctx(), |_, _| {});
    249 
    250         if !self.initialized {
    251             self.initialized = true;
    252             self.init(ui.ctx().clone(), ctx);
    253         }
    254 
    255         self.process_worker_msgs();
    256         self.schedule_refresh();
    257 
    258         self.show(ui, ctx);
    259 
    260         AppResponse::none()
    261     }
    262 }
    263 
    264 impl Dashboard {
    265     fn selected_cache(&self) -> &RollingCache {
    266         match self.period {
    267             Period::Daily => &self.state.daily,
    268             Period::Weekly => &self.state.weekly,
    269             Period::Monthly => &self.state.monthly,
    270         }
    271     }
    272 
    273     fn init(&mut self, egui_ctx: egui::Context, ctx: &mut AppContext<'_>) {
    274         // spawn single worker thread and keep it alive
    275         let (cmd_tx, cmd_rx) = chan::unbounded::<WorkerCmd>();
    276         let (msg_tx, msg_rx) = chan::unbounded::<WorkerMsg>();
    277 
    278         self.cmd_tx = Some(cmd_tx.clone());
    279         self.msg_rx = Some(msg_rx);
    280 
    281         // Clone the DB handle into the worker thread (Ndb is typically cheap/cloneable)
    282         let ndb = ctx.ndb.clone();
    283 
    284         spawn_worker(egui_ctx, ndb, cmd_rx, msg_tx);
    285 
    286         // kick the first run immediately
    287         let _ = cmd_tx.send(WorkerCmd::Refresh);
    288         self.running = true;
    289         self.last_error = None;
    290         self.last_started = Some(Instant::now());
    291         self.last_snapshot = None;
    292         self.last_finished = None;
    293         self.last_duration = None;
    294         self.state = DashboardState::default();
    295     }
    296 
    297     fn process_worker_msgs(&mut self) {
    298         let Some(rx) = &self.msg_rx else { return };
    299 
    300         let mut got_any = false;
    301 
    302         while let Ok(msg) = rx.try_recv() {
    303             got_any = true;
    304             match msg {
    305                 WorkerMsg::Snapshot(s) => {
    306                     self.running = true;
    307                     self.last_started = Some(s.started_at);
    308                     self.last_snapshot = Some(s.snapshot_at);
    309                     self.last_error = None;
    310 
    311                     self.state = s.state;
    312                 }
    313                 WorkerMsg::Finished {
    314                     started_at,
    315                     finished_at,
    316                     state,
    317                 } => {
    318                     self.running = false;
    319                     self.last_started = Some(started_at);
    320                     self.last_snapshot = Some(finished_at);
    321                     self.last_finished = Some(finished_at);
    322                     self.last_duration = Some(finished_at.saturating_duration_since(started_at));
    323                     self.last_error = None;
    324 
    325                     self.state = state;
    326                 }
    327                 WorkerMsg::Failed {
    328                     started_at,
    329                     finished_at,
    330                     error,
    331                 } => {
    332                     self.running = false;
    333                     self.last_started = Some(started_at);
    334                     self.last_snapshot = Some(finished_at);
    335                     self.last_finished = Some(finished_at);
    336                     self.last_duration = Some(finished_at.saturating_duration_since(started_at));
    337                     self.last_error = Some(error);
    338                 }
    339             }
    340         }
    341 
    342         if got_any {
    343             // No-op; we already requested repaint on every message.
    344         }
    345     }
    346 
    347     fn schedule_refresh(&mut self) {
    348         // throttle scheduling checks a bit
    349         let now = Instant::now();
    350         if now < self.next_tick {
    351             return;
    352         }
    353         self.next_tick = now + Duration::from_millis(200);
    354 
    355         if self.running {
    356             return;
    357         }
    358 
    359         // refresh every 30 seconds from the last finished time (or from init)
    360         let last = self
    361             .last_finished
    362             .or(self.last_started)
    363             .unwrap_or_else(Instant::now);
    364 
    365         if now.saturating_duration_since(last) >= self.refresh_every
    366             && let Some(tx) = &self.cmd_tx
    367         {
    368             // reset UI fields for progressive load, but keep old values visible until snapshots arrive
    369             self.running = true;
    370             self.last_error = None;
    371             self.last_started = Some(now);
    372             self.last_snapshot = None;
    373             self.last_finished = None;
    374             self.last_duration = None;
    375             self.state = DashboardState::default();
    376 
    377             let _ = tx.send(WorkerCmd::Refresh);
    378         }
    379     }
    380 
    381     fn show(&mut self, ui: &mut egui::Ui, ctx: &mut AppContext<'_>) {
    382         crate::ui::dashboard_ui(self, ui, ctx);
    383     }
    384 }
    385 
    386 // ----------------------
    387 // Worker side (single pass, periodic snapshots)
    388 // ----------------------
    389 
    390 fn spawn_worker(
    391     ctx: egui::Context,
    392     ndb: Ndb,
    393     cmd_rx: chan::Receiver<WorkerCmd>,
    394     msg_tx: chan::Sender<WorkerMsg>,
    395 ) {
    396     thread::Builder::new()
    397         .name("dashboard-worker".to_owned())
    398         .spawn(move || {
    399             let mut should_quit = false;
    400 
    401             while !should_quit {
    402                 match cmd_rx.recv() {
    403                     Ok(WorkerCmd::Refresh) => {
    404                         let started_at = Instant::now();
    405 
    406                         match materialize_single_pass(&ctx, &ndb, &msg_tx, started_at) {
    407                             Ok(state) => {
    408                                 let _ = msg_tx.send(WorkerMsg::Finished {
    409                                     started_at,
    410                                     finished_at: Instant::now(),
    411                                     state,
    412                                 });
    413                             }
    414                             Err(e) => {
    415                                 let _ = msg_tx.send(WorkerMsg::Failed {
    416                                     started_at,
    417                                     finished_at: Instant::now(),
    418                                     error: format!("{e:?}"),
    419                                 });
    420                             }
    421                         }
    422                     }
    423                     Err(_) => {
    424                         should_quit = true;
    425                     }
    426                 }
    427             }
    428         })
    429         .expect("failed to spawn dashboard worker thread");
    430 }
    431 
/// Accumulator threaded through the single fold over all notes.
struct Acc {
    /// When the last snapshot was sent to the UI.
    last_emit: Instant,

    /// Statistics accumulated so far.
    state: DashboardState,
}
    437 
    438 fn materialize_single_pass(
    439     ctx: &egui::Context,
    440     ndb: &Ndb,
    441     msg_tx: &chan::Sender<WorkerMsg>,
    442     started_at: Instant,
    443 ) -> Result<DashboardState, nostrdb::Error> {
    444     // one transaction per refresh run
    445     let txn = Transaction::new(ndb)?;
    446 
    447     // all notes
    448     let filters = vec![Filter::new_with_capacity(1).build()];
    449 
    450     let days = 14;
    451     let weeks = 12;
    452     let months = 12;
    453     let week_starts_monday = true;
    454 
    455     let now = Utc::now().timestamp();
    456 
    457     let mut acc = Acc {
    458         last_emit: Instant::now(),
    459         state: DashboardState {
    460             total: Bucket::default(),
    461             daily: RollingCache::daily(now, days),
    462             weekly: RollingCache::weekly(now, weeks, week_starts_monday),
    463             monthly: RollingCache::monthly_30d(now, months),
    464         },
    465     };
    466 
    467     let emit_every = Duration::from_millis(32);
    468 
    469     let _ = ndb.fold(&txn, &filters, &mut acc, |acc, note| {
    470         acc.state.total.bump(&note);
    471         acc.state.daily.bump(&note);
    472         acc.state.weekly.bump(&note);
    473         acc.state.monthly.bump(&note);
    474 
    475         let now = Instant::now();
    476         if now.saturating_duration_since(acc.last_emit) >= emit_every {
    477             acc.last_emit = now;
    478 
    479             let _ = msg_tx.send(WorkerMsg::Snapshot(Snapshot {
    480                 started_at,
    481                 snapshot_at: now,
    482                 state: acc.state.clone(),
    483             }));
    484 
    485             ctx.request_repaint();
    486         }
    487 
    488         acc
    489     });
    490 
    491     Ok(acc.state)
    492 }
    493 
    494 fn next_midnight_utc(now_ts: i64) -> i64 {
    495     let dt = Utc.timestamp_opt(now_ts, 0).single().unwrap();
    496     let tomorrow = dt.date_naive().succ_opt().unwrap();
    497     Utc.from_utc_datetime(&tomorrow.and_hms_opt(0, 0, 0).unwrap())
    498         .timestamp()
    499 }
    500 
    501 fn next_week_boundary_utc(now_ts: i64, starts_monday: bool) -> i64 {
    502     let dt = Utc.timestamp_opt(now_ts, 0).single().unwrap();
    503     let today = dt.date_naive();
    504 
    505     let start = if starts_monday {
    506         chrono::Weekday::Mon
    507     } else {
    508         chrono::Weekday::Sun
    509     };
    510     let weekday = today.weekday();
    511 
    512     // days until next week start (0..6); if today is start, boundary is next week start (7 days)
    513     let mut delta =
    514         (7 + (start.num_days_from_monday() as i32) - (weekday.num_days_from_monday() as i32)) % 7;
    515     if delta == 0 {
    516         delta = 7;
    517     }
    518 
    519     let next = today + chrono::Duration::days(delta as i64);
    520     Utc.from_utc_datetime(&next.and_hms_opt(0, 0, 0).unwrap())
    521         .timestamp()
    522 }
    523 
    524 fn next_month_boundary_utc(now_ts: i64) -> i64 {
    525     let dt = Utc.timestamp_opt(now_ts, 0).single().unwrap();
    526     let y = dt.year();
    527     let m = dt.month();
    528 
    529     let (ny, nm) = if m == 12 { (y + 1, 1) } else { (y, m + 1) };
    530     Utc.with_ymd_and_hms(ny, nm, 1, 0, 0, 0)
    531         .single()
    532         .unwrap()
    533         .timestamp()
    534 }
    535 
    536 fn top_kinds_over(cache: &RollingCache, limit: usize) -> Vec<(u64, u64)> {
    537     let mut agg: FxHashMap<u64, u64> = Default::default();
    538 
    539     for b in &cache.buckets {
    540         for (kind, count) in &b.kinds {
    541             *agg.entry(*kind).or_default() += *count as u64;
    542         }
    543     }
    544 
    545     let mut v: Vec<_> = agg.into_iter().collect();
    546     v.sort_unstable_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
    547     v.truncate(limit);
    548     v
    549 }
    550 
    551 pub(crate) fn top_kind1_authors_over(cache: &RollingCache, limit: usize) -> Vec<(Pubkey, u64)> {
    552     let mut agg: FxHashMap<Pubkey, u64> = Default::default();
    553     for b in &cache.buckets {
    554         for (pubkey, count) in &b.kind1_authors {
    555             *agg.entry(*pubkey).or_default() += *count as u64;
    556         }
    557     }
    558     let mut v: Vec<_> = agg.into_iter().collect();
    559     v.sort_unstable_by(|a, b| b.1.cmp(&a.1));
    560     v.truncate(limit);
    561     v
    562 }